refac: rewrite first part of dashboard.py

This commit is contained in:
Daan Selen
2025-09-20 00:10:19 +02:00
parent 101ac5e985
commit 6bebe89202
2 changed files with 280 additions and 193 deletions

1
.gitignore vendored
View File

@@ -19,6 +19,7 @@ node_modules/**
*/proxy.js */proxy.js
src/static/app/proxy.js src/static/app/proxy.js
.secrets .secrets
*.pid
# Logs # Logs
logs logs

View File

@@ -1,27 +1,45 @@
# --- Standard library imports ---
import configparser
import hashlib
import ipaddress
import json
import logging import logging
import random, shutil, sqlite3, configparser, hashlib, ipaddress, json, os, secrets, subprocess import os
import time, re, uuid, bcrypt, psutil, pyotp, threading import random
import re
import secrets
import shutil
import sqlite3
import subprocess
import threading
import time
import traceback import traceback
import uuid
from datetime import datetime, timedelta
from itertools import islice
from uuid import uuid4 from uuid import uuid4
from zipfile import ZipFile from zipfile import ZipFile
from datetime import datetime, timedelta
# --- Third-party imports ---
import bcrypt
import psutil
import pyotp
import sqlalchemy import sqlalchemy
from jinja2 import Template from flask import Flask, request, render_template, session, send_file, Response
from flask import Flask, request, render_template, session, send_file
from flask_cors import CORS from flask_cors import CORS
from icmplib import ping, traceroute
from flask.json.provider import DefaultJSONProvider from flask.json.provider import DefaultJSONProvider
from itertools import islice from icmplib import ping, traceroute
from jinja2 import Template
from packaging import version
from sqlalchemy import RowMapping from sqlalchemy import RowMapping
# --- Local module imports ---
from client import createClientBlueprint
from modules.Utilities import ( from modules.Utilities import (
RegexMatch, StringToBoolean, RegexMatch, StringToBoolean,
ValidateIPAddressesWithRange, ValidateDNSAddress, ValidateIPAddressesWithRange, ValidateDNSAddress,
GenerateWireguardPublicKey, GenerateWireguardPrivateKey GenerateWireguardPublicKey, GenerateWireguardPrivateKey
) )
from packaging import version
from modules.Email import EmailSender from modules.Email import EmailSender
from modules.DashboardLogger import DashboardLogger from modules.DashboardLogger import DashboardLogger
from modules.PeerJob import PeerJob from modules.PeerJob import PeerJob
@@ -31,16 +49,15 @@ from modules.PeerJobs import PeerJobs
from modules.DashboardConfig import DashboardConfig from modules.DashboardConfig import DashboardConfig
from modules.WireguardConfiguration import WireguardConfiguration from modules.WireguardConfiguration import WireguardConfiguration
from modules.AmneziaWireguardConfiguration import AmneziaWireguardConfiguration from modules.AmneziaWireguardConfiguration import AmneziaWireguardConfiguration
from client import createClientBlueprint
from logging.config import dictConfig
from modules.DashboardClients import DashboardClients from modules.DashboardClients import DashboardClients
from modules.DashboardPlugins import DashboardPlugins from modules.DashboardPlugins import DashboardPlugins
from modules.DashboardWebHooks import DashboardWebHooks from modules.DashboardWebHooks import DashboardWebHooks
from modules.NewConfigurationTemplates import NewConfigurationTemplates from modules.NewConfigurationTemplates import NewConfigurationTemplates
# --- Logging configuration ---
from logging.config import dictConfig
class CustomJsonEncoder(DefaultJSONProvider): class CustomJsonEncoder(DefaultJSONProvider):
def __init__(self, app): def __init__(self, app):
super().__init__(app) super().__init__(app)
@@ -195,12 +212,12 @@ with app.app_context():
EmailSender = EmailSender(DashboardConfig) EmailSender = EmailSender(DashboardConfig)
AllPeerShareLinks: PeerShareLinks = PeerShareLinks(DashboardConfig, WireguardConfigurations) AllPeerShareLinks: PeerShareLinks = PeerShareLinks(DashboardConfig, WireguardConfigurations)
AllPeerJobs: PeerJobs = PeerJobs(DashboardConfig, WireguardConfigurations) AllPeerJobs: PeerJobs = PeerJobs(DashboardConfig, WireguardConfigurations)
DashboardLogger: DashboardLogger = DashboardLogger() DashboardLogger = DashboardLogger()
DashboardPlugins: DashboardPlugins = DashboardPlugins(app, WireguardConfigurations) DashboardPlugins = DashboardPlugins(app, WireguardConfigurations)
DashboardWebHooks: DashboardWebHooks = DashboardWebHooks(DashboardConfig) DashboardWebHooks = DashboardWebHooks(DashboardConfig)
NewConfigurationTemplates: NewConfigurationTemplates = NewConfigurationTemplates() NewConfigurationTemplates = NewConfigurationTemplates()
InitWireguardConfigurationsList(startup=True) InitWireguardConfigurationsList(startup=True)
DashboardClients: DashboardClients = DashboardClients(WireguardConfigurations) DashboardClients = DashboardClients(WireguardConfigurations)
app.register_blueprint(createClientBlueprint(WireguardConfigurations, DashboardConfig, DashboardClients)) app.register_blueprint(createClientBlueprint(WireguardConfigurations, DashboardConfig, DashboardClients))
_, APP_PREFIX = DashboardConfig.GetConfig("Server", "app_prefix") _, APP_PREFIX = DashboardConfig.GetConfig("Server", "app_prefix")
@@ -213,56 +230,34 @@ _, app_ip = DashboardConfig.GetConfig("Server", "app_ip")
_, app_port = DashboardConfig.GetConfig("Server", "app_port") _, app_port = DashboardConfig.GetConfig("Server", "app_port")
_, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path") _, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path")
''' '''
API Routes API Routes
''' '''
@app.before_request def _enforce_session_auth():
def auth_req(): """Enforce session authentication for non-API key access."""
if request.method.lower() == 'options': white_list = [
return ResponseObject(True) '/static/',
'validateAuthentication',
DashboardConfig.APIAccessed = False 'authenticate',
if "api" in request.path: 'getDashboardConfiguration',
if str(request.method) == "GET": 'getDashboardTheme',
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=str(request.args)) 'getDashboardVersion',
elif str(request.method) == "POST": 'sharePeer/get',
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Request Args: {str(request.args)} Body:{str(request.get_json())}") 'isTotpEnabled',
'locale',
authenticationRequired = DashboardConfig.GetConfig("Server", "auth_req")[1]
d = request.headers
if authenticationRequired:
apiKey = d.get('wg-dashboard-apikey')
apiKeyEnabled = DashboardConfig.GetConfig("Server", "dashboard_api_key")[1]
if apiKey is not None and len(apiKey) > 0 and apiKeyEnabled:
apiKeyExist = len(list(filter(lambda x : x.Key == apiKey, DashboardConfig.DashboardAPIKeys))) == 1
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"API Key Access: {('true' if apiKeyExist else 'false')} - Key: {apiKey}")
if not apiKeyExist:
DashboardConfig.APIAccessed = False
response = Flask.make_response(app, {
"status": False,
"message": "API Key does not exist",
"data": None
})
response.content_type = "application/json"
response.status_code = 401
return response
DashboardConfig.APIAccessed = True
else:
DashboardConfig.APIAccessed = False
whiteList = [
'/static/', 'validateAuthentication', 'authenticate', 'getDashboardConfiguration',
'getDashboardTheme', 'getDashboardVersion', 'sharePeer/get', 'isTotpEnabled', 'locale',
'/fileDownload', '/fileDownload',
'/client' '/client'
] ]
if (("username" not in session or session.get("role") != "admin") path_ok = (
and (f"{(APP_PREFIX if len(APP_PREFIX) > 0 else '')}/" != request.path ("username" in session and session.get("role") == "admin")
and f"{(APP_PREFIX if len(APP_PREFIX) > 0 else '')}" != request.path) or (f"{APP_PREFIX}/" == request.path or f"{APP_PREFIX}" == request.path)
and len(list(filter(lambda x : x not in request.path, whiteList))) == len(whiteList) or not all(sub not in request.path for sub in white_list)
): )
if not path_ok:
response = Flask.make_response(app, { response = Flask.make_response(app, {
"status": False, "status": False,
"message": "Unauthorized access.", "message": "Unauthorized access.",
@@ -272,60 +267,117 @@ def auth_req():
response.status_code = 401 response.status_code = 401
return response return response
def _login_with_token(key):
auth_token = hashlib.sha256(f"{key}{datetime.now()}".encode()).hexdigest()
session.update({'role': 'admin', 'username': auth_token})
session.permanent = True
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
resp.set_cookie("authToken", auth_token)
return resp
def _login_with_credentials(data):
username = data.get('username')
password = data.get('password')
totp_code = data.get('totp')
valid_password = bcrypt.checkpw(password.encode("utf-8"),
DashboardConfig.GetConfig("Account", "password")[1].encode("utf-8"))
totp_enabled = DashboardConfig.GetConfig("Account", "enable_totp")[1]
totp_valid = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now() == totp_code if totp_enabled else True
if username == DashboardConfig.GetConfig("Account", "username")[1] and valid_password and totp_valid:
auth_token = hashlib.sha256(f"{username}{datetime.now()}".encode()).hexdigest()
session.update({'role': 'admin', 'username': auth_token})
session.permanent = True
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login success: {username}")
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
resp.set_cookie("authToken", auth_token)
return resp
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login failed: {username}")
msg = "Sorry, your username, password or OTP is incorrect." if totp_enabled else "Sorry, your username or password is incorrect."
return ResponseObject(False, msg)
@app.before_request
def auth_req():
# Skip preflight requests
if request.method.lower() == 'options':
return ResponseObject(True)
DashboardConfig.APIAccessed = False
# Logging
if "api" in request.path:
log_message = str(request.args) if request.method.upper() == "GET" else f"Request Args: {str(request.args)} Body:{str(request.get_json())}"
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=log_message)
authentication_required = DashboardConfig.GetConfig("Server", "auth_req")[1]
headers = request.headers
if authentication_required:
api_key = headers.get('wg-dashboard-apikey')
api_key_enabled = DashboardConfig.GetConfig("Server", "dashboard_api_key")[1]
# API key authentication
if api_key and api_key_enabled:
api_key_exists = any(k.Key == api_key for k in DashboardConfig.DashboardAPIKeys)
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"API Key Access: {api_key_exists} - Key: {api_key}")
if not api_key_exists:
DashboardConfig.APIAccessed = False
response = Flask.make_response(app, {
"status": False,
"message": "API Key does not exist",
"data": None
})
response.content_type = "application/json"
response.status_code = 401
return response
DashboardConfig.APIAccessed = True
else:
DashboardConfig.APIAccessed = False
_enforce_session_auth()
@app.route(f'{APP_PREFIX}/api/handshake', methods=["GET", "OPTIONS"]) @app.route(f'{APP_PREFIX}/api/handshake', methods=["GET", "OPTIONS"])
def API_Handshake(): def API_Handshake():
return ResponseObject(True) return ResponseObject(True)
@app.get(f'{APP_PREFIX}/api/validateAuthentication') @app.get(f'{APP_PREFIX}/api/validateAuthentication')
def API_ValidateAuthentication(): def API_ValidateAuthentication():
token = request.cookies.get("authToken") token = request.cookies.get("authToken")
if DashboardConfig.GetConfig("Server", "auth_req")[1]: auth_required = DashboardConfig.GetConfig("Server", "auth_req")[1]
if token is None or token == "" or "username" not in session or session["username"] != token:
if auth_required and (not token or "username" not in session or session["username"] != token):
return ResponseObject(False, "Invalid authentication.") return ResponseObject(False, "Invalid authentication.")
return ResponseObject(True) return ResponseObject(True)
@app.get(f'{APP_PREFIX}/api/requireAuthentication') @app.get(f'{APP_PREFIX}/api/requireAuthentication')
def API_RequireAuthentication(): def API_RequireAuthentication():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "auth_req")[1]) return ResponseObject(data=DashboardConfig.GetConfig("Server", "auth_req")[1])
@app.post(f'{APP_PREFIX}/api/authenticate') @app.post(f'{APP_PREFIX}/api/authenticate')
def API_AuthenticateLogin(): def API_AuthenticateLogin():
data = request.get_json() data = request.get_json()
if not DashboardConfig.GetConfig("Server", "auth_req")[1]: if not DashboardConfig.GetConfig("Server", "auth_req")[1]:
return ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1]) return ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
# API key login
if DashboardConfig.APIAccessed: if DashboardConfig.APIAccessed:
authToken = hashlib.sha256(f"{request.headers.get('wg-dashboard-apikey')}{datetime.now()}".encode()).hexdigest() return _login_with_token(request.headers.get('wg-dashboard-apikey'))
session['role'] = 'admin'
session['username'] = authToken # User login
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1]) return _login_with_credentials(data)
resp.set_cookie("authToken", authToken)
session.permanent = True
return resp
valid = bcrypt.checkpw(data['password'].encode("utf-8"),
DashboardConfig.GetConfig("Account", "password")[1].encode("utf-8"))
totpEnabled = DashboardConfig.GetConfig("Account", "enable_totp")[1]
totpValid = False
if totpEnabled:
totpValid = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now() == data['totp']
if (valid
and data['username'] == DashboardConfig.GetConfig("Account", "username")[1]
and ((totpEnabled and totpValid) or not totpEnabled)
):
authToken = hashlib.sha256(f"{data['username']}{datetime.now()}".encode()).hexdigest()
session['role'] = 'admin'
session['username'] = authToken
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
resp.set_cookie("authToken", authToken)
session.permanent = True
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login success: {data['username']}")
return resp
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login failed: {data['username']}")
if totpEnabled:
return ResponseObject(False, "Sorry, your username, password or OTP is incorrect.")
else:
return ResponseObject(False, "Sorry, your username or password is incorrect.")
@app.get(f'{APP_PREFIX}/api/signout') @app.get(f'{APP_PREFIX}/api/signout')
def API_SignOut(): def API_SignOut():
@@ -334,202 +386,236 @@ def API_SignOut():
session.clear() session.clear()
return resp return resp
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurations') @app.get(f'{APP_PREFIX}/api/getWireguardConfigurations')
def API_getWireguardConfigurations(): def API_getWireguardConfigurations():
InitWireguardConfigurationsList() InitWireguardConfigurationsList()
return ResponseObject(data=[wc for wc in WireguardConfigurations.values()]) return ResponseObject(data=list(WireguardConfigurations.values()))
@app.get(f'{APP_PREFIX}/api/newConfigurationTemplates') @app.get(f'{APP_PREFIX}/api/newConfigurationTemplates')
def API_NewConfigurationTemplates(): def API_NewConfigurationTemplates():
return ResponseObject(data=NewConfigurationTemplates.GetTemplates()) return ResponseObject(data=NewConfigurationTemplates.GetTemplates())
@app.get(f'{APP_PREFIX}/api/newConfigurationTemplates/createTemplate') @app.get(f'{APP_PREFIX}/api/newConfigurationTemplates/createTemplate')
def API_NewConfigurationTemplates_CreateTemplate(): def API_NewConfigurationTemplates_CreateTemplate():
return ResponseObject(data=NewConfigurationTemplates.CreateTemplate().model_dump()) return ResponseObject(data=NewConfigurationTemplates.CreateTemplate().model_dump())
@app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/updateTemplate') @app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/updateTemplate')
def API_NewConfigurationTemplates_UpdateTemplate(): def API_NewConfigurationTemplates_UpdateTemplate():
data = request.get_json() data = request.get_json()
template = data.get('Template', None) template = data.get('Template')
if not template: if not template:
return ResponseObject(False, "Please provide template") return ResponseObject(False, "Please provide template")
status, msg = NewConfigurationTemplates.UpdateTemplate(template) status, msg = NewConfigurationTemplates.UpdateTemplate(template)
return ResponseObject(status, msg) return ResponseObject(status, msg)
@app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/deleteTemplate') @app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/deleteTemplate')
def API_NewConfigurationTemplates_DeleteTemplate(): def API_NewConfigurationTemplates_DeleteTemplate():
data = request.get_json() data = request.get_json()
template = data.get('Template', None) template = data.get('Template')
if not template: if not template:
return ResponseObject(False, "Please provide template") return ResponseObject(False, "Please provide template")
status, msg = NewConfigurationTemplates.DeleteTemplate(template) status, msg = NewConfigurationTemplates.DeleteTemplate(template)
return ResponseObject(status, msg) return ResponseObject(status, msg)
@app.post(f'{APP_PREFIX}/api/addWireguardConfiguration') @app.post(f'{APP_PREFIX}/api/addWireguardConfiguration')
def API_addWireguardConfiguration(): def API_addWireguardConfiguration():
data = request.get_json() data = request.get_json()
requiredKeys = [
"ConfigurationName", "Address", "ListenPort", "PrivateKey", "Protocol" required_keys = {"ConfigurationName", "Address", "ListenPort", "PrivateKey", "Protocol"}
] if not required_keys.issubset(data.keys()):
for i in requiredKeys:
if i not in data.keys():
return ResponseObject(False, "Please provide all required parameters.") return ResponseObject(False, "Please provide all required parameters.")
if data.get("Protocol") not in ProtocolsEnabled(): protocol = data.get("Protocol")
if protocol not in ProtocolsEnabled():
return ResponseObject(False, "Please provide a valid protocol: wg / awg.") return ResponseObject(False, "Please provide a valid protocol: wg / awg.")
# Check duplicate names, ports, address for cfg in WireguardConfigurations.values():
for i in WireguardConfigurations.values(): duplicates = {
if i.Name == data['ConfigurationName']: "ConfigurationName": cfg.Name == data['ConfigurationName'],
"ListenPort": str(cfg.ListenPort) == str(data["ListenPort"]),
"Address": cfg.Address == data["Address"]
}
for key, is_duplicate in duplicates.items():
if is_duplicate:
return ResponseObject(False, return ResponseObject(False,
f"Already have a configuration with the name \"{data['ConfigurationName']}\"", f"Already have a configuration with the {key.lower()} \"{data[key]}\"",
"ConfigurationName") key)
if str(i.ListenPort) == str(data["ListenPort"]): if "Backup" in data:
return ResponseObject(False, paths = {
f"Already have a configuration with the port \"{data['ListenPort']}\"",
"ListenPort")
if i.Address == data["Address"]:
return ResponseObject(False,
f"Already have a configuration with the address \"{data['Address']}\"",
"Address")
if "Backup" in data.keys():
path = {
"wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1], "wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
"awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1] "awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
} }
if (os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"])) and backup_file = data["Backup"]
os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))): protocol_detected = None
protocol = "wg" for proto, base_path in paths.items():
elif (os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"])) and conf_path = os.path.join(base_path, 'WGDashboard_Backup', backup_file)
os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))): sql_path = os.path.join(base_path, 'WGDashboard_Backup', backup_file.replace('.conf', '.sql'))
protocol = "awg" if os.path.exists(conf_path) and os.path.exists(sql_path):
else: protocol_detected = proto
break
if not protocol_detected:
return ResponseObject(False, "Backup does not exist") return ResponseObject(False, "Backup does not exist")
shutil.copy( shutil.copy(
os.path.join(path[protocol], 'WGDashboard_Backup', data["Backup"]), os.path.join(paths[protocol_detected], 'WGDashboard_Backup', backup_file),
os.path.join(path[protocol], f'{data["ConfigurationName"]}.conf') os.path.join(paths[protocol_detected], f'{data["ConfigurationName"]}.conf')
) )
WireguardConfigurations[data['ConfigurationName']] = ( protocol = protocol_detected # Use backup protocol for object creation
WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, data=data, name=data['ConfigurationName'])) if protocol == 'wg' else (
AmneziaWireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data, name=data['ConfigurationName'])) if protocol == "wg":
ConfigClass = WireguardConfiguration
else: else:
WireguardConfigurations[data['ConfigurationName']] = ( ConfigClass = AmneziaWireguardConfiguration
WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data)) if data.get('Protocol') == 'wg' else (
AmneziaWireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data)) WireguardConfigurations[data['ConfigurationName']] = ConfigClass(
DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data, name=data['ConfigurationName']
)
return ResponseObject() return ResponseObject()
@app.get(f'{APP_PREFIX}/api/toggleWireguardConfiguration') @app.get(f'{APP_PREFIX}/api/toggleWireguardConfiguration')
def API_toggleWireguardConfiguration(): def API_toggleWireguardConfiguration():
configurationName = request.args.get('configurationName') configuration_name = request.args.get('configurationName')
if configurationName is None or len( if not configuration_name or configuration_name not in WireguardConfigurations:
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide a valid configuration name", status_code=404) return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
toggleStatus, msg = WireguardConfigurations[configurationName].toggleConfiguration()
return ResponseObject(toggleStatus, msg, WireguardConfigurations[configurationName].Status) target_configuration = WireguardConfigurations[configuration_name]
status, msg = target_configuration.toggleConfiguration()
return ResponseObject(status, msg, target_configuration.Status)
@app.post(f'{APP_PREFIX}/api/updateWireguardConfiguration') @app.post(f'{APP_PREFIX}/api/updateWireguardConfiguration')
def API_updateWireguardConfiguration(): def API_updateWireguardConfiguration():
data = request.get_json() data = request.get_json() or {}
requiredKeys = ["Name"]
for i in requiredKeys:
if i not in data.keys():
return ResponseObject(False, "Please provide these following field: " + ", ".join(requiredKeys))
name = data.get("Name") name = data.get("Name")
if name not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
status, msg = WireguardConfigurations[name].updateConfigurationSettings(data) if not name:
return ResponseObject(False, "Please provide the field: Name")
config = _get_wireguard_config(name)
if isinstance(config, ResponseObject):
return config # Return 404 if config not found
status, msg = config.updateConfigurationSettings(data)
return ResponseObject(status, message=msg, data=config)
return ResponseObject(status, message=msg, data=WireguardConfigurations[name])
@app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationInfo') @app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationInfo')
def API_updateWireguardConfigurationInfo(): def API_updateWireguardConfigurationInfo():
data = request.get_json() data = request.get_json() or {}
name = data.get('Name') name = data.get('Name')
key = data.get('Key') key = data.get('Key')
value = data.get('Value') value = data.get('Value')
if not all([data, key, name]):
return ResponseObject(status=False, message="Please provide configuration name, key and value")
if name not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
status, msg, key = WireguardConfigurations[name].updateConfigurationInfo(key, value) if not all([name, key, value]):
return ResponseObject(False, "Please provide configuration name, key, and value")
config = _get_wireguard_config(name)
if isinstance(config, ResponseObject):
return config # Return 404 if config not found
status, msg, updated_key = config.updateConfigurationInfo(key, value)
return ResponseObject(status=status, message=msg, data=updated_key)
return ResponseObject(status=status, message=msg, data=key)
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRawFile') @app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRawFile')
def API_GetWireguardConfigurationRawFile(): def API_getWireguardConfigurationRawFile():
configurationName = request.args.get('configurationName') configuration_name = request.args.get('configurationName')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys(): if not configuration_name or configuration_name not in WireguardConfigurations:
return ResponseObject(False, "Please provide a valid configuration name", status_code=404) return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
config = WireguardConfigurations[configuration_name]
return ResponseObject(data={ return ResponseObject(data={
"path": WireguardConfigurations[configurationName].configPath, "path": config.configPath,
"content": WireguardConfigurations[configurationName].getRawConfigurationFile() "content": config.getRawConfigurationFile()
}) })
@app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationRawFile') @app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationRawFile')
def API_UpdateWireguardConfigurationRawFile(): def API_UpdateWireguardConfigurationRawFile():
data = request.get_json() data = request.get_json() or {}
configurationName = data.get('configurationName') configuration_name = data.get('configurationName')
rawConfiguration = data.get('rawConfiguration') raw_configuration = data.get('rawConfiguration')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys(): if not configuration_name or configuration_name not in WireguardConfigurations:
return ResponseObject(False, "Please provide a valid configuration name") return ResponseObject(False, "Please provide a valid configuration name")
if rawConfiguration is None or len(rawConfiguration) == 0:
if not raw_configuration:
return ResponseObject(False, "Please provide content") return ResponseObject(False, "Please provide content")
status, err = WireguardConfigurations[configurationName].updateRawConfigurationFile(rawConfiguration) config = WireguardConfigurations[configuration_name]
status, err = config.updateRawConfigurationFile(raw_configuration)
return ResponseObject(status=status, message=err) return ResponseObject(status=status, message=err)
@app.post(f'{APP_PREFIX}/api/deleteWireguardConfiguration') @app.post(f'{APP_PREFIX}/api/deleteWireguardConfiguration')
def API_deleteWireguardConfiguration(): def API_deleteWireguardConfiguration():
data = request.get_json() data = request.get_json() or {}
if "ConfigurationName" not in data.keys() or data.get("ConfigurationName") is None or data.get("ConfigurationName") not in WireguardConfigurations.keys(): configuration_name = data.get("ConfigurationName")
return ResponseObject(False, "Please provide the configuration name you want to delete", status_code=404)
rp = WireguardConfigurations.pop(data.get("ConfigurationName"))
if not configuration_name or configuration_name not in WireguardConfigurations:
return ResponseObject(False, "Please provide the configuration name you want to delete", status_code=404)
rp = WireguardConfigurations.pop(configuration_name)
status = rp.deleteConfiguration() status = rp.deleteConfiguration()
if not status: if not status:
WireguardConfigurations[data.get("ConfigurationName")] = rp WireguardConfigurations[configuration_name] = rp
return ResponseObject(status) return ResponseObject(status)
@app.post(f'{APP_PREFIX}/api/renameWireguardConfiguration') @app.post(f'{APP_PREFIX}/api/renameWireguardConfiguration')
def API_renameWireguardConfiguration(): def API_renameWireguardConfiguration():
data = request.get_json() data = request.get_json() or {}
keys = ["ConfigurationName", "NewConfigurationName"]
for k in keys:
if (k not in data.keys() or data.get(k) is None or len(data.get(k)) == 0 or
(k == "ConfigurationName" and data.get(k) not in WireguardConfigurations.keys())):
return ResponseObject(False, "Please provide the configuration name you want to rename", status_code=404)
if data.get("NewConfigurationName") in WireguardConfigurations.keys(): old_name = data.get("ConfigurationName")
return ResponseObject(False, "Configuration name already exist", status_code=400) new_name = data.get("NewConfigurationName")
rc = WireguardConfigurations.pop(data.get("ConfigurationName")) if not old_name or old_name not in WireguardConfigurations:
return ResponseObject(False, "Please provide a valid configuration name to rename", status_code=404)
if not new_name:
return ResponseObject(False, "Please provide a new configuration name", status=400)
if new_name in WireguardConfigurations:
return ResponseObject(False, "The configuration name already exists", status_code=400)
rc = WireguardConfigurations.pop(old_name)
status, message = rc.renameConfiguration(new_name)
status, message = rc.renameConfiguration(data.get("NewConfigurationName"))
if status: if status:
WireguardConfigurations[data.get("NewConfigurationName")] = (WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data.get("NewConfigurationName")) if rc.Protocol == 'wg' else AmneziaWireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data.get("NewConfigurationName"))) if rc.Protocol == 'wg':
ConfigClass = WireguardConfiguration
else: else:
WireguardConfigurations[data.get("ConfigurationName")] = rc ConfigClass = AmneziaWireguardConfiguration
WireguardConfigurations[new_name] = ConfigClass(
DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, new_name
)
else:
WireguardConfigurations[old_name] = rc
return ResponseObject(status, message) return ResponseObject(status, message)
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRealtimeTraffic') @app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRealtimeTraffic')
def API_getWireguardConfigurationRealtimeTraffic(): def API_getWireguardConfigurationRealtimeTraffic():
configurationName = request.args.get('configurationName') configuration_name = requests.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys(): if not configuration_name or configuration_name not in WireguardConfigurations:
return ResponseObject(False, "Configuration does not exist", status_code=404) return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(data=WireguardConfigurations[configurationName].getRealtimeTrafficUsage())
rt_traffic_usage = WireguardConfigurations[configuration_name]
return ResponseObject(data=rt_traffic_usage)
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationBackup') @app.get(f'{APP_PREFIX}/api/getWireguardConfigurationBackup')
def API_getWireguardConfigurationBackup(): def API_getWireguardConfigurationBackup():