Backup and restore for AWG is done

This commit is contained in:
Donald Zou
2024-12-04 17:50:16 +08:00
parent 434c236210
commit 57583b6747
7 changed files with 144 additions and 83 deletions

View File

@@ -509,6 +509,7 @@ class WireguardConfiguration:
self.createDatabase()
with open(self.configPath, "w+") as configFile:
self.__parser.write(configFile)
print(f"[WGDashboard] Configuration file {self.configPath} created")
self.__initPeersList()
print(f"[WGDashboard] Initialized Configuration: {name}")
@@ -2086,11 +2087,14 @@ def API_getWireguardConfigurations():
def API_addWireguardConfiguration():
data = request.get_json()
requiredKeys = [
"ConfigurationName", "Address", "ListenPort", "PrivateKey"
"ConfigurationName", "Address", "ListenPort", "PrivateKey", "Protocol"
]
for i in requiredKeys:
if i not in data.keys():
return ResponseObject(False, "Please provide all required parameters.")
if data.get("Protocol") not in ProtocolsEnabled():
return ResponseObject(False, "Please provide a valid protocol: wg / awg.")
# Check duplicate names, ports, address
for i in WireguardConfigurations.values():
@@ -2110,22 +2114,27 @@ def API_addWireguardConfiguration():
"Address")
if "Backup" in data.keys():
if not os.path.exists(os.path.join(
DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
'WGDashboard_Backup',
data["Backup"])) or not os.path.exists(os.path.join(
DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
'WGDashboard_Backup',
data["Backup"].replace('.conf', '.sql'))):
return ResponseObject(False, "Backup file does not exist")
path = {
"wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
"awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
}
if (os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"])) and
os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))):
protocol = "wg"
elif (os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"])) and
os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))):
protocol = "awg"
else:
return ResponseObject(False, "Backup does not exist")
shutil.copy(
os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', data["Backup"]),
os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{data["ConfigurationName"]}.conf')
os.path.join(path[protocol], 'WGDashboard_Backup', data["Backup"]),
os.path.join(path[protocol], f'{data["ConfigurationName"]}.conf')
)
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data, name=data['ConfigurationName'])
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data, name=data['ConfigurationName']) if protocol == 'wg' else AmneziaWireguardConfiguration(data=data, name=data['ConfigurationName'])
else:
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data)
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data) if data.get('Protocol') == 'wg' else AmneziaWireguardConfiguration(data=data)
return ResponseObject()
@app.get(f'{APP_PREFIX}/api/toggleWireguardConfiguration/')
@@ -2197,30 +2206,32 @@ def API_getAllWireguardConfigurationBackup():
b = WireguardConfigurations[i].getBackups(True)
if len(b) > 0:
data['ExistingConfigurations'][i] = WireguardConfigurations[i].getBackups(True)
directory = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup')
files = [(file, os.path.getctime(os.path.join(directory, file)))
for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
files.sort(key=lambda x: x[1], reverse=True)
for f, ct in files:
if RegexMatch(r"^(.*)_(.*)\.(conf)$", f):
s = re.search(r"^(.*)_(.*)\.(conf)$", f)
name = s.group(1)
if name not in existingConfiguration:
if name not in data['NonExistingConfigurations'].keys():
data['NonExistingConfigurations'][name] = []
date = s.group(2)
d = {
"filename": f,
"backupDate": date,
"content": open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f), 'r').read()
}
if f.replace(".conf", ".sql") in list(os.listdir(directory)):
d['database'] = True
d['databaseContent'] = open(os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read()
data['NonExistingConfigurations'][name].append(d)
for protocol in ProtocolsEnabled():
directory = os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup')
files = [(file, os.path.getctime(os.path.join(directory, file)))
for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
files.sort(key=lambda x: x[1], reverse=True)
for f, ct in files:
if RegexMatch(r"^(.*)_(.*)\.(conf)$", f):
s = re.search(r"^(.*)_(.*)\.(conf)$", f)
name = s.group(1)
if name not in existingConfiguration:
if name not in data['NonExistingConfigurations'].keys():
data['NonExistingConfigurations'][name] = []
date = s.group(2)
d = {
"protocol": protocol,
"filename": f,
"backupDate": date,
"content": open(os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup', f), 'r').read()
}
if f.replace(".conf", ".sql") in list(os.listdir(directory)):
d['database'] = True
d['databaseContent'] = open(os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read()
data['NonExistingConfigurations'][name].append(d)
return ResponseObject(data=data)
@app.get(f'{APP_PREFIX}/api/createWireguardConfigurationBackup')
@@ -2944,6 +2955,9 @@ def API_SystemStatus():
# pass
return ResponseObject(data=status)
@app.get(f'{APP_PREFIX}/api/protocolsEnabled')
def API_ProtocolsEnabled():
return ResponseObject(data=ProtocolsEnabled())
@app.get(f'{APP_PREFIX}/')
def index():
@@ -2980,10 +2994,15 @@ def gunicornConfig():
_, app_port = DashboardConfig.GetConfig("Server", "app_port")
return app_ip, app_port
def AmneziaWGEnabled():
def ProtocolsEnabled() -> list[str]:
from shutil import which
protocols = []
if which('awg') is not None and which('awg-quick') is not None:
protocols.append("awg")
if which('wg') is not None and which('wg-quick') is not None:
protocols.append("wg")
return protocols
return which('awg') is not None and which('awg-quick') is not None
def InitWireguardConfigurationsList(startup: bool = False):
confs = os.listdir(DashboardConfig.GetConfig("Server", "wg_conf_path")[1])
@@ -3000,7 +3019,7 @@ def InitWireguardConfigurationsList(startup: bool = False):
except WireguardConfiguration.InvalidConfigurationFileException as e:
print(f"{i} have an invalid configuration file.")
if AmneziaWGEnabled():
if "awg" in ProtocolsEnabled():
confs = os.listdir(DashboardConfig.GetConfig("Server", "awg_conf_path")[1])
confs.sort()
for i in confs: