refac: intermediate commit

This commit is contained in:
Daan Selen
2025-09-20 01:02:58 +02:00
parent 6bebe89202
commit ed896b6545

View File

@@ -72,7 +72,6 @@ class CustomJsonEncoder(DefaultJSONProvider):
return super().default(self)
'''
Response Object
'''
@@ -354,7 +353,8 @@ def API_ValidateAuthentication():
auth_required = DashboardConfig.GetConfig("Server", "auth_req")[1]
if auth_required and (not token or "username" not in session or session["username"] != token):
return ResponseObject(False, "Invalid authentication.")
return ResponseObject(False, "Invalid authentication.",
status_code=401)
return ResponseObject(True)
@@ -367,8 +367,9 @@ def API_RequireAuthentication():
@app.post(f'{APP_PREFIX}/api/authenticate')
def API_AuthenticateLogin():
data = request.get_json()
auth_req = DashboardConfig.GetConfig("Server", "auth_req")[1]
if not DashboardConfig.GetConfig("Server", "auth_req")[1]:
if not auth_req:
return ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
# API key login
@@ -378,7 +379,6 @@ def API_AuthenticateLogin():
# User login
return _login_with_credentials(data)
@app.get(f'{APP_PREFIX}/api/signout')
def API_SignOut():
resp = ResponseObject(True, "")
@@ -386,30 +386,30 @@ def API_SignOut():
session.clear()
return resp
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurations')
def API_getWireguardConfigurations():
InitWireguardConfigurationsList()
return ResponseObject(data=list(WireguardConfigurations.values()))
@app.get(f'{APP_PREFIX}/api/newConfigurationTemplates')
def API_NewConfigurationTemplates():
return ResponseObject(data=NewConfigurationTemplates.GetTemplates())
@app.get(f'{APP_PREFIX}/api/newConfigurationTemplates/createTemplate')
def API_NewConfigurationTemplates_CreateTemplate():
return ResponseObject(data=NewConfigurationTemplates.CreateTemplate().model_dump())
@app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/updateTemplate')
def API_NewConfigurationTemplates_UpdateTemplate():
data = request.get_json()
template = data.get('Template')
if not template:
return ResponseObject(False, "Please provide template")
return ResponseObject(False, "Please provide template",
status_code=400)
status, msg = NewConfigurationTemplates.UpdateTemplate(template)
return ResponseObject(status, msg)
@@ -417,23 +417,27 @@ def API_NewConfigurationTemplates_UpdateTemplate():
def API_NewConfigurationTemplates_DeleteTemplate():
data = request.get_json()
template = data.get('Template')
if not template:
return ResponseObject(False, "Please provide template")
return ResponseObject(False, "Please provide template",
status_code=400)
status, msg = NewConfigurationTemplates.DeleteTemplate(template)
return ResponseObject(status, msg)
@app.post(f'{APP_PREFIX}/api/addWireguardConfiguration')
def API_addWireguardConfiguration():
data = request.get_json()
protocol = data.get("Protocol")
required_keys = {"ConfigurationName", "Address", "ListenPort", "PrivateKey", "Protocol"}
if not required_keys.issubset(data.keys()):
return ResponseObject(False, "Please provide all required parameters.")
return ResponseObject(False, "Please provide all required parameters.", status_code=400)
protocol = data.get("Protocol")
if protocol not in ProtocolsEnabled():
return ResponseObject(False, "Please provide a valid protocol: wg / awg.")
return ResponseObject(False, "Please provide a valid protocol: wg / awg.", status_code=400)
for cfg in WireguardConfigurations.values():
duplicates = {
@@ -443,16 +447,19 @@ def API_addWireguardConfiguration():
}
for key, is_duplicate in duplicates.items():
if is_duplicate:
return ResponseObject(False,
f"Already have a configuration with the {key.lower()} \"{data[key]}\"",
key)
return ResponseObject(
False,
f"Already have a configuration with the {key.lower()} \"{data[key]}\"",
key,
status_code=400
)
paths = {
"wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
"awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
}
if "Backup" in data:
paths = {
"wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
"awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
}
backup_file = data["Backup"]
protocol_detected = None
for proto, base_path in paths.items():
@@ -463,21 +470,30 @@ def API_addWireguardConfiguration():
break
if not protocol_detected:
return ResponseObject(False, "Backup does not exist")
return ResponseObject(False, "Backup does not exist", status_code=400)
shutil.copy(
os.path.join(paths[protocol_detected], 'WGDashboard_Backup', backup_file),
os.path.join(paths[protocol_detected], f'{data["ConfigurationName"]}.conf')
)
protocol = protocol_detected # Use backup protocol for object creation
if protocol == "wg":
ConfigClass = WireguardConfiguration
protocol = protocol_detected # Use backup protocol
else:
ConfigClass = AmneziaWireguardConfiguration
conf_path = os.path.join(paths[protocol], f'{data["ConfigurationName"]}.conf')
if not os.path.exists(conf_path):
with open(conf_path, 'w') as f:
f.write(
f"[Interface]\n"
f"Address = {data['Address']}\n"
f"ListenPort = {data['ListenPort']}\n"
f"PrivateKey = {data['PrivateKey']}\n"
)
os.chmod(conf_path, 0o600) # secure file permissions
ConfigClass = WireguardConfiguration if protocol == "wg" else AmneziaWireguardConfiguration
WireguardConfigurations[data['ConfigurationName']] = ConfigClass(
DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data, name=data['ConfigurationName']
DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks,
data=data, name=data['ConfigurationName']
)
return ResponseObject()
@@ -486,13 +502,16 @@ def API_addWireguardConfiguration():
@app.get(f'{APP_PREFIX}/api/toggleWireguardConfiguration')
def API_toggleWireguardConfiguration():
configuration_name = request.args.get('configurationName')
if not configuration_name or configuration_name not in WireguardConfigurations:
return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
return ResponseObject(False, "Please provide a valid configuration name",
status_code=404)
target_configuration = WireguardConfigurations[configuration_name]
status, msg = target_configuration.toggleConfiguration()
configuration_status = target_configuration.Status
return ResponseObject(status, msg, target_configuration.Status)
return ResponseObject(status, msg, configuration_status)
@app.post(f'{APP_PREFIX}/api/updateWireguardConfiguration')
@@ -501,14 +520,17 @@ def API_updateWireguardConfiguration():
name = data.get("Name")
if not name:
return ResponseObject(False, "Please provide the field: Name")
return ResponseObject(False, "Please provide the field: Name",
status_code=400)
config = _get_wireguard_config(name)
if isinstance(config, ResponseObject):
return config # Return 404 if config not found
if name not in WireguardConfigurations:
return ResponseObject(False, "Configuration does not exist",
status_code=404)
status, msg = config.updateConfigurationSettings(data)
return ResponseObject(status, message=msg, data=config)
target_configuration = WireguardConfigurations[name]
status, msg = target_configuration.updateConfigurationSettings(data)
return ResponseObject(status, msg, target_configuration)
@app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationInfo')
@@ -518,15 +540,16 @@ def API_updateWireguardConfigurationInfo():
key = data.get('Key')
value = data.get('Value')
if not all([name, key, value]):
if not all([name, key, value]): # Required values
return ResponseObject(False, "Please provide configuration name, key, and value")
config = _get_wireguard_config(name)
if isinstance(config, ResponseObject):
return config # Return 404 if config not found
if name not in WireguardConfigurations:
return ResponseObject(False, "Configuration does not exist", status_code=404)
status, msg, updated_key = config.updateConfigurationInfo(key, value)
return ResponseObject(status=status, message=msg, data=updated_key)
target_configuration = WireguardConfigurations[name]
status, msg, key = target_configuration.updateConfigurationInfo(key, value)
return ResponseObject(status, msg, key)
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRawFile')
@@ -558,7 +581,7 @@ def API_UpdateWireguardConfigurationRawFile():
config = WireguardConfigurations[configuration_name]
status, err = config.updateRawConfigurationFile(raw_configuration)
return ResponseObject(status=status, message=err)
return ResponseObject(status, err)
@app.post(f'{APP_PREFIX}/api/deleteWireguardConfiguration')
def API_deleteWireguardConfiguration():
@@ -573,6 +596,7 @@ def API_deleteWireguardConfiguration():
if not status:
WireguardConfigurations[configuration_name] = rp
return ResponseObject(status)
@app.post(f'{APP_PREFIX}/api/renameWireguardConfiguration')
@@ -611,6 +635,7 @@ def API_renameWireguardConfiguration():
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRealtimeTraffic')
def API_getWireguardConfigurationRealtimeTraffic():
configuration_name = requests.args.get('configurationName')
if not configuration_name or configuration_name not in WireguardConfigurations:
return ResponseObject(False, "Configuration does not exist", status_code=404)
@@ -619,10 +644,13 @@ def API_getWireguardConfigurationRealtimeTraffic():
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationBackup')
def API_getWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
configuration_name = request.args.get('configurationName')
if not configuration_name or configuration_name not in WireguardConfigurations:
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(data=WireguardConfigurations[configurationName].getBackups())
target_configuration = WireguardConfigurations[configuration_name]
return ResponseObject(data=target_configuration.getBackups())
@app.get(f'{APP_PREFIX}/api/getAllWireguardConfigurationBackup')
def API_getAllWireguardConfigurationBackup():
@@ -630,47 +658,86 @@ def API_getAllWireguardConfigurationBackup():
"ExistingConfigurations": {},
"NonExistingConfigurations": {}
}
existingConfiguration = WireguardConfigurations.keys()
for i in existingConfiguration:
b = WireguardConfigurations[i].getBackups(True)
if len(b) > 0:
data['ExistingConfigurations'][i] = WireguardConfigurations[i].getBackups(True)
existing_configurations = WireguardConfigurations.keys()
for single_conf in existing_configurations:
backups = WireguardConfigurations[single_conf].getBackups(True)
if len(backups) > 0:
data['ExistingConfigurations'][single_conf] = WireguardConfigurations[single_conf].getBackups(True)
for protocol in ProtocolsEnabled():
directory = os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup')
if os.path.exists(directory):
files = [(file, os.path.getctime(os.path.join(directory, file)))
for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
files.sort(key=lambda x: x[1], reverse=True)
config_path_info = DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")
configuration_path = config_path_info[1]
backup_directory = os.path.join(configuration_path, 'WGDashboard_Backup')
for f, ct in files:
if RegexMatch(r"^(.*)_(.*)\.(conf)$", f):
s = re.search(r"^(.*)_(.*)\.(conf)$", f)
name = s.group(1)
if name not in existingConfiguration:
if name not in data['NonExistingConfigurations'].keys():
data['NonExistingConfigurations'][name] = []
if not os.path.exists(backup_directory):
continue
backup_files = []
for file_name in os.listdir(backup_directory):
full_file_path = os.path.join(backup_directory, file_name)
if os.path.isfile(full_file_path):
creation_time = os.path.getctime(full_file_path)
backup_files.append((file_name, creation_time))
backup_files.sort(key=lambda file_info: file_info[1], reverse=True)
for file_name, creation_time in backup_files:
pattern = r"^(.*)_(.*)\.conf$"
match_result = re.match(pattern, file_name)
if not match_result:
continue
configuration_name = match_result.group(1)
backup_date = match_result.group(2)
if configuration_name in existing_configurations:
continue
if 'NonExistingConfigurations' not in data:
data['NonExistingConfigurations'] = {}
if configuration_name not in data['NonExistingConfigurations']:
data['NonExistingConfigurations'][configuration_name] = []
configuration_file_path = os.path.join(backup_directory, file_name)
with open(configuration_file_path, 'r') as configuration_file:
configuration_content = configuration_file.read()
backup_data = {
"protocol": protocol,
"filename": file_name,
"backupDate": backup_date,
"content": configuration_content
}
sql_file_name = file_name.replace(".conf", ".sql")
sql_file_path = os.path.join(backup_directory, sql_file_name)
if os.path.isfile(sql_file_path):
with open(sql_file_path, 'r') as sql_file:
sql_content = sql_file.read()
backup_data["database"] = True
backup_data["databaseContent"] = sql_content
data['NonExistingConfigurations'][configuration_name].append(backup_data)
date = s.group(2)
d = {
"protocol": protocol,
"filename": f,
"backupDate": date,
"content": open(os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup', f), 'r').read()
}
if f.replace(".conf", ".sql") in list(os.listdir(directory)):
d['database'] = True
d['databaseContent'] = open(os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read()
data['NonExistingConfigurations'][name].append(d)
return ResponseObject(data=data)
@app.get(f'{APP_PREFIX}/api/createWireguardConfigurationBackup')
def API_createWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
configuration_name = request.args.get('configurationName')
if not configuration_name or configuration_name not in WireguardConfigurations:
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(status=WireguardConfigurations[configurationName].backupConfigurationFile()[0],
data=WireguardConfigurations[configurationName].getBackups())
conf_backup_file = WireguardConfigurations[configuration_name].backupConfigurationFile()[0]
conf_backups = WireguardConfigurations[configuration_name].getBackups()
return ResponseObject(status=conf_backup_file,data=conf_backups)
@app.post(f'{APP_PREFIX}/api/deleteWireguardConfigurationBackup')
def API_deleteWireguardConfigurationBackup():
@@ -702,18 +769,19 @@ def API_downloadWireguardConfigurationBackup():
@app.post(f'{APP_PREFIX}/api/restoreWireguardConfigurationBackup')
def API_restoreWireguardConfigurationBackup():
data = request.get_json()
configuration_name = data['ConfigurationName']
backup_file_name = data['BackupFileName']
if ("ConfigurationName" not in data.keys() or
"BackupFileName" not in data.keys() or
len(data['ConfigurationName']) == 0 or
len(data['BackupFileName']) == 0):
return ResponseObject(False,
"Please provide ConfigurationName and BackupFileName in body", status_code=400)
configurationName = data['ConfigurationName']
backupFileName = data['BackupFileName']
if configurationName not in WireguardConfigurations.keys():
return ResponseObject(False,"Please provide ConfigurationName and BackupFileName in body", status_code=400)
if configuration_name not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
status = WireguardConfigurations[configurationName].restoreBackup(backupFileName)
status = WireguardConfigurations[configuration_name].restoreBackup(backup_file_name)
return ResponseObject(status=status, message=(None if status else 'Restore backup failed'))
@app.get(f'{APP_PREFIX}/api/getDashboardConfiguration')
@@ -1455,31 +1523,6 @@ def API_Welcome_Finish():
DashboardConfig.SetConfig("Other", "welcome_session", False)
return ResponseObject()
class Locale:
def __init__(self):
self.localePath = './static/locales/'
self.activeLanguages = {}
with open(os.path.join(f"{self.localePath}supported_locales.json"), "r") as f:
self.activeLanguages = sorted(json.loads(''.join(f.readlines())), key=lambda x : x['lang_name'])
def getLanguage(self) -> dict | None:
currentLanguage = DashboardConfig.GetConfig("Server", "dashboard_language")[1]
if currentLanguage == "en":
return None
if os.path.exists(os.path.join(f"{self.localePath}{currentLanguage}.json")):
with open(os.path.join(f"{self.localePath}{currentLanguage}.json"), "r") as f:
return dict(json.loads(''.join(f.readlines())))
else:
return None
def updateLanguage(self, lang_id):
if not os.path.exists(os.path.join(f"{self.localePath}{lang_id}.json")):
DashboardConfig.SetConfig("Server", "dashboard_language", "en-US")
else:
DashboardConfig.SetConfig("Server", "dashboard_language", lang_id)
Locale = Locale()
@app.get(f'{APP_PREFIX}/api/locale')
def API_Locale_CurrentLang():
return ResponseObject(data=Locale.getLanguage())
@@ -1564,6 +1607,31 @@ def API_SystemStatus():
def API_ProtocolsEnabled():
return ResponseObject(data=ProtocolsEnabled())
class Locale:
def __init__(self):
self.localePath = './static/locales/'
self.activeLanguages = {}
with open(os.path.join(f"{self.localePath}supported_locales.json"), "r") as f:
self.activeLanguages = sorted(json.loads(''.join(f.readlines())), key=lambda x : x['lang_name'])
def getLanguage(self) -> dict | None:
currentLanguage = DashboardConfig.GetConfig("Server", "dashboard_language")[1]
if currentLanguage == "en":
return None
if os.path.exists(os.path.join(f"{self.localePath}{currentLanguage}.json")):
with open(os.path.join(f"{self.localePath}{currentLanguage}.json"), "r") as f:
return dict(json.loads(''.join(f.readlines())))
else:
return None
def updateLanguage(self, lang_id):
if not os.path.exists(os.path.join(f"{self.localePath}{lang_id}.json")):
DashboardConfig.SetConfig("Server", "dashboard_language", "en-US")
else:
DashboardConfig.SetConfig("Server", "dashboard_language", lang_id)
Locale = Locale()
'''
OIDC Controller
'''