#!/usr/bin/env python3 # -*- coding: utf-8 -*- import ipaddress import logging import os import qrcode import textwrap import wgconfig import wgconfig.wgexec as wgexec logger = logging.getLogger(__name__) class WGCfg(): """Class for reading/writing the WireGuard configuration file""" def __init__(self, filename, libdir): """Initialize instance for the given config file""" self.filename = filename self.libdir = libdir self.wc = wgconfig.WGConfig(self.filename) self.wc.read_file() def get_interface(self): """Get WireGuard interface data""" return self.wc.interface def transform_to_clientdata(self, peer, peerdata): """Transform data of a single peer from server into a dictionary of client config data""" result = dict() description = peerdata['_rawdata'][0] if description[0] == '#': description = description[2:] else: description = 'Peer: ' + peer result['Description'] = description for item in peerdata['_rawdata']: if item.startswith('# PrivateKey = '): result['PrivateKey'] = item[15:] result['PublicKey'] = peer result['PresharedKey'] = peerdata['PresharedKey'] address = peerdata['AllowedIPs'].partition(',')[0] # get first allowed ip range address = address.partition('/')[0] + '/' + self.get_interface()['Address'].partition('/')[2] # take prefix length from interface address result['Address'] = address result['Id'] = address.partition('/')[0].replace('.', '-') result['QRCode'] = os.path.join(self.libdir, result['Id'] + '.png') return result def get_peer(self, peer): """Get data of the given WireGuard peer""" return self.transform_to_clientdata(peer, self.wc.peers[peer]) def get_peers(self): """Get data of all WireGuard peers""" return { peer: self.get_peer(peer) for peer in self.wc.peers.keys() } def get_peer_byid(self, id): """Get data WireGuard peer with the given id""" peer = next(peer for peer, peerdata in self.get_peers().items() if peerdata['Id'] == id) return peer, self.get_peer(peer) def get_peerconfig(self, peer): """Get config for the given WireGuard peer""" peerdata = self.get_peer(peer) for item in self.get_interface()['_rawdata']: if item.startswith('# Endpoint = '): endpoint = item[13:] if item.startswith('# Networks = '): allowed_ips = item[13:] public_key = wgexec.get_publickey(peerdata['PrivateKey']) public_key_server = wgexec.get_publickey(self.get_interface()['PrivateKey']) config = textwrap.dedent(f'''\ # {peerdata['Description']} [Interface] ListenPort = 51820 PrivateKey = {peerdata['PrivateKey']} # PublicKey = {public_key} Address = {peerdata['Address']} [Peer] Endpoint = {endpoint} PublicKey = {public_key_server} PresharedKey = {peerdata['PresharedKey']} AllowedIPs = {allowed_ips} PersistentKeepalive = 25 ''') return config, peerdata def create_peer(self, description, ip=None): """Create peer with the given description""" if ip is None: ip = self.find_free_ip() private_key = wgexec.generate_privatekey() peer = wgexec.get_publickey(private_key) self.wc.add_peer(peer, '# ' + description) comment = '# PrivateKey = ' + private_key self.wc.add_attr(peer, 'PresharedKey', wgexec.generate_presharedkey(), comment, append_as_line=True) self.wc.add_attr(peer, 'AllowedIPs', ip + '/32') self.wc.add_attr(peer, 'PersistentKeepalive', 25) self.wc.write_file() self.write_qrcode(peer) return peer def update_peer(self, peer, description): """Update the given peer""" peerdata = self.wc.peers[peer] first_line = peerdata['_index_firstline'] if self.wc.lines[first_line][0] != '#': raise ValueError(f'Comment expected in first line of config for peer [{peerdata}]') self.wc.lines[first_line] = '# ' + description self.wc.invalidate_data() self.wc.write_file() return self.get_peer(peer) def delete_peer(self, peer): """Delete the given peer""" self.wc.del_peer(peer) self.wc.write_file() def find_free_ip(self): """Find the first free address in the given network""" interface_address = ipaddress.ip_interface(self.get_interface()['Address']) network = interface_address.network interface_address = interface_address.ip addresses = [ ipaddress.ip_interface(peerdata['Address']).ip for peer, peerdata in self.get_peers().items() ] print(addresses) # *** ip = None for addr in network.hosts(): if addr == interface_address: continue if addr in addresses: continue ip = addr break if ip is None: raise ValueError('No free IP address available any more') return str(ip) def write_qrcode(self, peer): """Generate a QRCode for the given peers configuration file and store in lib directory""" config, peerdata = self.get_peerconfig(peer) #img = qrcode.make(config) qr = qrcode.QRCode(version=15, error_correction=qrcode.constants.ERROR_CORRECT_M, box_size=2, border=5) qr.add_data(config) qr.make(fit=True) img = qr.make_image(fill_color='black', back_color='white') img.save(peerdata['QRCode']) if __name__ == '__main__': import pprint wg = WGCfg('/etc/wireguard/wg_rw.conf', '/var/lib/wgfrontend') peer = wg.create_peer('This is a first test peer') peer2 = wg.create_peer('This is a second test peer') wg.update_peer(peer2, 'CHANGED') print(wg.get_peerconfig(peer2)[0]) wg.delete_peer(peer) wg.delete_peer(peer2)