mirror of
https://github.com/donaldzou/WGDashboard.git
synced 2025-06-28 09:16:55 +00:00
279 lines
11 KiB
Python
279 lines
11 KiB
Python
"""
|
|
Dashboard Configuration
|
|
"""
|
|
import configparser, secrets, os, pyotp, ipaddress, bcrypt
|
|
from sqlalchemy_utils import database_exists, create_database
|
|
import sqlalchemy as db
|
|
from datetime import datetime
|
|
from typing import Any
|
|
from .Utilities import (
|
|
GetRemoteEndpoint, ValidateDNSAddress
|
|
)
|
|
from .DashboardAPIKey import DashboardAPIKey
|
|
|
|
|
|
class DashboardConfig:
|
|
DashboardVersion = 'v4.2.3'
|
|
ConfigurationPath = os.getenv('CONFIGURATION_PATH', '.')
|
|
ConfigurationFilePath = os.path.join(ConfigurationPath, 'wg-dashboard.ini')
|
|
|
|
def __init__(self):
|
|
if not os.path.exists(DashboardConfig.ConfigurationFilePath):
|
|
open(DashboardConfig.ConfigurationFilePath, "x")
|
|
self.__config = configparser.ConfigParser(strict=False)
|
|
self.__config.read_file(open(DashboardConfig.ConfigurationFilePath, "r+"))
|
|
self.hiddenAttribute = ["totp_key", "auth_req"]
|
|
self.__default = {
|
|
"Account": {
|
|
"username": "admin",
|
|
"password": "admin",
|
|
"enable_totp": "false",
|
|
"totp_verified": "false",
|
|
"totp_key": pyotp.random_base32()
|
|
},
|
|
"Server": {
|
|
"wg_conf_path": "/etc/wireguard",
|
|
"awg_conf_path": "/etc/amnezia/amneziawg",
|
|
"app_prefix": "",
|
|
"app_ip": "0.0.0.0",
|
|
"app_port": "10086",
|
|
"auth_req": "true",
|
|
"version": DashboardConfig.DashboardVersion,
|
|
"dashboard_refresh_interval": "60000",
|
|
"dashboard_peer_list_display": "grid",
|
|
"dashboard_sort": "status",
|
|
"dashboard_theme": "dark",
|
|
"dashboard_api_key": "false",
|
|
"dashboard_language": "en"
|
|
},
|
|
"Peers": {
|
|
"peer_global_DNS": "1.1.1.1",
|
|
"peer_endpoint_allowed_ip": "0.0.0.0/0",
|
|
"peer_display_mode": "grid",
|
|
"remote_endpoint": GetRemoteEndpoint(),
|
|
"peer_MTU": "1420",
|
|
"peer_keep_alive": "21"
|
|
},
|
|
"Other": {
|
|
"welcome_session": "true"
|
|
},
|
|
"Database":{
|
|
"type": "sqlite",
|
|
"host": "",
|
|
"port": "",
|
|
"username": "",
|
|
"password": ""
|
|
},
|
|
"Email":{
|
|
"server": "",
|
|
"port": "",
|
|
"encryption": "",
|
|
"username": "",
|
|
"email_password": "",
|
|
"send_from": "",
|
|
"email_template": ""
|
|
},
|
|
"WireGuardConfiguration": {
|
|
"autostart": ""
|
|
}
|
|
}
|
|
|
|
for section, keys in self.__default.items():
|
|
for key, value in keys.items():
|
|
exist, currentData = self.GetConfig(section, key)
|
|
if not exist:
|
|
self.SetConfig(section, key, value, True)
|
|
|
|
self.engine = db.create_engine(self.getConnectionString('wgdashboard'))
|
|
self.dbMetadata = db.MetaData()
|
|
self.__createAPIKeyTable()
|
|
self.DashboardAPIKeys = self.__getAPIKeys()
|
|
self.APIAccessed = False
|
|
self.SetConfig("Server", "version", DashboardConfig.DashboardVersion)
|
|
|
|
def getConnectionString(self, database) -> str or None:
|
|
sqlitePath = os.path.join(DashboardConfig.ConfigurationPath, "db")
|
|
|
|
if not os.path.isdir(sqlitePath):
|
|
os.mkdir(sqlitePath)
|
|
|
|
cn = None
|
|
if self.GetConfig("Database", "type")[1] == "sqlite":
|
|
cn = f'sqlite:///{os.path.join(sqlitePath, f"{database}.db")}'
|
|
elif self.GetConfig("Database", "type")[1] == "postgresql":
|
|
cn = f'postgresql+psycopg2://{self.GetConfig("Database", "username")[1]}:{self.GetConfig("Database", "password")[1]}@{self.GetConfig("Database", "host")[1]}/{database}'
|
|
|
|
if not database_exists(cn):
|
|
create_database(cn)
|
|
return cn
|
|
|
|
def __createAPIKeyTable(self):
|
|
self.apiKeyTable = db.Table('DashboardAPIKeys', self.dbMetadata,
|
|
db.Column("Key", db.String, nullable=False, primary_key=True),
|
|
db.Column("CreatedAt",
|
|
(db.DATETIME if self.GetConfig('Database', 'type')[1] == 'sqlite' else db.TIMESTAMP),
|
|
server_default=db.func.now()
|
|
),
|
|
db.Column("ExpiredAt",
|
|
(db.DATETIME if self.GetConfig('Database', 'type')[1] == 'sqlite' else db.TIMESTAMP)
|
|
)
|
|
)
|
|
self.dbMetadata.create_all(self.engine)
|
|
def __getAPIKeys(self) -> list[DashboardAPIKey]:
|
|
# keys = sqlSelect("SELECT * FROM DashboardAPIKeys WHERE ExpiredAt IS NULL OR ExpiredAt > datetime('now', 'localtime') ORDER BY CreatedAt DESC").fetchall()
|
|
try:
|
|
with self.engine.connect() as conn:
|
|
keys = conn.execute(self.apiKeyTable.select().where(
|
|
db.or_(self.apiKeyTable.columns.ExpiredAt == None, self.apiKeyTable.columns.ExpiredAt > datetime.now())
|
|
)).fetchall()
|
|
fKeys = []
|
|
for k in keys:
|
|
fKeys.append(DashboardAPIKey(k[0], k[1].strftime("%Y-%m-%d %H:%M:%S"), (k[2].strftime("%Y-%m-%d %H:%M:%S") if k[2] else None)))
|
|
return fKeys
|
|
except Exception as e:
|
|
print("")
|
|
return []
|
|
|
|
def createAPIKeys(self, ExpiredAt = None):
|
|
newKey = secrets.token_urlsafe(32)
|
|
# sqlUpdate('INSERT INTO DashboardAPIKeys (Key, ExpiredAt) VALUES (?, ?)', (newKey, ExpiredAt,))
|
|
with self.engine.begin() as conn:
|
|
conn.execute(
|
|
self.apiKeyTable.insert().values({
|
|
"Key": newKey,
|
|
"ExpiredAt": ExpiredAt
|
|
})
|
|
)
|
|
|
|
self.DashboardAPIKeys = self.__getAPIKeys()
|
|
|
|
def deleteAPIKey(self, key):
|
|
# sqlUpdate("UPDATE DashboardAPIKeys SET ExpiredAt = datetime('now', 'localtime') WHERE Key = ?", (key, ))
|
|
with self.engine.begin() as conn:
|
|
conn.execute(
|
|
self.apiKeyTable.update().values({
|
|
"ExpiredAt": datetime.now(),
|
|
}).where(self.apiKeyTable.columns.Key == key)
|
|
)
|
|
|
|
self.DashboardAPIKeys = self.__getAPIKeys()
|
|
|
|
def __configValidation(self, section : str, key: str, value: Any) -> [bool, str]:
|
|
if (type(value) is str and len(value) == 0
|
|
and section not in ['Email', 'WireGuardConfiguration'] and
|
|
(section == 'Peer' and key == 'peer_global_dns')):
|
|
return False, "Field cannot be empty!"
|
|
if section == "Peers" and key == "peer_global_dns" and len(value) > 0:
|
|
return ValidateDNSAddress(value)
|
|
if section == "Peers" and key == "peer_endpoint_allowed_ip":
|
|
value = value.split(",")
|
|
for i in value:
|
|
i = i.strip()
|
|
try:
|
|
ipaddress.ip_network(i, strict=False)
|
|
except Exception as e:
|
|
return False, str(e)
|
|
if section == "Server" and key == "wg_conf_path":
|
|
if not os.path.exists(value):
|
|
return False, f"{value} is not a valid path"
|
|
if section == "Account" and key == "password":
|
|
if self.GetConfig("Account", "password")[0]:
|
|
if not self.__checkPassword(
|
|
value["currentPassword"], self.GetConfig("Account", "password")[1].encode("utf-8")):
|
|
return False, "Current password does not match."
|
|
if value["newPassword"] != value["repeatNewPassword"]:
|
|
return False, "New passwords does not match"
|
|
return True, ""
|
|
|
|
def generatePassword(self, plainTextPassword: str):
|
|
return bcrypt.hashpw(plainTextPassword.encode("utf-8"), bcrypt.gensalt())
|
|
|
|
def __checkPassword(self, plainTextPassword: str, hashedPassword: bytes):
|
|
return bcrypt.checkpw(plainTextPassword.encode("utf-8"), hashedPassword)
|
|
|
|
def SetConfig(self, section: str, key: str, value: any, init: bool = False) -> [bool, str]:
|
|
if key in self.hiddenAttribute and not init:
|
|
return False, None
|
|
|
|
if not init:
|
|
valid, msg = self.__configValidation(section, key, value)
|
|
if not valid:
|
|
return False, msg
|
|
|
|
if section == "Account" and key == "password":
|
|
if not init:
|
|
value = self.generatePassword(value["newPassword"]).decode("utf-8")
|
|
else:
|
|
value = self.generatePassword(value).decode("utf-8")
|
|
|
|
if section == "Email" and key == "email_template":
|
|
value = value.encode('unicode_escape').decode('utf-8')
|
|
|
|
if section == "Server" and key == "wg_conf_path":
|
|
if not os.path.exists(value):
|
|
return False, "Path does not exist"
|
|
|
|
if section not in self.__config:
|
|
if init:
|
|
self.__config[section] = {}
|
|
else:
|
|
return False, "Section does not exist"
|
|
|
|
if ((key not in self.__config[section].keys() and init) or
|
|
(key in self.__config[section].keys())):
|
|
if type(value) is bool:
|
|
if value:
|
|
self.__config[section][key] = "true"
|
|
else:
|
|
self.__config[section][key] = "false"
|
|
elif type(value) in [int, float]:
|
|
self.__config[section][key] = str(value)
|
|
elif type(value) is list:
|
|
self.__config[section][key] = "||".join(value).strip("||")
|
|
else:
|
|
self.__config[section][key] = value
|
|
return self.SaveConfig(), ""
|
|
else:
|
|
return False, f"{key} does not exist under {section}"
|
|
return True, ""
|
|
|
|
def SaveConfig(self) -> bool:
|
|
try:
|
|
with open(DashboardConfig.ConfigurationFilePath, "w+", encoding='utf-8') as configFile:
|
|
self.__config.write(configFile)
|
|
return True
|
|
except Exception as e:
|
|
return False
|
|
|
|
def GetConfig(self, section, key) -> [bool, any]:
|
|
if section not in self.__config:
|
|
return False, None
|
|
|
|
if key not in self.__config[section]:
|
|
return False, None
|
|
|
|
if section == "Email" and key == "email_template":
|
|
return True, self.__config[section][key].encode('utf-8').decode('unicode_escape')
|
|
|
|
if section == "WireGuardConfiguration" and key == "autostart":
|
|
return True, list(filter(lambda x: len(x) > 0, self.__config[section][key].split("||")))
|
|
|
|
if self.__config[section][key] in ["1", "yes", "true", "on"]:
|
|
return True, True
|
|
|
|
if self.__config[section][key] in ["0", "no", "false", "off"]:
|
|
return True, False
|
|
|
|
|
|
return True, self.__config[section][key]
|
|
|
|
def toJson(self) -> dict[str, dict[Any, Any]]:
|
|
the_dict = {}
|
|
|
|
for section in self.__config.sections():
|
|
the_dict[section] = {}
|
|
for key, val in self.__config.items(section):
|
|
if key not in self.hiddenAttribute:
|
|
the_dict[section][key] = self.GetConfig(section, key)[1]
|
|
return the_dict
|