From 2cee252b142e71ba63466a43ab4d6c733bf594f2 Mon Sep 17 00:00:00 2001 From: Donald Zou Date: Wed, 14 May 2025 09:24:29 +0800 Subject: [PATCH] Refactored `DashboardConfig` Refactored this file and moved `DashboardConfig` into its own file --- src/dashboard.py | 298 +-------------------------------- src/modules/DashboardConfig.py | 278 ++++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+), 292 deletions(-) create mode 100644 src/modules/DashboardConfig.py diff --git a/src/dashboard.py b/src/dashboard.py index 2f05480..c113310 100644 --- a/src/dashboard.py +++ b/src/dashboard.py @@ -1,45 +1,31 @@ import random, shutil, sqlite3, configparser, hashlib, ipaddress, json, os, secrets, subprocess -import time, re, urllib.error, uuid, bcrypt, psutil, pyotp, threading +import time, re, uuid, bcrypt, psutil, pyotp, threading from uuid import uuid4 from zipfile import ZipFile from datetime import datetime, timedelta -from typing import Any from jinja2 import Template from flask import Flask, request, render_template, session, send_file -from json import JSONEncoder from flask_cors import CORS from icmplib import ping, traceroute from flask.json.provider import DefaultJSONProvider from itertools import islice -from Utilities import ( - RegexMatch, GetRemoteEndpoint, StringToBoolean, +from modules.Utilities import ( + RegexMatch, StringToBoolean, ValidateIPAddressesWithRange, ValidateDNSAddress, GenerateWireguardPublicKey, GenerateWireguardPrivateKey ) from packaging import version from modules.Email import EmailSender -from modules.Log import Log from modules.DashboardLogger import DashboardLogger -from modules.PeerJobLogger import PeerJobLogger from modules.PeerJob import PeerJob from modules.SystemStatus import SystemStatus from modules.PeerShareLinks import PeerShareLinks -from modules.DashboardAPIKey import DashboardAPIKey from modules.PeerJobs import PeerJobs +from modules.DashboardConfig import DashboardConfig SystemStatus = SystemStatus() -from sqlalchemy_utils import database_exists, create_database -import sqlalchemy as db - -DASHBOARD_VERSION = 'v4.2.3' - CONFIGURATION_PATH = os.getenv('CONFIGURATION_PATH', '.') -DB_PATH = os.path.join(CONFIGURATION_PATH, 'db') -if not os.path.isdir(DB_PATH): - os.mkdir(DB_PATH) -DASHBOARD_CONF = os.path.join(CONFIGURATION_PATH, 'wg-dashboard.ini') -UPDATE = None app = Flask("WGDashboard", template_folder=os.path.abspath("./static/app/dist")) app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 5206928 app.secret_key = secrets.token_urlsafe(32) @@ -1512,278 +1498,6 @@ PersistentKeepalive = {str(self.keepalive)} return ResponseObject() except subprocess.CalledProcessError as exc: return ResponseObject(False, exc.output.decode("UTF-8").strip()) - - -""" -Dashboard Configuration -""" -class DashboardConfig: - - def __init__(self): - if not os.path.exists(DASHBOARD_CONF): - open(DASHBOARD_CONF, "x") - self.__config = configparser.ConfigParser(strict=False) - self.__config.read_file(open(DASHBOARD_CONF, "r+")) - self.hiddenAttribute = ["totp_key", "auth_req"] - self.__default = { - "Account": { - "username": "admin", - "password": "admin", - "enable_totp": "false", - "totp_verified": "false", - "totp_key": pyotp.random_base32() - }, - "Server": { - "wg_conf_path": "/etc/wireguard", - "awg_conf_path": "/etc/amnezia/amneziawg", - "app_prefix": "", - "app_ip": "0.0.0.0", - "app_port": "10086", - "auth_req": "true", - "version": DASHBOARD_VERSION, - "dashboard_refresh_interval": "60000", - "dashboard_peer_list_display": "grid", - "dashboard_sort": "status", - "dashboard_theme": "dark", - "dashboard_api_key": "false", - "dashboard_language": "en" - }, - "Peers": { - "peer_global_DNS": "1.1.1.1", - "peer_endpoint_allowed_ip": "0.0.0.0/0", - "peer_display_mode": "grid", - "remote_endpoint": GetRemoteEndpoint(), - "peer_MTU": "1420", - "peer_keep_alive": "21" - }, - "Other": { - "welcome_session": "true" - }, - "Database":{ - "type": "sqlite", - "host": "", - "port": "", - "username": "", - "password": "" - }, - "Email":{ - "server": "", - "port": "", - "encryption": "", - "username": "", - "email_password": "", - "send_from": "", - "email_template": "" - }, - "WireGuardConfiguration": { - "autostart": "" - } - } - - for section, keys in self.__default.items(): - for key, value in keys.items(): - exist, currentData = self.GetConfig(section, key) - if not exist: - self.SetConfig(section, key, value, True) - - self.engine = db.create_engine(self.getConnectionString('wgdashboard')) - self.dbMetadata = db.MetaData() - - - - - self.__createAPIKeyTable() - self.DashboardAPIKeys = self.__getAPIKeys() - self.APIAccessed = False - self.SetConfig("Server", "version", DASHBOARD_VERSION) - - def getConnectionString(self, database) -> str or None: - cn = None - if self.GetConfig("Database", "type")[1] == "sqlite": - cn = f'sqlite:///{os.path.join(CONFIGURATION_PATH, "db", f"{database}.db")}' - elif self.GetConfig("Database", "type")[1] == "postgresql": - cn = f'postgresql+psycopg2://{self.GetConfig("Database", "username")[1]}:{self.GetConfig("Database", "password")[1]}@{self.GetConfig("Database", "host")[1]}/{database}' - - if not database_exists(cn): - create_database(cn) - - return cn - - def __createAPIKeyTable(self): - self.apiKeyTable = db.Table('DashboardAPIKeys', self.dbMetadata, - db.Column("Key", db.String, nullable=False, primary_key=True), - db.Column("CreatedAt", - (db.DATETIME if self.GetConfig('Database', 'type')[1] == 'sqlite' else db.TIMESTAMP), - server_default=db.func.now() - ), - db.Column("ExpiredAt", - (db.DATETIME if self.GetConfig('Database', 'type')[1] == 'sqlite' else db.TIMESTAMP) - ) - ) - self.dbMetadata.create_all(self.engine) - - # existingTable = sqlSelect("SELECT name FROM sqlite_master WHERE type='table' AND name = 'DashboardAPIKeys'").fetchall() - # if len(existingTable) == 0: - # sqlUpdate("CREATE TABLE DashboardAPIKeys (Key VARCHAR NOT NULL PRIMARY KEY, CreatedAt DATETIME NOT NULL DEFAULT (datetime('now', 'localtime')), ExpiredAt VARCHAR)") - - def __getAPIKeys(self) -> list[DashboardAPIKey]: - # keys = sqlSelect("SELECT * FROM DashboardAPIKeys WHERE ExpiredAt IS NULL OR ExpiredAt > datetime('now', 'localtime') ORDER BY CreatedAt DESC").fetchall() - try: - with self.engine.connect() as conn: - keys = conn.execute(self.apiKeyTable.select().where( - db.or_(self.apiKeyTable.columns.ExpiredAt == None, self.apiKeyTable.columns.ExpiredAt > datetime.now()) - )).fetchall() - fKeys = [] - for k in keys: - fKeys.append(DashboardAPIKey(k[0], k[1].strftime("%Y-%m-%d %H:%M:%S"), (k[2].strftime("%Y-%m-%d %H:%M:%S") if k[2] else None))) - return fKeys - except Exception as e: - print("") - return [] - - def createAPIKeys(self, ExpiredAt = None): - newKey = secrets.token_urlsafe(32) - # sqlUpdate('INSERT INTO DashboardAPIKeys (Key, ExpiredAt) VALUES (?, ?)', (newKey, ExpiredAt,)) - with self.engine.begin() as conn: - conn.execute( - self.apiKeyTable.insert().values({ - "Key": newKey, - "ExpiredAt": ExpiredAt - }) - ) - - self.DashboardAPIKeys = self.__getAPIKeys() - - def deleteAPIKey(self, key): - # sqlUpdate("UPDATE DashboardAPIKeys SET ExpiredAt = datetime('now', 'localtime') WHERE Key = ?", (key, )) - with self.engine.begin() as conn: - conn.execute( - self.apiKeyTable.update().values({ - "ExpiredAt": datetime.now(), - }).where(self.apiKeyTable.columns.Key == key) - ) - - self.DashboardAPIKeys = self.__getAPIKeys() - - def __configValidation(self, section : str, key: str, value: Any) -> [bool, str]: - if (type(value) is str and len(value) == 0 - and section not in ['Email', 'WireGuardConfiguration'] and - (section == 'Peer' and key == 'peer_global_dns')): - return False, "Field cannot be empty!" - if section == "Peers" and key == "peer_global_dns" and len(value) > 0: - return ValidateDNSAddress(value) - if section == "Peers" and key == "peer_endpoint_allowed_ip": - value = value.split(",") - for i in value: - i = i.strip() - try: - ipaddress.ip_network(i, strict=False) - except Exception as e: - return False, str(e) - if section == "Server" and key == "wg_conf_path": - if not os.path.exists(value): - return False, f"{value} is not a valid path" - if section == "Account" and key == "password": - if self.GetConfig("Account", "password")[0]: - if not self.__checkPassword( - value["currentPassword"], self.GetConfig("Account", "password")[1].encode("utf-8")): - return False, "Current password does not match." - if value["newPassword"] != value["repeatNewPassword"]: - return False, "New passwords does not match" - return True, "" - - def generatePassword(self, plainTextPassword: str): - return bcrypt.hashpw(plainTextPassword.encode("utf-8"), bcrypt.gensalt()) - - def __checkPassword(self, plainTextPassword: str, hashedPassword: bytes): - return bcrypt.checkpw(plainTextPassword.encode("utf-8"), hashedPassword) - - def SetConfig(self, section: str, key: str, value: any, init: bool = False) -> [bool, str]: - if key in self.hiddenAttribute and not init: - return False, None - - if not init: - valid, msg = self.__configValidation(section, key, value) - if not valid: - return False, msg - - if section == "Account" and key == "password": - if not init: - value = self.generatePassword(value["newPassword"]).decode("utf-8") - else: - value = self.generatePassword(value).decode("utf-8") - - if section == "Email" and key == "email_template": - value = value.encode('unicode_escape').decode('utf-8') - - if section == "Server" and key == "wg_conf_path": - if not os.path.exists(value): - return False, "Path does not exist" - - if section not in self.__config: - if init: - self.__config[section] = {} - else: - return False, "Section does not exist" - - if ((key not in self.__config[section].keys() and init) or - (key in self.__config[section].keys())): - if type(value) is bool: - if value: - self.__config[section][key] = "true" - else: - self.__config[section][key] = "false" - elif type(value) in [int, float]: - self.__config[section][key] = str(value) - elif type(value) is list: - self.__config[section][key] = "||".join(value).strip("||") - else: - self.__config[section][key] = value - return self.SaveConfig(), "" - else: - return False, f"{key} does not exist under {section}" - return True, "" - - def SaveConfig(self) -> bool: - try: - with open(DASHBOARD_CONF, "w+", encoding='utf-8') as configFile: - self.__config.write(configFile) - return True - except Exception as e: - return False - - def GetConfig(self, section, key) -> [bool, any]: - if section not in self.__config: - return False, None - - if key not in self.__config[section]: - return False, None - - if section == "Email" and key == "email_template": - return True, self.__config[section][key].encode('utf-8').decode('unicode_escape') - - if section == "WireGuardConfiguration" and key == "autostart": - return True, list(filter(lambda x: len(x) > 0, self.__config[section][key].split("||"))) - - if self.__config[section][key] in ["1", "yes", "true", "on"]: - return True, True - - if self.__config[section][key] in ["0", "no", "false", "off"]: - return True, False - - - return True, self.__config[section][key] - - def toJson(self) -> dict[str, dict[Any, Any]]: - the_dict = {} - - for section in self.__config.sections(): - the_dict[section] = {} - for key, val in self.__config.items(section): - if key not in self.hiddenAttribute: - the_dict[section][key] = self.GetConfig(section, key)[1] - return the_dict - """ Database Connection Functions @@ -2753,7 +2467,7 @@ def API_getDashboardUpdate(): tagName = data.get('tag_name') htmlUrl = data.get('html_url') if tagName is not None and htmlUrl is not None: - if version.parse(tagName) > version.parse(DASHBOARD_VERSION): + if version.parse(tagName) > version.parse(DashboardConfig.DashboardVersion): return ResponseObject(message=f"{tagName} is now available for update!", data=htmlUrl) else: return ResponseObject(message="You're on the latest version") @@ -3003,7 +2717,7 @@ AmneziaWireguardConfigurations: dict[str, AmneziaWireguardConfiguration] = {} AllPeerShareLinks: PeerShareLinks = PeerShareLinks(DashboardConfig) AllPeerJobs: PeerJobs = PeerJobs(DashboardConfig, WireguardConfigurations) -DashboardLogger: DashboardLogger = DashboardLogger(CONFIGURATION_PATH, DashboardConfig) +DashboardLogger: DashboardLogger = DashboardLogger(DashboardConfig) InitWireguardConfigurationsList(startup=True) diff --git a/src/modules/DashboardConfig.py b/src/modules/DashboardConfig.py new file mode 100644 index 0000000..5917355 --- /dev/null +++ b/src/modules/DashboardConfig.py @@ -0,0 +1,278 @@ +""" +Dashboard Configuration +""" +import configparser, secrets, os, pyotp, ipaddress, bcrypt +from sqlalchemy_utils import database_exists, create_database +import sqlalchemy as db +from datetime import datetime +from typing import Any +from .Utilities import ( + GetRemoteEndpoint, ValidateDNSAddress +) +from .DashboardAPIKey import DashboardAPIKey + + +class DashboardConfig: + DashboardVersion = 'v4.2.3' + ConfigurationPath = os.getenv('CONFIGURATION_PATH', '.') + ConfigurationFilePath = os.path.join(ConfigurationPath, 'wg-dashboard.ini') + + def __init__(self): + if not os.path.exists(DashboardConfig.ConfigurationFilePath): + open(DashboardConfig.ConfigurationFilePath, "x") + self.__config = configparser.ConfigParser(strict=False) + self.__config.read_file(open(DashboardConfig.ConfigurationFilePath, "r+")) + self.hiddenAttribute = ["totp_key", "auth_req"] + self.__default = { + "Account": { + "username": "admin", + "password": "admin", + "enable_totp": "false", + "totp_verified": "false", + "totp_key": pyotp.random_base32() + }, + "Server": { + "wg_conf_path": "/etc/wireguard", + "awg_conf_path": "/etc/amnezia/amneziawg", + "app_prefix": "", + "app_ip": "0.0.0.0", + "app_port": "10086", + "auth_req": "true", + "version": DashboardConfig.DashboardVersion, + "dashboard_refresh_interval": "60000", + "dashboard_peer_list_display": "grid", + "dashboard_sort": "status", + "dashboard_theme": "dark", + "dashboard_api_key": "false", + "dashboard_language": "en" + }, + "Peers": { + "peer_global_DNS": "1.1.1.1", + "peer_endpoint_allowed_ip": "0.0.0.0/0", + "peer_display_mode": "grid", + "remote_endpoint": GetRemoteEndpoint(), + "peer_MTU": "1420", + "peer_keep_alive": "21" + }, + "Other": { + "welcome_session": "true" + }, + "Database":{ + "type": "sqlite", + "host": "", + "port": "", + "username": "", + "password": "" + }, + "Email":{ + "server": "", + "port": "", + "encryption": "", + "username": "", + "email_password": "", + "send_from": "", + "email_template": "" + }, + "WireGuardConfiguration": { + "autostart": "" + } + } + + for section, keys in self.__default.items(): + for key, value in keys.items(): + exist, currentData = self.GetConfig(section, key) + if not exist: + self.SetConfig(section, key, value, True) + + self.engine = db.create_engine(self.getConnectionString('wgdashboard')) + self.dbMetadata = db.MetaData() + self.__createAPIKeyTable() + self.DashboardAPIKeys = self.__getAPIKeys() + self.APIAccessed = False + self.SetConfig("Server", "version", DashboardConfig.DashboardVersion) + + def getConnectionString(self, database) -> str or None: + sqlitePath = os.path.join(DashboardConfig.ConfigurationPath, "db") + + if not os.path.isdir(sqlitePath): + os.mkdir(sqlitePath) + + cn = None + if self.GetConfig("Database", "type")[1] == "sqlite": + cn = f'sqlite:///{os.path.join(sqlitePath, f"{database}.db")}' + elif self.GetConfig("Database", "type")[1] == "postgresql": + cn = f'postgresql+psycopg2://{self.GetConfig("Database", "username")[1]}:{self.GetConfig("Database", "password")[1]}@{self.GetConfig("Database", "host")[1]}/{database}' + + if not database_exists(cn): + create_database(cn) + return cn + + def __createAPIKeyTable(self): + self.apiKeyTable = db.Table('DashboardAPIKeys', self.dbMetadata, + db.Column("Key", db.String, nullable=False, primary_key=True), + db.Column("CreatedAt", + (db.DATETIME if self.GetConfig('Database', 'type')[1] == 'sqlite' else db.TIMESTAMP), + server_default=db.func.now() + ), + db.Column("ExpiredAt", + (db.DATETIME if self.GetConfig('Database', 'type')[1] == 'sqlite' else db.TIMESTAMP) + ) + ) + self.dbMetadata.create_all(self.engine) + def __getAPIKeys(self) -> list[DashboardAPIKey]: + # keys = sqlSelect("SELECT * FROM DashboardAPIKeys WHERE ExpiredAt IS NULL OR ExpiredAt > datetime('now', 'localtime') ORDER BY CreatedAt DESC").fetchall() + try: + with self.engine.connect() as conn: + keys = conn.execute(self.apiKeyTable.select().where( + db.or_(self.apiKeyTable.columns.ExpiredAt == None, self.apiKeyTable.columns.ExpiredAt > datetime.now()) + )).fetchall() + fKeys = [] + for k in keys: + fKeys.append(DashboardAPIKey(k[0], k[1].strftime("%Y-%m-%d %H:%M:%S"), (k[2].strftime("%Y-%m-%d %H:%M:%S") if k[2] else None))) + return fKeys + except Exception as e: + print("") + return [] + + def createAPIKeys(self, ExpiredAt = None): + newKey = secrets.token_urlsafe(32) + # sqlUpdate('INSERT INTO DashboardAPIKeys (Key, ExpiredAt) VALUES (?, ?)', (newKey, ExpiredAt,)) + with self.engine.begin() as conn: + conn.execute( + self.apiKeyTable.insert().values({ + "Key": newKey, + "ExpiredAt": ExpiredAt + }) + ) + + self.DashboardAPIKeys = self.__getAPIKeys() + + def deleteAPIKey(self, key): + # sqlUpdate("UPDATE DashboardAPIKeys SET ExpiredAt = datetime('now', 'localtime') WHERE Key = ?", (key, )) + with self.engine.begin() as conn: + conn.execute( + self.apiKeyTable.update().values({ + "ExpiredAt": datetime.now(), + }).where(self.apiKeyTable.columns.Key == key) + ) + + self.DashboardAPIKeys = self.__getAPIKeys() + + def __configValidation(self, section : str, key: str, value: Any) -> [bool, str]: + if (type(value) is str and len(value) == 0 + and section not in ['Email', 'WireGuardConfiguration'] and + (section == 'Peer' and key == 'peer_global_dns')): + return False, "Field cannot be empty!" + if section == "Peers" and key == "peer_global_dns" and len(value) > 0: + return ValidateDNSAddress(value) + if section == "Peers" and key == "peer_endpoint_allowed_ip": + value = value.split(",") + for i in value: + i = i.strip() + try: + ipaddress.ip_network(i, strict=False) + except Exception as e: + return False, str(e) + if section == "Server" and key == "wg_conf_path": + if not os.path.exists(value): + return False, f"{value} is not a valid path" + if section == "Account" and key == "password": + if self.GetConfig("Account", "password")[0]: + if not self.__checkPassword( + value["currentPassword"], self.GetConfig("Account", "password")[1].encode("utf-8")): + return False, "Current password does not match." + if value["newPassword"] != value["repeatNewPassword"]: + return False, "New passwords does not match" + return True, "" + + def generatePassword(self, plainTextPassword: str): + return bcrypt.hashpw(plainTextPassword.encode("utf-8"), bcrypt.gensalt()) + + def __checkPassword(self, plainTextPassword: str, hashedPassword: bytes): + return bcrypt.checkpw(plainTextPassword.encode("utf-8"), hashedPassword) + + def SetConfig(self, section: str, key: str, value: any, init: bool = False) -> [bool, str]: + if key in self.hiddenAttribute and not init: + return False, None + + if not init: + valid, msg = self.__configValidation(section, key, value) + if not valid: + return False, msg + + if section == "Account" and key == "password": + if not init: + value = self.generatePassword(value["newPassword"]).decode("utf-8") + else: + value = self.generatePassword(value).decode("utf-8") + + if section == "Email" and key == "email_template": + value = value.encode('unicode_escape').decode('utf-8') + + if section == "Server" and key == "wg_conf_path": + if not os.path.exists(value): + return False, "Path does not exist" + + if section not in self.__config: + if init: + self.__config[section] = {} + else: + return False, "Section does not exist" + + if ((key not in self.__config[section].keys() and init) or + (key in self.__config[section].keys())): + if type(value) is bool: + if value: + self.__config[section][key] = "true" + else: + self.__config[section][key] = "false" + elif type(value) in [int, float]: + self.__config[section][key] = str(value) + elif type(value) is list: + self.__config[section][key] = "||".join(value).strip("||") + else: + self.__config[section][key] = value + return self.SaveConfig(), "" + else: + return False, f"{key} does not exist under {section}" + return True, "" + + def SaveConfig(self) -> bool: + try: + with open(DashboardConfig.ConfigurationFilePath, "w+", encoding='utf-8') as configFile: + self.__config.write(configFile) + return True + except Exception as e: + return False + + def GetConfig(self, section, key) -> [bool, any]: + if section not in self.__config: + return False, None + + if key not in self.__config[section]: + return False, None + + if section == "Email" and key == "email_template": + return True, self.__config[section][key].encode('utf-8').decode('unicode_escape') + + if section == "WireGuardConfiguration" and key == "autostart": + return True, list(filter(lambda x: len(x) > 0, self.__config[section][key].split("||"))) + + if self.__config[section][key] in ["1", "yes", "true", "on"]: + return True, True + + if self.__config[section][key] in ["0", "no", "false", "off"]: + return True, False + + + return True, self.__config[section][key] + + def toJson(self) -> dict[str, dict[Any, Any]]: + the_dict = {} + + for section in self.__config.sections(): + the_dict[section] = {} + for key, val in self.__config.items(section): + if key not in self.hiddenAttribute: + the_dict[section][key] = self.GetConfig(section, key)[1] + return the_dict