Update dashboard.py

- Updated WireguardConfiguration class to handle awg configuration files
- Added AmneziaWireguardConfiguration class as a subclass of WireguardConfiguration
This commit is contained in:
Donald Zou 2024-12-02 15:09:54 +08:00
parent b21cfe8504
commit 3340f9c6ee

View File

@ -445,7 +445,7 @@ class WireguardConfiguration:
def __str__(self): def __str__(self):
return self.message return self.message
def __init__(self, name: str = None, data: dict = None, backup: dict = None, startup: bool = False): def __init__(self, name: str = None, data: dict = None, backup: dict = None, startup: bool = False, wg: bool = True):
self.__parser: configparser.ConfigParser = configparser.ConfigParser(strict=False) self.__parser: configparser.ConfigParser = configparser.ConfigParser(strict=False)
@ -467,13 +467,14 @@ class WireguardConfiguration:
self.PostDown: str = "" self.PostDown: str = ""
self.SaveConfig: bool = True self.SaveConfig: bool = True
self.Name = name self.Name = name
self.__configPath = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf') self.Protocol = "wg" if wg else "awg"
self.__configPath = os.path.join(self.__getProtocolPath(), f'{self.Name}.conf') if wg else os.path.join(DashboardConfig.GetConfig("Server", "awg_conf_path")[1], f'{self.Name}.conf')
if name is not None: if name is not None:
if data is not None and "Backup" in data.keys(): if data is not None and "Backup" in data.keys():
db = self.__importDatabase( db = self.__importDatabase(
os.path.join( os.path.join(
DashboardConfig.GetConfig("Server", "wg_conf_path")[1], self.__getProtocolPath(),
'WGDashboard_Backup', 'WGDashboard_Backup',
data["Backup"].replace(".conf", ".sql"))) data["Backup"].replace(".conf", ".sql")))
else: else:
@ -484,7 +485,7 @@ class WireguardConfiguration:
else: else:
self.Name = data["ConfigurationName"] self.Name = data["ConfigurationName"]
self.__configPath = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf') self.__configPath = os.path.join(self.__getProtocolPath(), f'{self.Name}.conf')
for i in dir(self): for i in dir(self):
if str(i) in data.keys(): if str(i) in data.keys():
@ -515,6 +516,10 @@ class WireguardConfiguration:
self.toggleConfiguration() self.toggleConfiguration()
print(f"[WGDashboard] Autostart Configuration: {name}") print(f"[WGDashboard] Autostart Configuration: {name}")
def __getProtocolPath(self):
return DashboardConfig.GetConfig("Server", "wg_conf_path")[1] if self.Protocol == "wg" \
else DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
def __initPeersList(self): def __initPeersList(self):
self.Peers: list[Peer] = [] self.Peers: list[Peer] = []
self.getPeersList() self.getPeersList()
@ -668,7 +673,7 @@ class WireguardConfiguration:
self.RestrictedPeers.append(Peer(i, self)) self.RestrictedPeers.append(Peer(i, self))
def configurationFileChanged(self) : def configurationFileChanged(self) :
mt = os.path.getmtime(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf')) mt = os.path.getmtime(self.__configPath)
changed = self.__configFileModifiedTime is None or self.__configFileModifiedTime != mt changed = self.__configFileModifiedTime is None or self.__configFileModifiedTime != mt
self.__configFileModifiedTime = mt self.__configFileModifiedTime = mt
return changed return changed
@ -676,7 +681,7 @@ class WireguardConfiguration:
def __getPeers(self): def __getPeers(self):
if self.configurationFileChanged(): if self.configurationFileChanged():
self.Peers = [] self.Peers = []
with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'r') as configFile: with open(self.__configPath, 'r') as configFile:
p = [] p = []
pCounter = -1 pCounter = -1
content = configFile.read().split('\n') content = configFile.read().split('\n')
@ -1034,21 +1039,21 @@ class WireguardConfiguration:
} }
def backupConfigurationFile(self): def backupConfigurationFile(self):
if not os.path.exists(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup')): if not os.path.exists(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup')):
os.mkdir(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup')) os.mkdir(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup'))
time = datetime.now().strftime("%Y%m%d%H%M%S") time = datetime.now().strftime("%Y%m%d%H%M%S")
shutil.copy( shutil.copy(
self.__configPath, self.__configPath,
os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f'{self.Name}_{time}.conf') os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f'{self.Name}_{time}.conf')
) )
with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f'{self.Name}_{time}.sql'), 'w+') as f: with open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f'{self.Name}_{time}.sql'), 'w+') as f:
for l in self.__dumpDatabase(): for l in self.__dumpDatabase():
f.write(l + "\n") f.write(l + "\n")
def getBackups(self, databaseContent: bool = False) -> list[dict[str: str, str: str, str: str]]: def getBackups(self, databaseContent: bool = False) -> list[dict[str: str, str: str, str: str]]:
backups = [] backups = []
directory = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup') directory = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup')
files = [(file, os.path.getctime(os.path.join(directory, file))) files = [(file, os.path.getctime(os.path.join(directory, file)))
for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))] for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
files.sort(key=lambda x: x[1], reverse=True) files.sort(key=lambda x: x[1], reverse=True)
@ -1060,12 +1065,12 @@ class WireguardConfiguration:
d = { d = {
"filename": f, "filename": f,
"backupDate": date, "backupDate": date,
"content": open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f), 'r').read() "content": open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f), 'r').read()
} }
if f.replace(".conf", ".sql") in list(os.listdir(directory)): if f.replace(".conf", ".sql") in list(os.listdir(directory)):
d['database'] = True d['database'] = True
if databaseContent: if databaseContent:
d['databaseContent'] = open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read() d['databaseContent'] = open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read()
backups.append(d) backups.append(d)
return backups return backups
@ -1077,13 +1082,13 @@ class WireguardConfiguration:
self.backupConfigurationFile() self.backupConfigurationFile()
if self.Status: if self.Status:
self.toggleConfiguration() self.toggleConfiguration()
target = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', backupFileName) target = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName)
targetSQL = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', backupFileName.replace(".conf", ".sql")) targetSQL = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName.replace(".conf", ".sql"))
if not os.path.exists(target): if not os.path.exists(target):
return False return False
targetContent = open(target, 'r').read() targetContent = open(target, 'r').read()
try: try:
with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'w') as f: with open(self.__configPath, 'w') as f:
f.write(targetContent) f.write(targetContent)
except Exception as e: except Exception as e:
return False return False
@ -1098,7 +1103,7 @@ class WireguardConfiguration:
if backupFileName not in backups: if backupFileName not in backups:
return False return False
try: try:
os.remove(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', backupFileName)) os.remove(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName))
except Exception as e: except Exception as e:
return False return False
return True return True
@ -1108,7 +1113,7 @@ class WireguardConfiguration:
self.toggleConfiguration() self.toggleConfiguration()
original = [] original = []
dataChanged = False dataChanged = False
with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'r') as f: with open(self.__configPath, 'r') as f:
original = [l.rstrip("\n") for l in f.readlines()] original = [l.rstrip("\n") for l in f.readlines()]
allowEdit = ["Address", "PreUp", "PostUp", "PreDown", "PostDown", "ListenPort"] allowEdit = ["Address", "PreUp", "PostUp", "PreDown", "PostDown", "ListenPort"]
start = original.index("[Interface]") start = original.index("[Interface]")
@ -1129,7 +1134,7 @@ class WireguardConfiguration:
for line in range(end, len(original)): for line in range(end, len(original)):
new.append(original[line]) new.append(original[line])
self.backupConfigurationFile() self.backupConfigurationFile()
with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'w') as f: with open(self.__configPath, 'w') as f:
f.write("\n".join(new)) f.write("\n".join(new))
status, msg = self.toggleConfiguration() status, msg = self.toggleConfiguration()
@ -1158,7 +1163,7 @@ class WireguardConfiguration:
AllPeerJobs.updateJobConfigurationName(self.Name, newConfigurationName) AllPeerJobs.updateJobConfigurationName(self.Name, newConfigurationName)
shutil.copy( shutil.copy(
self.__configPath, self.__configPath,
os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{newConfigurationName}.conf') os.path.join(self.__getProtocolPath(), f'{newConfigurationName}.conf')
) )
self.deleteConfiguration() self.deleteConfiguration()
except Exception as e: except Exception as e:
@ -1201,6 +1206,24 @@ class WireguardConfiguration:
break break
return True, availableAddress return True, availableAddress
"""
AmneziaWG Configuration
"""
class AmneziaWireguardConfiguration(WireguardConfiguration):
def __init__(self, name: str = None, data: dict = None, backup: dict = None, startup: bool = False):
self.Jc = 0
self.Jmin = 0
self.Jmax = 0
self.S1 = 0
self.S2 = 0
self.H1 = 1
self.H2 = 2
self.H3 = 3
self.H4 = 4
super().__init__(name, data, backup, startup, wg=False)
""" """
Peer Peer
""" """
@ -1384,6 +1407,7 @@ class DashboardConfig:
}, },
"Server": { "Server": {
"wg_conf_path": "/etc/wireguard", "wg_conf_path": "/etc/wireguard",
"awg_conf_path": "/etc/amnezia/amneziawg",
"app_prefix": "", "app_prefix": "",
"app_ip": "0.0.0.0", "app_ip": "0.0.0.0",
"app_port": "10086", "app_port": "10086",
@ -1708,6 +1732,7 @@ def API_SignOut():
@app.route(f'{APP_PREFIX}/api/getWireguardConfigurations', methods=["GET"]) @app.route(f'{APP_PREFIX}/api/getWireguardConfigurations', methods=["GET"])
def API_getWireguardConfigurations(): def API_getWireguardConfigurations():
InitWireguardConfigurationsList() InitWireguardConfigurationsList()
InitAmneziaWireguardConfigurationsList()
return ResponseObject(data=[wc for wc in WireguardConfigurations.values()]) return ResponseObject(data=[wc for wc in WireguardConfigurations.values()])
@app.route(f'{APP_PREFIX}/api/addWireguardConfiguration', methods=["POST"]) @app.route(f'{APP_PREFIX}/api/addWireguardConfiguration', methods=["POST"])
@ -1907,8 +1932,10 @@ def API_updateDashboardConfigurationItem():
if data['section'] == "Server": if data['section'] == "Server":
if data['key'] == 'wg_conf_path': if data['key'] == 'wg_conf_path':
WireguardConfigurations.clear()
WireguardConfigurations.clear() WireguardConfigurations.clear()
InitWireguardConfigurationsList() InitWireguardConfigurationsList()
InitAmneziaWireguardConfigurationsList()
return ResponseObject(True, data=DashboardConfig.GetConfig(data["section"], data["key"])[1]) return ResponseObject(True, data=DashboardConfig.GetConfig(data["section"], data["key"])[1])
@ -2614,6 +2641,23 @@ def InitWireguardConfigurationsList(startup: bool = False):
except WireguardConfiguration.InvalidConfigurationFileException as e: except WireguardConfiguration.InvalidConfigurationFileException as e:
print(f"{i} have an invalid configuration file.") print(f"{i} have an invalid configuration file.")
confs = os.listdir(DashboardConfig.GetConfig("Server", "awg_conf_path")[1])
confs.sort()
for i in confs:
if RegexMatch("^(.{1,}).(conf)$", i):
i = i.replace('.conf', '')
try:
if i in WireguardConfigurations.keys():
if WireguardConfigurations[i].configurationFileChanged():
WireguardConfigurations[i] = AmneziaWireguardConfiguration(i)
else:
WireguardConfigurations[i] = AmneziaWireguardConfiguration(i, startup=startup)
except WireguardConfigurations.InvalidConfigurationFileException as e:
print(f"{i} have an invalid configuration file.")
def InitAmneziaWireguardConfigurationsList(startup: bool = False):
pass
AllPeerShareLinks: PeerShareLinks = PeerShareLinks() AllPeerShareLinks: PeerShareLinks = PeerShareLinks()
AllPeerJobs: PeerJobs = PeerJobs() AllPeerJobs: PeerJobs = PeerJobs()
JobLogger: PeerJobLogger = PeerJobLogger() JobLogger: PeerJobLogger = PeerJobLogger()
@ -2623,7 +2667,9 @@ _, app_port = DashboardConfig.GetConfig("Server", "app_port")
_, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path") _, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path")
WireguardConfigurations: dict[str, WireguardConfiguration] = {} WireguardConfigurations: dict[str, WireguardConfiguration] = {}
AmneziaWireguardConfigurations: dict[str, AmneziaWireguardConfiguration] = {}
InitWireguardConfigurationsList(startup=True) InitWireguardConfigurationsList(startup=True)
InitAmneziaWireguardConfigurationsList(startup=True)
def startThreads(): def startThreads():
bgThread = threading.Thread(target=backGroundThread) bgThread = threading.Thread(target=backGroundThread)