WGDashboard/src/modules/DashboardConfig.py

281 lines
12 KiB
Python
Raw Normal View History

"""
Dashboard Configuration
"""
import configparser, secrets, os, pyotp, ipaddress, bcrypt
from sqlalchemy_utils import database_exists, create_database
import sqlalchemy as db
from datetime import datetime
from typing import Any
from .Utilities import (
GetRemoteEndpoint, ValidateDNSAddress
)
from .DashboardAPIKey import DashboardAPIKey
class DashboardConfig:
DashboardVersion = 'v4.2.3'
ConfigurationPath = os.getenv('CONFIGURATION_PATH', '.')
ConfigurationFilePath = os.path.join(ConfigurationPath, 'wg-dashboard.ini')
def __init__(self):
if not os.path.exists(DashboardConfig.ConfigurationFilePath):
open(DashboardConfig.ConfigurationFilePath, "x")
self.__config = configparser.ConfigParser(strict=False)
self.__config.read_file(open(DashboardConfig.ConfigurationFilePath, "r+"))
self.hiddenAttribute = ["totp_key", "auth_req"]
self.__default = {
"Account": {
"username": "admin",
"password": "admin",
"enable_totp": "false",
"totp_verified": "false",
"totp_key": pyotp.random_base32()
},
"Server": {
"wg_conf_path": "/etc/wireguard",
"awg_conf_path": "/etc/amnezia/amneziawg",
"app_prefix": "",
"app_ip": "0.0.0.0",
"app_port": "10086",
"auth_req": "true",
"version": DashboardConfig.DashboardVersion,
"dashboard_refresh_interval": "60000",
"dashboard_peer_list_display": "grid",
"dashboard_sort": "status",
"dashboard_theme": "dark",
"dashboard_api_key": "false",
"dashboard_language": "en"
},
"Peers": {
"peer_global_DNS": "1.1.1.1",
"peer_endpoint_allowed_ip": "0.0.0.0/0",
"peer_display_mode": "grid",
"remote_endpoint": GetRemoteEndpoint(),
"peer_MTU": "1420",
"peer_keep_alive": "21"
},
"Other": {
"welcome_session": "true"
},
"Database":{
"type": "sqlite",
"host": "",
"port": "",
"username": "",
"password": ""
},
"Email":{
"server": "",
"port": "",
"encryption": "",
"username": "",
"email_password": "",
"send_from": "",
"email_template": ""
},
"WireGuardConfiguration": {
"autostart": ""
}
}
for section, keys in self.__default.items():
for key, value in keys.items():
exist, currentData = self.GetConfig(section, key)
if not exist:
self.SetConfig(section, key, value, True)
self.engine = db.create_engine(self.getConnectionString('wgdashboard'))
self.dbMetadata = db.MetaData()
self.__createAPIKeyTable()
self.DashboardAPIKeys = self.__getAPIKeys()
self.APIAccessed = False
self.SetConfig("Server", "version", DashboardConfig.DashboardVersion)
def getConnectionString(self, database) -> str or None:
sqlitePath = os.path.join(DashboardConfig.ConfigurationPath, "db")
if not os.path.isdir(sqlitePath):
os.mkdir(sqlitePath)
2025-05-24 18:25:52 +08:00
# cn = None
if self.GetConfig("Database", "type")[1] == "postgresql":
cn = f'postgresql+psycopg2://{self.GetConfig("Database", "username")[1]}:{self.GetConfig("Database", "password")[1]}@{self.GetConfig("Database", "host")[1]}/{database}'
2025-05-24 18:25:52 +08:00
elif self.GetConfig("Database", "type")[1] == "mysql":
cn = f'mysql+mysqldb://{self.GetConfig("Database", "username")[1]}:{self.GetConfig("Database", "password")[1]}@{self.GetConfig("Database", "host")[1]}/{database}'
else:
cn = f'sqlite:///{os.path.join(sqlitePath, f"{database}.db")}'
if not database_exists(cn):
create_database(cn)
return cn
def __createAPIKeyTable(self):
self.apiKeyTable = db.Table('DashboardAPIKeys', self.dbMetadata,
2025-05-24 18:25:52 +08:00
db.Column("Key", db.String(255), nullable=False, primary_key=True),
db.Column("CreatedAt",
(db.DATETIME if self.GetConfig('Database', 'type')[1] == 'sqlite' else db.TIMESTAMP),
server_default=db.func.now()
),
db.Column("ExpiredAt",
(db.DATETIME if self.GetConfig('Database', 'type')[1] == 'sqlite' else db.TIMESTAMP)
)
)
self.dbMetadata.create_all(self.engine)
def __getAPIKeys(self) -> list[DashboardAPIKey]:
# keys = sqlSelect("SELECT * FROM DashboardAPIKeys WHERE ExpiredAt IS NULL OR ExpiredAt > datetime('now', 'localtime') ORDER BY CreatedAt DESC").fetchall()
try:
with self.engine.connect() as conn:
keys = conn.execute(self.apiKeyTable.select().where(
db.or_(self.apiKeyTable.columns.ExpiredAt == None, self.apiKeyTable.columns.ExpiredAt > datetime.now())
)).fetchall()
fKeys = []
for k in keys:
fKeys.append(DashboardAPIKey(k[0], k[1].strftime("%Y-%m-%d %H:%M:%S"), (k[2].strftime("%Y-%m-%d %H:%M:%S") if k[2] else None)))
return fKeys
except Exception as e:
print("")
return []
def createAPIKeys(self, ExpiredAt = None):
newKey = secrets.token_urlsafe(32)
# sqlUpdate('INSERT INTO DashboardAPIKeys (Key, ExpiredAt) VALUES (?, ?)', (newKey, ExpiredAt,))
with self.engine.begin() as conn:
conn.execute(
self.apiKeyTable.insert().values({
"Key": newKey,
"ExpiredAt": ExpiredAt
})
)
self.DashboardAPIKeys = self.__getAPIKeys()
def deleteAPIKey(self, key):
# sqlUpdate("UPDATE DashboardAPIKeys SET ExpiredAt = datetime('now', 'localtime') WHERE Key = ?", (key, ))
with self.engine.begin() as conn:
conn.execute(
self.apiKeyTable.update().values({
"ExpiredAt": datetime.now(),
}).where(self.apiKeyTable.columns.Key == key)
)
self.DashboardAPIKeys = self.__getAPIKeys()
def __configValidation(self, section : str, key: str, value: Any) -> [bool, str]:
if (type(value) is str and len(value) == 0
and section not in ['Email', 'WireGuardConfiguration'] and
(section == 'Peer' and key == 'peer_global_dns')):
return False, "Field cannot be empty!"
if section == "Peers" and key == "peer_global_dns" and len(value) > 0:
return ValidateDNSAddress(value)
if section == "Peers" and key == "peer_endpoint_allowed_ip":
value = value.split(",")
for i in value:
i = i.strip()
try:
ipaddress.ip_network(i, strict=False)
except Exception as e:
return False, str(e)
if section == "Server" and key == "wg_conf_path":
if not os.path.exists(value):
return False, f"{value} is not a valid path"
if section == "Account" and key == "password":
if self.GetConfig("Account", "password")[0]:
if not self.__checkPassword(
value["currentPassword"], self.GetConfig("Account", "password")[1].encode("utf-8")):
return False, "Current password does not match."
if value["newPassword"] != value["repeatNewPassword"]:
return False, "New passwords does not match"
return True, ""
def generatePassword(self, plainTextPassword: str):
return bcrypt.hashpw(plainTextPassword.encode("utf-8"), bcrypt.gensalt())
def __checkPassword(self, plainTextPassword: str, hashedPassword: bytes):
return bcrypt.checkpw(plainTextPassword.encode("utf-8"), hashedPassword)
def SetConfig(self, section: str, key: str, value: any, init: bool = False) -> [bool, str]:
if key in self.hiddenAttribute and not init:
return False, None
if not init:
valid, msg = self.__configValidation(section, key, value)
if not valid:
return False, msg
if section == "Account" and key == "password":
if not init:
value = self.generatePassword(value["newPassword"]).decode("utf-8")
else:
value = self.generatePassword(value).decode("utf-8")
if section == "Email" and key == "email_template":
value = value.encode('unicode_escape').decode('utf-8')
if section == "Server" and key == "wg_conf_path":
if not os.path.exists(value):
return False, "Path does not exist"
if section not in self.__config:
if init:
self.__config[section] = {}
else:
return False, "Section does not exist"
if ((key not in self.__config[section].keys() and init) or
(key in self.__config[section].keys())):
if type(value) is bool:
if value:
self.__config[section][key] = "true"
else:
self.__config[section][key] = "false"
elif type(value) in [int, float]:
self.__config[section][key] = str(value)
elif type(value) is list:
self.__config[section][key] = "||".join(value).strip("||")
else:
self.__config[section][key] = value
return self.SaveConfig(), ""
else:
return False, f"{key} does not exist under {section}"
return True, ""
def SaveConfig(self) -> bool:
try:
with open(DashboardConfig.ConfigurationFilePath, "w+", encoding='utf-8') as configFile:
self.__config.write(configFile)
return True
except Exception as e:
return False
def GetConfig(self, section, key) -> [bool, any]:
if section not in self.__config:
return False, None
if key not in self.__config[section]:
return False, None
if section == "Email" and key == "email_template":
return True, self.__config[section][key].encode('utf-8').decode('unicode_escape')
if section == "WireGuardConfiguration" and key == "autostart":
return True, list(filter(lambda x: len(x) > 0, self.__config[section][key].split("||")))
if self.__config[section][key] in ["1", "yes", "true", "on"]:
return True, True
if self.__config[section][key] in ["0", "no", "false", "off"]:
return True, False
return True, self.__config[section][key]
def toJson(self) -> dict[str, dict[Any, Any]]:
the_dict = {}
for section in self.__config.sections():
the_dict[section] = {}
for key, val in self.__config.items(section):
if key not in self.hiddenAttribute:
the_dict[section][key] = self.GetConfig(section, key)[1]
return the_dict