Peer restore works

Still need to SQL Dump the table aswell
This commit is contained in:
Donald Zou
2024-10-16 17:44:49 +08:00
parent 51712ed2a8
commit e2e821881c
6 changed files with 367 additions and 138 deletions

View File

@@ -461,27 +461,10 @@ class WireguardConfiguration:
self.PreDown: str = ""
self.PostDown: str = ""
self.SaveConfig: bool = True
self.Name = name
if name is not None:
self.Name = name
self.__parser.read_file(open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf')))
sections = self.__parser.sections()
if "Interface" not in sections:
raise self.InvalidConfigurationFileException(
"[Interface] section not found in " + os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'))
interfaceConfig = dict(self.__parser.items("Interface", True))
for i in dir(self):
if str(i) in interfaceConfig.keys():
if isinstance(getattr(self, i), bool):
setattr(self, i, _strToBool(interfaceConfig[i]))
else:
setattr(self, i, interfaceConfig[i])
if self.PrivateKey:
self.PublicKey = self.__getPublicKey()
self.Status = self.getStatus()
self.__parseConfigurationFile()
else:
self.Name = data["ConfigurationName"]
for i in dir(self):
@@ -505,10 +488,33 @@ class WireguardConfiguration:
f"{self.Name}.conf"), "w+") as configFile:
self.__parser.write(configFile)
self.Peers: list[Peer] = []
self.__createDatabase()
self.__initPeersList()
def __initPeersList(self):
self.Peers: list[Peer] = []
self.getPeersList()
self.getRestrictedPeersList()
def __parseConfigurationFile(self):
self.__parser.read_file(open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf')))
sections = self.__parser.sections()
if "Interface" not in sections:
raise self.InvalidConfigurationFileException(
"[Interface] section not found in " + os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'))
interfaceConfig = dict(self.__parser.items("Interface", True))
for i in dir(self):
if str(i) in interfaceConfig.keys():
if isinstance(getattr(self, i), bool):
setattr(self, i, _strToBool(interfaceConfig[i]))
else:
setattr(self, i, interfaceConfig[i])
if self.PrivateKey:
self.PublicKey = self.__getPublicKey()
self.Status = self.getStatus()
def __createDatabase(self):
existingTables = sqlSelect("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
@@ -923,10 +929,9 @@ class WireguardConfiguration:
os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'),
os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f'{self.Name}_{datetime.now().strftime("%Y%m%d%H%M%S")}.conf')
)
with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'w') as f:
f.write("\n".join(original))
def getBackups(self):
def getBackups(self) -> list[dict[str: str, str: str, str: str]]:
backups = []
directory = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup')
@@ -946,8 +951,35 @@ class WireguardConfiguration:
return backups
def restoreBackup(self, backupFileName: str):
pass
def restoreBackup(self, backupFileName: str) -> bool:
backups = list(map(lambda x : x['filename'], self.getBackups()))
if backupFileName not in backups:
return False
self.backupConfigurationFile()
if self.Status:
self.toggleConfiguration()
target = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', backupFileName)
if not os.path.exists(target):
return False
targetContent = open(target, 'r').read()
try:
with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'w') as f:
f.write(targetContent)
except Exception as e:
return False
self.__parseConfigurationFile()
self.__initPeersList()
return True
def deleteBackup(self, backupFileName: str) -> bool:
backups = list(map(lambda x : x['filename'], self.getBackups()))
if backupFileName not in backups:
return False
try:
os.remove(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', backupFileName))
except Exception as e:
return False
return True
def updateConfigurationSettings(self, newData: dict) -> tuple[bool, str]:
if self.Status:
@@ -976,6 +1008,8 @@ class WireguardConfiguration:
dataChanged = True
print(original[line])
if dataChanged:
with open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf'), 'w') as f:
f.write("\n".join(original))
self.backupConfigurationFile()
@@ -1691,6 +1725,46 @@ def API_getWireguardConfigurationBackup():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(data=WireguardConfigurations[configurationName].getBackups())
@app.get(f'{APP_PREFIX}/api/createWireguardConfigurationBackup')
def API_createWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(status=WireguardConfigurations[configurationName].backupConfigurationFile(),
data=WireguardConfigurations[configurationName].getBackups())
@app.post(f'{APP_PREFIX}/api/deleteWireguardConfigurationBackup')
def API_deleteWireguardConfigurationBackup():
data = request.get_json()
if ("configurationName" not in data.keys() or
"backupFileName" not in data.keys() or
len(data['configurationName']) == 0 or
len(data['backupFileName']) == 0):
return ResponseObject(False,
"Please provide configurationName and backupFileName in body")
configurationName = data['configurationName']
backupFileName = data['backupFileName']
if configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(WireguardConfigurations[configurationName].deleteBackup(backupFileName))
@app.post(f'{APP_PREFIX}/api/restoreWireguardConfigurationBackup')
def API_restoreWireguardConfigurationBackup():
data = request.get_json()
if ("configurationName" not in data.keys() or
"backupFileName" not in data.keys() or
len(data['configurationName']) == 0 or
len(data['backupFileName']) == 0):
return ResponseObject(False,
"Please provide configurationName and backupFileName in body")
configurationName = data['configurationName']
backupFileName = data['backupFileName']
if configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(WireguardConfigurations[configurationName].restoreBackup(backupFileName))
@app.get(f'{APP_PREFIX}/api/getDashboardConfiguration')
def API_getDashboardConfiguration():
return ResponseObject(data=DashboardConfig.toJson())