From 3340f9c6eeec4a3a08ef35650160dd247d322112 Mon Sep 17 00:00:00 2001 From: Donald Zou Date: Mon, 2 Dec 2024 15:09:54 +0800 Subject: [PATCH] Update dashboard.py - Updated WireguardConfiguration class to handle awg configuration files - Added AmneziaWireguardConfiguration class as a subclass of WireguardConfiguration --- src/dashboard.py | 86 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 20 deletions(-) diff --git a/src/dashboard.py b/src/dashboard.py index 4b9c1de..d4fa03b 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -445,7 +445,7 @@ class WireguardConfiguration: def __str__(self): return self.message - def __init__(self, name: str = None, data: dict = None, backup: dict = None, startup: bool = False): + def __init__(self, name: str = None, data: dict = None, backup: dict = None, startup: bool = False, wg: bool = True): self.__parser: configparser.ConfigParser = configparser.ConfigParser(strict=False) @@ -467,13 +467,14 @@ class WireguardConfiguration: self.PostDown: str = "" self.SaveConfig: bool = True self.Name = name - self.__configPath = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf') + self.Protocol = "wg" if wg else "awg" + self.__configPath = os.path.join(self.__getProtocolPath(), f'{self.Name}.conf') if wg else os.path.join(DashboardConfig.GetConfig("Server", "awg_conf_path")[1], f'{self.Name}.conf') if name is not None: if data is not None and "Backup" in data.keys(): db = self.__importDatabase( os.path.join( - DashboardConfig.GetConfig("Server", "wg_conf_path")[1], + self.__getProtocolPath(), 'WGDashboard_Backup', data["Backup"].replace(".conf", ".sql"))) else: @@ -484,7 +485,7 @@ class WireguardConfiguration: else: self.Name = data["ConfigurationName"] - self.__configPath = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf') + self.__configPath = os.path.join(self.__getProtocolPath(), f'{self.Name}.conf') for i in dir(self): if str(i) in data.keys(): @@ -515,6 +516,10 @@ class WireguardConfiguration: self.toggleConfiguration() print(f"[WGDashboard] Autostart Configuration: {name}") + def __getProtocolPath(self): + return DashboardConfig.GetConfig("Server", "wg_conf_path")[1] if self.Protocol == "wg" \ + else DashboardConfig.GetConfig("Server", "awg_conf_path")[1] + def __initPeersList(self): self.Peers: list[Peer] = [] self.getPeersList() @@ -668,7 +673,7 @@ class WireguardConfiguration: self.RestrictedPeers.append(Peer(i, self)) def configurationFileChanged(self) : - mt = os.path.getmtime(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf')) + mt = os.path.getmtime(self.__configPath) changed = self.__configFileModifiedTime is None or self.__configFileModifiedTime != mt self.__configFileModifiedTime = mt return changed @@ -676,7 +681,7 @@ class WireguardConfiguration: def __getPeers(self): if self.configurationFileChanged(): self.Peers = [] - with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'r') as configFile: + with open(self.__configPath, 'r') as configFile: p = [] pCounter = -1 content = configFile.read().split('\n') @@ -1034,21 +1039,21 @@ class WireguardConfiguration: } def backupConfigurationFile(self): - if not os.path.exists(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup')): - os.mkdir(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup')) + if not os.path.exists(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup')): + os.mkdir(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup')) time = datetime.now().strftime("%Y%m%d%H%M%S") shutil.copy( self.__configPath, - os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f'{self.Name}_{time}.conf') + os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f'{self.Name}_{time}.conf') ) - with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f'{self.Name}_{time}.sql'), 'w+') as f: + with open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f'{self.Name}_{time}.sql'), 'w+') as f: for l in self.__dumpDatabase(): f.write(l + "\n") def getBackups(self, databaseContent: bool = False) -> list[dict[str: str, str: str, str: str]]: backups = [] - directory = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup') + directory = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup') files = [(file, os.path.getctime(os.path.join(directory, file))) for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))] files.sort(key=lambda x: x[1], reverse=True) @@ -1060,12 +1065,12 @@ class WireguardConfiguration: d = { "filename": f, "backupDate": date, - "content": open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f), 'r').read() + "content": open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f), 'r').read() } if f.replace(".conf", ".sql") in list(os.listdir(directory)): d['database'] = True if databaseContent: - d['databaseContent'] = open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read() + d['databaseContent'] = open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read() backups.append(d) return backups @@ -1077,13 +1082,13 @@ class WireguardConfiguration: self.backupConfigurationFile() if self.Status: self.toggleConfiguration() - target = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', backupFileName) - targetSQL = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', backupFileName.replace(".conf", ".sql")) + target = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName) + targetSQL = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName.replace(".conf", ".sql")) if not os.path.exists(target): return False targetContent = open(target, 'r').read() try: - with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'w') as f: + with open(self.__configPath, 'w') as f: f.write(targetContent) except Exception as e: return False @@ -1098,7 +1103,7 @@ class WireguardConfiguration: if backupFileName not in backups: return False try: - os.remove(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', backupFileName)) + os.remove(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName)) except Exception as e: return False return True @@ -1108,7 +1113,7 @@ class WireguardConfiguration: self.toggleConfiguration() original = [] dataChanged = False - with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'r') as f: + with open(self.__configPath, 'r') as f: original = [l.rstrip("\n") for l in f.readlines()] allowEdit = ["Address", "PreUp", "PostUp", "PreDown", "PostDown", "ListenPort"] start = original.index("[Interface]") @@ -1129,7 +1134,7 @@ class WireguardConfiguration: for line in range(end, len(original)): new.append(original[line]) self.backupConfigurationFile() - with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'w') as f: + with open(self.__configPath, 'w') as f: f.write("\n".join(new)) status, msg = self.toggleConfiguration() @@ -1158,7 +1163,7 @@ class WireguardConfiguration: AllPeerJobs.updateJobConfigurationName(self.Name, newConfigurationName) shutil.copy( self.__configPath, - os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{newConfigurationName}.conf') + os.path.join(self.__getProtocolPath(), f'{newConfigurationName}.conf') ) self.deleteConfiguration() except Exception as e: @@ -1201,6 +1206,24 @@ class WireguardConfiguration: break return True, availableAddress +""" +AmneziaWG Configuration +""" + +class AmneziaWireguardConfiguration(WireguardConfiguration): + def __init__(self, name: str = None, data: dict = None, backup: dict = None, startup: bool = False): + self.Jc = 0 + self.Jmin = 0 + self.Jmax = 0 + self.S1 = 0 + self.S2 = 0 + self.H1 = 1 + self.H2 = 2 + self.H3 = 3 + self.H4 = 4 + + super().__init__(name, data, backup, startup, wg=False) + """ Peer """ @@ -1384,6 +1407,7 @@ class DashboardConfig: }, "Server": { "wg_conf_path": "/etc/wireguard", + "awg_conf_path": "/etc/amnezia/amneziawg", "app_prefix": "", "app_ip": "0.0.0.0", "app_port": "10086", @@ -1708,6 +1732,7 @@ def API_SignOut(): @app.route(f'{APP_PREFIX}/api/getWireguardConfigurations', methods=["GET"]) def API_getWireguardConfigurations(): InitWireguardConfigurationsList() + InitAmneziaWireguardConfigurationsList() return ResponseObject(data=[wc for wc in WireguardConfigurations.values()]) @app.route(f'{APP_PREFIX}/api/addWireguardConfiguration', methods=["POST"]) @@ -1907,8 +1932,10 @@ def API_updateDashboardConfigurationItem(): if data['section'] == "Server": if data['key'] == 'wg_conf_path': + WireguardConfigurations.clear() WireguardConfigurations.clear() InitWireguardConfigurationsList() + InitAmneziaWireguardConfigurationsList() return ResponseObject(True, data=DashboardConfig.GetConfig(data["section"], data["key"])[1]) @@ -2614,6 +2641,23 @@ def InitWireguardConfigurationsList(startup: bool = False): except WireguardConfiguration.InvalidConfigurationFileException as e: print(f"{i} have an invalid configuration file.") + confs = os.listdir(DashboardConfig.GetConfig("Server", "awg_conf_path")[1]) + confs.sort() + for i in confs: + if RegexMatch("^(.{1,}).(conf)$", i): + i = i.replace('.conf', '') + try: + if i in WireguardConfigurations.keys(): + if WireguardConfigurations[i].configurationFileChanged(): + WireguardConfigurations[i] = AmneziaWireguardConfiguration(i) + else: + WireguardConfigurations[i] = AmneziaWireguardConfiguration(i, startup=startup) + except WireguardConfigurations.InvalidConfigurationFileException as e: + print(f"{i} have an invalid configuration file.") + +def InitAmneziaWireguardConfigurationsList(startup: bool = False): + pass + AllPeerShareLinks: PeerShareLinks = PeerShareLinks() AllPeerJobs: PeerJobs = PeerJobs() JobLogger: PeerJobLogger = PeerJobLogger() @@ -2623,7 +2667,9 @@ _, app_port = DashboardConfig.GetConfig("Server", "app_port") _, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path") WireguardConfigurations: dict[str, WireguardConfiguration] = {} +AmneziaWireguardConfigurations: dict[str, AmneziaWireguardConfiguration] = {} InitWireguardConfigurationsList(startup=True) +InitAmneziaWireguardConfigurationsList(startup=True) def startThreads(): bgThread = threading.Thread(target=backGroundThread)