diff --git a/.gitignore b/.gitignore index 1b1e0d2c..aa9f10cd 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ node_modules/** */proxy.js src/static/app/proxy.js .secrets +*.pid # Logs logs diff --git a/src/dashboard.py b/src/dashboard.py index 223e9d5f..fac8634c 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -1,27 +1,45 @@ +# --- Standard library imports --- +import configparser +import hashlib +import ipaddress +import json import logging -import random, shutil, sqlite3, configparser, hashlib, ipaddress, json, os, secrets, subprocess -import time, re, uuid, bcrypt, psutil, pyotp, threading +import os +import random +import re +import secrets +import shutil +import sqlite3 +import subprocess +import threading +import time import traceback +import uuid +from datetime import datetime, timedelta +from itertools import islice from uuid import uuid4 from zipfile import ZipFile -from datetime import datetime, timedelta +# --- Third-party imports --- +import bcrypt +import psutil +import pyotp import sqlalchemy -from jinja2 import Template -from flask import Flask, request, render_template, session, send_file +from flask import Flask, request, render_template, session, send_file, Response from flask_cors import CORS -from icmplib import ping, traceroute from flask.json.provider import DefaultJSONProvider -from itertools import islice - +from icmplib import ping, traceroute +from jinja2 import Template +from packaging import version from sqlalchemy import RowMapping +# --- Local module imports --- +from client import createClientBlueprint from modules.Utilities import ( RegexMatch, StringToBoolean, ValidateIPAddressesWithRange, ValidateDNSAddress, GenerateWireguardPublicKey, GenerateWireguardPrivateKey ) -from packaging import version from modules.Email import EmailSender from modules.DashboardLogger import DashboardLogger from modules.PeerJob import PeerJob @@ -31,16 +49,15 @@ from modules.PeerJobs import PeerJobs from modules.DashboardConfig import DashboardConfig from modules.WireguardConfiguration import WireguardConfiguration from modules.AmneziaWireguardConfiguration import AmneziaWireguardConfiguration - -from client import createClientBlueprint - -from logging.config import dictConfig - from modules.DashboardClients import DashboardClients from modules.DashboardPlugins import DashboardPlugins from modules.DashboardWebHooks import DashboardWebHooks from modules.NewConfigurationTemplates import NewConfigurationTemplates +# --- Logging configuration --- +from logging.config import dictConfig + + class CustomJsonEncoder(DefaultJSONProvider): def __init__(self, app): super().__init__(app) @@ -195,12 +212,12 @@ with app.app_context(): EmailSender = EmailSender(DashboardConfig) AllPeerShareLinks: PeerShareLinks = PeerShareLinks(DashboardConfig, WireguardConfigurations) AllPeerJobs: PeerJobs = PeerJobs(DashboardConfig, WireguardConfigurations) - DashboardLogger: DashboardLogger = DashboardLogger() - DashboardPlugins: DashboardPlugins = DashboardPlugins(app, WireguardConfigurations) - DashboardWebHooks: DashboardWebHooks = DashboardWebHooks(DashboardConfig) - NewConfigurationTemplates: NewConfigurationTemplates = NewConfigurationTemplates() + DashboardLogger = DashboardLogger() + DashboardPlugins = DashboardPlugins(app, WireguardConfigurations) + DashboardWebHooks = DashboardWebHooks(DashboardConfig) + NewConfigurationTemplates = NewConfigurationTemplates() InitWireguardConfigurationsList(startup=True) - DashboardClients: DashboardClients = DashboardClients(WireguardConfigurations) + DashboardClients = DashboardClients(WireguardConfigurations) app.register_blueprint(createClientBlueprint(WireguardConfigurations, DashboardConfig, DashboardClients)) _, APP_PREFIX = DashboardConfig.GetConfig("Server", "app_prefix") @@ -213,32 +230,103 @@ _, app_ip = DashboardConfig.GetConfig("Server", "app_ip") _, app_port = DashboardConfig.GetConfig("Server", "app_port") _, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path") + ''' API Routes ''' +def _enforce_session_auth(): + """Enforce session authentication for non-API key access.""" + white_list = [ + '/static/', + 'validateAuthentication', + 'authenticate', + 'getDashboardConfiguration', + 'getDashboardTheme', + 'getDashboardVersion', + 'sharePeer/get', + 'isTotpEnabled', + 'locale', + '/fileDownload', + '/client' + ] + + path_ok = ( + ("username" in session and session.get("role") == "admin") + or (f"{APP_PREFIX}/" == request.path or f"{APP_PREFIX}" == request.path) + or not all(sub not in request.path for sub in white_list) + ) + + if not path_ok: + response = Flask.make_response(app, { + "status": False, + "message": "Unauthorized access.", + "data": None + }) + response.content_type = "application/json" + response.status_code = 401 + return response + + +def _login_with_token(key): + auth_token = hashlib.sha256(f"{key}{datetime.now()}".encode()).hexdigest() + session.update({'role': 'admin', 'username': auth_token}) + session.permanent = True + resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1]) + resp.set_cookie("authToken", auth_token) + return resp + + +def _login_with_credentials(data): + username = data.get('username') + password = data.get('password') + totp_code = data.get('totp') + + valid_password = bcrypt.checkpw(password.encode("utf-8"), + DashboardConfig.GetConfig("Account", "password")[1].encode("utf-8")) + totp_enabled = DashboardConfig.GetConfig("Account", "enable_totp")[1] + totp_valid = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now() == totp_code if totp_enabled else True + + if username == DashboardConfig.GetConfig("Account", "username")[1] and valid_password and totp_valid: + auth_token = hashlib.sha256(f"{username}{datetime.now()}".encode()).hexdigest() + session.update({'role': 'admin', 'username': auth_token}) + session.permanent = True + DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login success: {username}") + resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1]) + resp.set_cookie("authToken", auth_token) + return resp + + DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login failed: {username}") + msg = "Sorry, your username, password or OTP is incorrect." if totp_enabled else "Sorry, your username or password is incorrect." + return ResponseObject(False, msg) + + @app.before_request def auth_req(): + # Skip preflight requests if request.method.lower() == 'options': - return ResponseObject(True) + return ResponseObject(True) DashboardConfig.APIAccessed = False + + # Logging if "api" in request.path: - if str(request.method) == "GET": - DashboardLogger.log(str(request.url), str(request.remote_addr), Message=str(request.args)) - elif str(request.method) == "POST": - DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Request Args: {str(request.args)} Body:{str(request.get_json())}") - - - authenticationRequired = DashboardConfig.GetConfig("Server", "auth_req")[1] - d = request.headers - if authenticationRequired: - apiKey = d.get('wg-dashboard-apikey') - apiKeyEnabled = DashboardConfig.GetConfig("Server", "dashboard_api_key")[1] - if apiKey is not None and len(apiKey) > 0 and apiKeyEnabled: - apiKeyExist = len(list(filter(lambda x : x.Key == apiKey, DashboardConfig.DashboardAPIKeys))) == 1 - DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"API Key Access: {('true' if apiKeyExist else 'false')} - Key: {apiKey}") - if not apiKeyExist: + log_message = str(request.args) if request.method.upper() == "GET" else f"Request Args: {str(request.args)} Body:{str(request.get_json())}" + DashboardLogger.log(str(request.url), str(request.remote_addr), Message=log_message) + + authentication_required = DashboardConfig.GetConfig("Server", "auth_req")[1] + headers = request.headers + + if authentication_required: + api_key = headers.get('wg-dashboard-apikey') + api_key_enabled = DashboardConfig.GetConfig("Server", "dashboard_api_key")[1] + + # API key authentication + if api_key and api_key_enabled: + api_key_exists = any(k.Key == api_key for k in DashboardConfig.DashboardAPIKeys) + DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"API Key Access: {api_key_exists} - Key: {api_key}") + + if not api_key_exists: DashboardConfig.APIAccessed = False response = Flask.make_response(app, { "status": False, @@ -248,84 +336,48 @@ def auth_req(): response.content_type = "application/json" response.status_code = 401 return response + DashboardConfig.APIAccessed = True else: DashboardConfig.APIAccessed = False - whiteList = [ - '/static/', 'validateAuthentication', 'authenticate', 'getDashboardConfiguration', - 'getDashboardTheme', 'getDashboardVersion', 'sharePeer/get', 'isTotpEnabled', 'locale', - '/fileDownload', - '/client' - ] - - if (("username" not in session or session.get("role") != "admin") - and (f"{(APP_PREFIX if len(APP_PREFIX) > 0 else '')}/" != request.path - and f"{(APP_PREFIX if len(APP_PREFIX) > 0 else '')}" != request.path) - and len(list(filter(lambda x : x not in request.path, whiteList))) == len(whiteList) - ): - response = Flask.make_response(app, { - "status": False, - "message": "Unauthorized access.", - "data": None - }) - response.content_type = "application/json" - response.status_code = 401 - return response + _enforce_session_auth() + @app.route(f'{APP_PREFIX}/api/handshake', methods=["GET", "OPTIONS"]) def API_Handshake(): return ResponseObject(True) + @app.get(f'{APP_PREFIX}/api/validateAuthentication') def API_ValidateAuthentication(): token = request.cookies.get("authToken") - if DashboardConfig.GetConfig("Server", "auth_req")[1]: - if token is None or token == "" or "username" not in session or session["username"] != token: - return ResponseObject(False, "Invalid authentication.") + auth_required = DashboardConfig.GetConfig("Server", "auth_req")[1] + + if auth_required and (not token or "username" not in session or session["username"] != token): + return ResponseObject(False, "Invalid authentication.") + return ResponseObject(True) + @app.get(f'{APP_PREFIX}/api/requireAuthentication') def API_RequireAuthentication(): return ResponseObject(data=DashboardConfig.GetConfig("Server", "auth_req")[1]) + @app.post(f'{APP_PREFIX}/api/authenticate') def API_AuthenticateLogin(): data = request.get_json() + if not DashboardConfig.GetConfig("Server", "auth_req")[1]: return ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1]) - - if DashboardConfig.APIAccessed: - authToken = hashlib.sha256(f"{request.headers.get('wg-dashboard-apikey')}{datetime.now()}".encode()).hexdigest() - session['role'] = 'admin' - session['username'] = authToken - resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1]) - resp.set_cookie("authToken", authToken) - session.permanent = True - return resp - valid = bcrypt.checkpw(data['password'].encode("utf-8"), - DashboardConfig.GetConfig("Account", "password")[1].encode("utf-8")) - totpEnabled = DashboardConfig.GetConfig("Account", "enable_totp")[1] - totpValid = False - if totpEnabled: - totpValid = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now() == data['totp'] - if (valid - and data['username'] == DashboardConfig.GetConfig("Account", "username")[1] - and ((totpEnabled and totpValid) or not totpEnabled) - ): - authToken = hashlib.sha256(f"{data['username']}{datetime.now()}".encode()).hexdigest() - session['role'] = 'admin' - session['username'] = authToken - resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1]) - resp.set_cookie("authToken", authToken) - session.permanent = True - DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login success: {data['username']}") - return resp - DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login failed: {data['username']}") - if totpEnabled: - return ResponseObject(False, "Sorry, your username, password or OTP is incorrect.") - else: - return ResponseObject(False, "Sorry, your username or password is incorrect.") + # API key login + if DashboardConfig.APIAccessed: + return _login_with_token(request.headers.get('wg-dashboard-apikey')) + + # User login + return _login_with_credentials(data) + @app.get(f'{APP_PREFIX}/api/signout') def API_SignOut(): @@ -334,202 +386,236 @@ def API_SignOut(): session.clear() return resp + @app.get(f'{APP_PREFIX}/api/getWireguardConfigurations') def API_getWireguardConfigurations(): InitWireguardConfigurationsList() - return ResponseObject(data=[wc for wc in WireguardConfigurations.values()]) + return ResponseObject(data=list(WireguardConfigurations.values())) + @app.get(f'{APP_PREFIX}/api/newConfigurationTemplates') def API_NewConfigurationTemplates(): return ResponseObject(data=NewConfigurationTemplates.GetTemplates()) + @app.get(f'{APP_PREFIX}/api/newConfigurationTemplates/createTemplate') def API_NewConfigurationTemplates_CreateTemplate(): return ResponseObject(data=NewConfigurationTemplates.CreateTemplate().model_dump()) + @app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/updateTemplate') def API_NewConfigurationTemplates_UpdateTemplate(): data = request.get_json() - template = data.get('Template', None) + template = data.get('Template') if not template: return ResponseObject(False, "Please provide template") - status, msg = NewConfigurationTemplates.UpdateTemplate(template) return ResponseObject(status, msg) + @app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/deleteTemplate') def API_NewConfigurationTemplates_DeleteTemplate(): data = request.get_json() - template = data.get('Template', None) + template = data.get('Template') if not template: return ResponseObject(False, "Please provide template") - status, msg = NewConfigurationTemplates.DeleteTemplate(template) return ResponseObject(status, msg) + @app.post(f'{APP_PREFIX}/api/addWireguardConfiguration') def API_addWireguardConfiguration(): data = request.get_json() - requiredKeys = [ - "ConfigurationName", "Address", "ListenPort", "PrivateKey", "Protocol" - ] - for i in requiredKeys: - if i not in data.keys(): - return ResponseObject(False, "Please provide all required parameters.") - - if data.get("Protocol") not in ProtocolsEnabled(): + + required_keys = {"ConfigurationName", "Address", "ListenPort", "PrivateKey", "Protocol"} + if not required_keys.issubset(data.keys()): + return ResponseObject(False, "Please provide all required parameters.") + + protocol = data.get("Protocol") + if protocol not in ProtocolsEnabled(): return ResponseObject(False, "Please provide a valid protocol: wg / awg.") - # Check duplicate names, ports, address - for i in WireguardConfigurations.values(): - if i.Name == data['ConfigurationName']: - return ResponseObject(False, - f"Already have a configuration with the name \"{data['ConfigurationName']}\"", - "ConfigurationName") + for cfg in WireguardConfigurations.values(): + duplicates = { + "ConfigurationName": cfg.Name == data['ConfigurationName'], + "ListenPort": str(cfg.ListenPort) == str(data["ListenPort"]), + "Address": cfg.Address == data["Address"] + } + for key, is_duplicate in duplicates.items(): + if is_duplicate: + return ResponseObject(False, + f"Already have a configuration with the {key.lower()} \"{data[key]}\"", + key) - if str(i.ListenPort) == str(data["ListenPort"]): - return ResponseObject(False, - f"Already have a configuration with the port \"{data['ListenPort']}\"", - "ListenPort") - - if i.Address == data["Address"]: - return ResponseObject(False, - f"Already have a configuration with the address \"{data['Address']}\"", - "Address") - - if "Backup" in data.keys(): - path = { + if "Backup" in data: + paths = { "wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1], "awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1] } - - if (os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"])) and - os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))): - protocol = "wg" - elif (os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"])) and - os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))): - protocol = "awg" - else: + + backup_file = data["Backup"] + protocol_detected = None + for proto, base_path in paths.items(): + conf_path = os.path.join(base_path, 'WGDashboard_Backup', backup_file) + sql_path = os.path.join(base_path, 'WGDashboard_Backup', backup_file.replace('.conf', '.sql')) + if os.path.exists(conf_path) and os.path.exists(sql_path): + protocol_detected = proto + break + + if not protocol_detected: return ResponseObject(False, "Backup does not exist") - + shutil.copy( - os.path.join(path[protocol], 'WGDashboard_Backup', data["Backup"]), - os.path.join(path[protocol], f'{data["ConfigurationName"]}.conf') + os.path.join(paths[protocol_detected], 'WGDashboard_Backup', backup_file), + os.path.join(paths[protocol_detected], f'{data["ConfigurationName"]}.conf') ) - WireguardConfigurations[data['ConfigurationName']] = ( - WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, data=data, name=data['ConfigurationName'])) if protocol == 'wg' else ( - AmneziaWireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data, name=data['ConfigurationName'])) + protocol = protocol_detected # Use backup protocol for object creation + + if protocol == "wg": + ConfigClass = WireguardConfiguration else: - WireguardConfigurations[data['ConfigurationName']] = ( - WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data)) if data.get('Protocol') == 'wg' else ( - AmneziaWireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data)) + ConfigClass = AmneziaWireguardConfiguration + + WireguardConfigurations[data['ConfigurationName']] = ConfigClass( + DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data, name=data['ConfigurationName'] + ) + return ResponseObject() + @app.get(f'{APP_PREFIX}/api/toggleWireguardConfiguration') def API_toggleWireguardConfiguration(): - configurationName = request.args.get('configurationName') - if configurationName is None or len( - configurationName) == 0 or configurationName not in WireguardConfigurations.keys(): + configuration_name = request.args.get('configurationName') + if not configuration_name or configuration_name not in WireguardConfigurations: return ResponseObject(False, "Please provide a valid configuration name", status_code=404) - toggleStatus, msg = WireguardConfigurations[configurationName].toggleConfiguration() - return ResponseObject(toggleStatus, msg, WireguardConfigurations[configurationName].Status) + + target_configuration = WireguardConfigurations[configuration_name] + status, msg = target_configuration.toggleConfiguration() + + return ResponseObject(status, msg, target_configuration.Status) + @app.post(f'{APP_PREFIX}/api/updateWireguardConfiguration') def API_updateWireguardConfiguration(): - data = request.get_json() - requiredKeys = ["Name"] - for i in requiredKeys: - if i not in data.keys(): - return ResponseObject(False, "Please provide these following field: " + ", ".join(requiredKeys)) + data = request.get_json() or {} name = data.get("Name") - if name not in WireguardConfigurations.keys(): - return ResponseObject(False, "Configuration does not exist", status_code=404) - status, msg = WireguardConfigurations[name].updateConfigurationSettings(data) + if not name: + return ResponseObject(False, "Please provide the field: Name") - return ResponseObject(status, message=msg, data=WireguardConfigurations[name]) + config = _get_wireguard_config(name) + if isinstance(config, ResponseObject): + return config # Return 404 if config not found + + status, msg = config.updateConfigurationSettings(data) + return ResponseObject(status, message=msg, data=config) + @app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationInfo') def API_updateWireguardConfigurationInfo(): - data = request.get_json() + data = request.get_json() or {} name = data.get('Name') key = data.get('Key') value = data.get('Value') - if not all([data, key, name]): - return ResponseObject(status=False, message="Please provide configuration name, key and value") - if name not in WireguardConfigurations.keys(): - return ResponseObject(False, "Configuration does not exist", status_code=404) - - status, msg, key = WireguardConfigurations[name].updateConfigurationInfo(key, value) - - return ResponseObject(status=status, message=msg, data=key) + + if not all([name, key, value]): + return ResponseObject(False, "Please provide configuration name, key, and value") + + config = _get_wireguard_config(name) + if isinstance(config, ResponseObject): + return config # Return 404 if config not found + + status, msg, updated_key = config.updateConfigurationInfo(key, value) + return ResponseObject(status=status, message=msg, data=updated_key) + @app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRawFile') -def API_GetWireguardConfigurationRawFile(): - configurationName = request.args.get('configurationName') - if configurationName is None or len( - configurationName) == 0 or configurationName not in WireguardConfigurations.keys(): +def API_getWireguardConfigurationRawFile(): + configuration_name = request.args.get('configurationName') + + if not configuration_name or configuration_name not in WireguardConfigurations: return ResponseObject(False, "Please provide a valid configuration name", status_code=404) + config = WireguardConfigurations[configuration_name] + return ResponseObject(data={ - "path": WireguardConfigurations[configurationName].configPath, - "content": WireguardConfigurations[configurationName].getRawConfigurationFile() + "path": config.configPath, + "content": config.getRawConfigurationFile() }) @app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationRawFile') def API_UpdateWireguardConfigurationRawFile(): - data = request.get_json() - configurationName = data.get('configurationName') - rawConfiguration = data.get('rawConfiguration') - if configurationName is None or len( - configurationName) == 0 or configurationName not in WireguardConfigurations.keys(): + data = request.get_json() or {} + configuration_name = data.get('configurationName') + raw_configuration = data.get('rawConfiguration') + + if not configuration_name or configuration_name not in WireguardConfigurations: return ResponseObject(False, "Please provide a valid configuration name") - if rawConfiguration is None or len(rawConfiguration) == 0: + + if not raw_configuration: return ResponseObject(False, "Please provide content") - status, err = WireguardConfigurations[configurationName].updateRawConfigurationFile(rawConfiguration) + config = WireguardConfigurations[configuration_name] + status, err = config.updateRawConfigurationFile(raw_configuration) return ResponseObject(status=status, message=err) @app.post(f'{APP_PREFIX}/api/deleteWireguardConfiguration') def API_deleteWireguardConfiguration(): - data = request.get_json() - if "ConfigurationName" not in data.keys() or data.get("ConfigurationName") is None or data.get("ConfigurationName") not in WireguardConfigurations.keys(): + data = request.get_json() or {} + configuration_name = data.get("ConfigurationName") + + if not configuration_name or configuration_name not in WireguardConfigurations: return ResponseObject(False, "Please provide the configuration name you want to delete", status_code=404) - rp = WireguardConfigurations.pop(data.get("ConfigurationName")) + rp = WireguardConfigurations.pop(configuration_name) status = rp.deleteConfiguration() + if not status: - WireguardConfigurations[data.get("ConfigurationName")] = rp + WireguardConfigurations[configuration_name] = rp return ResponseObject(status) @app.post(f'{APP_PREFIX}/api/renameWireguardConfiguration') def API_renameWireguardConfiguration(): - data = request.get_json() - keys = ["ConfigurationName", "NewConfigurationName"] - for k in keys: - if (k not in data.keys() or data.get(k) is None or len(data.get(k)) == 0 or - (k == "ConfigurationName" and data.get(k) not in WireguardConfigurations.keys())): - return ResponseObject(False, "Please provide the configuration name you want to rename", status_code=404) + data = request.get_json() or {} + + old_name = data.get("ConfigurationName") + new_name = data.get("NewConfigurationName") + + if not old_name or old_name not in WireguardConfigurations: + return ResponseObject(False, "Please provide a valid configuration name to rename", status_code=404) - if data.get("NewConfigurationName") in WireguardConfigurations.keys(): - return ResponseObject(False, "Configuration name already exist", status_code=400) + if not new_name: + return ResponseObject(False, "Please provide a new configuration name", status=400) - rc = WireguardConfigurations.pop(data.get("ConfigurationName")) + if new_name in WireguardConfigurations: + return ResponseObject(False, "The configuration name already exists", status_code=400) - status, message = rc.renameConfiguration(data.get("NewConfigurationName")) + rc = WireguardConfigurations.pop(old_name) + status, message = rc.renameConfiguration(new_name) + if status: - WireguardConfigurations[data.get("NewConfigurationName")] = (WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data.get("NewConfigurationName")) if rc.Protocol == 'wg' else AmneziaWireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data.get("NewConfigurationName"))) + if rc.Protocol == 'wg': + ConfigClass = WireguardConfiguration + else: + ConfigClass = AmneziaWireguardConfiguration + + WireguardConfigurations[new_name] = ConfigClass( + DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, new_name + ) else: - WireguardConfigurations[data.get("ConfigurationName")] = rc + WireguardConfigurations[old_name] = rc + return ResponseObject(status, message) @app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRealtimeTraffic') def API_getWireguardConfigurationRealtimeTraffic(): - configurationName = request.args.get('configurationName') - if configurationName is None or configurationName not in WireguardConfigurations.keys(): + configuration_name = requests.args.get('configurationName') + if not configuration_name or configuration_name not in WireguardConfigurations: return ResponseObject(False, "Configuration does not exist", status_code=404) - return ResponseObject(data=WireguardConfigurations[configurationName].getRealtimeTrafficUsage()) + + rt_traffic_usage = WireguardConfigurations[configuration_name] + return ResponseObject(data=rt_traffic_usage) @app.get(f'{APP_PREFIX}/api/getWireguardConfigurationBackup') def API_getWireguardConfigurationBackup():