Files
WGDashboard/src/dashboard.py

1790 lines
79 KiB
Python
Raw Normal View History

2025-07-02 18:45:43 +08:00
import logging
import random, shutil, sqlite3, configparser, hashlib, ipaddress, json, os, secrets, subprocess
import time, re, uuid, bcrypt, psutil, pyotp, threading
import traceback
from functools import wraps
from urllib.parse import unquote
2025-01-13 16:47:15 +08:00
from uuid import uuid4
2024-12-26 00:06:37 +08:00
from zipfile import ZipFile
2021-12-28 22:53:51 +03:00
from datetime import datetime, timedelta
import sqlalchemy
2025-01-13 16:47:15 +08:00
from jinja2 import Template
from flask import Flask, request, render_template, session, send_file, current_app
2024-07-31 02:27:44 -04:00
from flask_cors import CORS
2021-12-26 02:26:39 +03:00
from icmplib import ping, traceroute
2024-06-18 03:16:42 +08:00
from flask.json.provider import DefaultJSONProvider
2025-02-16 17:42:32 +08:00
from itertools import islice
2025-07-10 23:39:21 +08:00
from sqlalchemy import RowMapping
from modules.Utilities import (
2026-02-07 03:03:35 +01:00
RegexMatch, StringToBoolean, ValidateDNSAddress,
2024-11-25 22:11:51 +08:00
GenerateWireguardPublicKey, GenerateWireguardPrivateKey
)
from packaging import version
from modules.Email import EmailSender
from modules.DashboardLogger import DashboardLogger
2025-01-22 15:46:04 +08:00
from modules.PeerJob import PeerJob
2025-02-08 15:45:09 +08:00
from modules.SystemStatus import SystemStatus
from modules.PeerShareLinks import PeerShareLinks
from modules.PeerJobs import PeerJobs
from modules.DashboardConfig import DashboardConfig
from modules.WireguardConfiguration import WireguardConfiguration
2026-02-07 03:03:35 +01:00
from modules.AmneziaConfiguration import AmneziaConfiguration
2025-06-02 12:04:01 +08:00
from client import createClientBlueprint
2025-07-02 18:45:43 +08:00
from logging.config import dictConfig
2025-07-10 23:39:21 +08:00
from modules.DashboardClients import DashboardClients
2025-08-14 17:14:52 +08:00
from modules.DashboardPlugins import DashboardPlugins
2025-08-20 16:47:07 +08:00
from modules.DashboardWebHooks import DashboardWebHooks
2025-09-08 15:12:16 +08:00
from modules.NewConfigurationTemplates import NewConfigurationTemplates
2025-07-10 23:39:21 +08:00
2024-06-18 03:16:42 +08:00
class CustomJsonEncoder(DefaultJSONProvider):
def __init__(self, app):
super().__init__(app)
def default(self, o):
if callable(getattr(o, "toJson", None)):
2024-06-18 03:16:42 +08:00
return o.toJson()
2025-07-10 23:39:21 +08:00
if type(o) is RowMapping:
return dict(o)
2025-08-22 18:26:31 +08:00
if type(o) is datetime:
return o.strftime("%Y-%m-%d %H:%M:%S")
2025-03-12 00:44:36 +08:00
return super().default(self)
2025-09-16 07:46:25 +08:00
2024-06-18 03:16:42 +08:00
2024-11-25 22:11:51 +08:00
'''
Response Object
'''
2025-04-19 02:54:47 +08:00
def ResponseObject(status=True, message=None, data=None, status_code = 200) -> Flask.response_class:
2024-11-25 22:11:51 +08:00
response = Flask.make_response(app, {
"status": status,
"message": message,
"data": data
})
2025-04-19 02:54:47 +08:00
response.status_code = status_code
2024-11-25 22:11:51 +08:00
response.content_type = "application/json"
2025-09-16 07:46:25 +08:00
return response
def require_fields(*fields):
"""Decorator that validates required fields in request.json."""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
data = request.json
if data is None:
return ResponseObject(False, "Request body must be JSON", status_code=400)
missing = [field for field in fields if field not in data]
if missing:
return ResponseObject(False, f"Missing required fields: {', '.join(missing)}", status_code=400)
return f(*args, **kwargs)
return decorated
return decorator
2025-09-16 07:46:25 +08:00
'''
Flask App
'''
_, APP_PREFIX_INIT = DashboardConfig().GetConfig("Server", "app_prefix")
app = Flask("WGDashboard",
template_folder=os.path.abspath("./static/dist/WGDashboardAdmin"),
static_folder=os.path.abspath("./static/dist/WGDashboardAdmin"),
static_url_path=APP_PREFIX_INIT if APP_PREFIX_INIT else '')
2025-09-16 07:46:25 +08:00
def peerInformationBackgroundThread():
global WireguardConfigurations
app.logger.info("Background Thread #1 Started")
app.logger.info("Background Thread #1 PID:" + str(threading.get_native_id()))
delay = 6
time.sleep(10)
while True:
with app.app_context():
try:
curKeys = list(WireguardConfigurations.keys())
for name in curKeys:
if name in WireguardConfigurations.keys() and WireguardConfigurations.get(name) is not None:
c = WireguardConfigurations.get(name)
if c.getStatus():
c.getPeersLatestHandshake()
c.getPeersTransfer()
c.getPeersEndpoint()
c.getPeers()
if DashboardConfig.GetConfig('WireGuardConfiguration', 'peer_tracking')[1] is True:
print("[WGDashboard] Tracking Peers")
if delay == 6:
if c.configurationInfo.PeerTrafficTracking:
c.logPeersTraffic()
if c.configurationInfo.PeerHistoricalEndpointTracking:
c.logPeersHistoryEndpoint()
2025-09-16 07:46:25 +08:00
c.getRestrictedPeersList()
except Exception as e:
app.logger.error(f"[WGDashboard] Background Thread #1 Error", e)
if delay == 6:
delay = 1
else:
delay += 1
time.sleep(10)
def peerJobScheduleBackgroundThread():
with app.app_context():
app.logger.info(f"Background Thread #2 Started")
app.logger.info(f"Background Thread #2 PID:" + str(threading.get_native_id()))
time.sleep(10)
while True:
try:
AllPeerJobs.runJob()
time.sleep(180)
except Exception as e:
app.logger.error("Background Thread #2 Error", e)
def gunicornConfig():
_, app_ip = DashboardConfig.GetConfig("Server", "app_ip")
_, app_port = DashboardConfig.GetConfig("Server", "app_port")
return app_ip, app_port
def ProtocolsEnabled() -> list[str]:
from shutil import which
protocols = []
if which('awg') is not None and which('awg-quick') is not None:
protocols.append("awg")
if which('wg') is not None and which('wg-quick') is not None:
protocols.append("wg")
return protocols
def InitWireguardConfigurationsList(startup: bool = False):
if os.path.exists(DashboardConfig.GetConfig("Server", "wg_conf_path")[1]):
confs = os.listdir(DashboardConfig.GetConfig("Server", "wg_conf_path")[1])
confs.sort()
for i in confs:
if RegexMatch("^(.{1,}).(conf)$", i):
i = i.replace('.conf', '')
try:
if i in WireguardConfigurations.keys():
if WireguardConfigurations[i].configurationFileChanged():
with app.app_context():
WireguardConfigurations[i] = WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, i)
else:
with app.app_context():
WireguardConfigurations[i] = WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, i, startup=startup)
except WireguardConfiguration.InvalidConfigurationFileException as e:
app.logger.error(f"{i} have an invalid configuration file.")
if "awg" in ProtocolsEnabled():
confs = os.listdir(DashboardConfig.GetConfig("Server", "awg_conf_path")[1])
confs.sort()
for i in confs:
if RegexMatch("^(.{1,}).(conf)$", i):
i = i.replace('.conf', '')
try:
if i in WireguardConfigurations.keys():
if WireguardConfigurations[i].configurationFileChanged():
with app.app_context():
2026-02-07 03:03:35 +01:00
WireguardConfigurations[i] = AmneziaConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, i)
2025-09-16 07:46:25 +08:00
else:
with app.app_context():
2026-02-07 03:03:35 +01:00
WireguardConfigurations[i] = AmneziaConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, i, startup=startup)
2025-09-16 07:46:25 +08:00
except WireguardConfiguration.InvalidConfigurationFileException as e:
app.logger.error(f"{i} have an invalid configuration file.")
def startThreads():
bgThread = threading.Thread(target=peerInformationBackgroundThread, daemon=True)
bgThread.start()
scheduleJobThread = threading.Thread(target=peerJobScheduleBackgroundThread, daemon=True)
scheduleJobThread.start()
dictConfig({
'version': 1,
'formatters': {'default': {
'format': '[%(asctime)s] [%(levelname)s] in [%(module)s] %(message)s',
}},
'root': {
'level': 'INFO'
}
})
WireguardConfigurations: dict[str, WireguardConfiguration] = {}
CONFIGURATION_PATH = os.getenv('CONFIGURATION_PATH', '.')
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 5206928
app.secret_key = secrets.token_urlsafe(32)
app.json = CustomJsonEncoder(app)
with app.app_context():
SystemStatus = SystemStatus()
DashboardConfig = DashboardConfig()
EmailSender = EmailSender(DashboardConfig)
AllPeerShareLinks: PeerShareLinks = PeerShareLinks(DashboardConfig, WireguardConfigurations)
2025-10-24 08:47:02 +08:00
AllPeerJobs: PeerJobs = PeerJobs(DashboardConfig, WireguardConfigurations, AllPeerShareLinks)
2025-09-16 07:46:25 +08:00
DashboardLogger: DashboardLogger = DashboardLogger()
DashboardPlugins: DashboardPlugins = DashboardPlugins(app, WireguardConfigurations)
DashboardWebHooks: DashboardWebHooks = DashboardWebHooks(DashboardConfig)
NewConfigurationTemplates: NewConfigurationTemplates = NewConfigurationTemplates()
InitWireguardConfigurationsList(startup=True)
DashboardClients: DashboardClients = DashboardClients(WireguardConfigurations)
app.register_blueprint(createClientBlueprint(WireguardConfigurations, DashboardConfig, DashboardClients))
2025-01-08 18:09:05 +08:00
2024-08-14 01:17:47 -04:00
_, APP_PREFIX = DashboardConfig.GetConfig("Server", "app_prefix")
cors = CORS(app, resources={rf"{APP_PREFIX}/api/*": {
"origins": "*",
"methods": "DELETE, POST, GET, OPTIONS",
2025-08-29 16:55:33 +08:00
"allow_headers": ["Content-Type", "wg-dashboard-apikey"]
2024-08-14 01:17:47 -04:00
}})
2025-09-16 07:46:25 +08:00
_, app_ip = DashboardConfig.GetConfig("Server", "app_ip")
_, app_port = DashboardConfig.GetConfig("Server", "app_port")
_, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path")
2024-08-14 01:17:47 -04:00
2024-06-18 03:16:42 +08:00
'''
API Routes
'''
2021-05-04 01:32:34 -04:00
2024-06-18 03:16:42 +08:00
@app.before_request
def auth_req():
2024-08-10 19:03:21 -04:00
if request.method.lower() == 'options':
2024-11-24 00:22:33 +08:00
return ResponseObject(True)
2024-08-11 01:48:13 -04:00
2025-12-01 10:15:59 +08:00
DashboardConfig.APIAccessed = False
2024-06-18 03:16:42 +08:00
authenticationRequired = DashboardConfig.GetConfig("Server", "auth_req")[1]
2024-08-02 17:27:28 -04:00
d = request.headers
2024-06-18 03:16:42 +08:00
if authenticationRequired:
2024-08-02 17:27:28 -04:00
apiKey = d.get('wg-dashboard-apikey')
2024-07-31 02:27:44 -04:00
apiKeyEnabled = DashboardConfig.GetConfig("Server", "dashboard_api_key")[1]
if apiKey is not None and len(apiKey) > 0 and apiKeyEnabled:
apiKeyExist = len(list(filter(lambda x : x.Key == apiKey, DashboardConfig.DashboardAPIKeys))) == 1
2024-08-03 17:03:39 -04:00
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"API Key Access: {('true' if apiKeyExist else 'false')} - Key: {apiKey}")
2024-07-31 02:27:44 -04:00
if not apiKeyExist:
2024-08-11 01:48:13 -04:00
DashboardConfig.APIAccessed = False
2024-07-31 02:27:44 -04:00
response = Flask.make_response(app, {
"status": False,
"message": "API Key does not exist",
"data": None
})
response.content_type = "application/json"
response.status_code = 401
return response
2024-08-11 01:48:13 -04:00
DashboardConfig.APIAccessed = True
2024-07-31 02:27:44 -04:00
else:
2024-08-11 01:48:13 -04:00
DashboardConfig.APIAccessed = False
2026-03-29 14:30:24 +08:00
appPrefix = APP_PREFIX if len(APP_PREFIX) > 0 else ''
2024-12-26 00:06:37 +08:00
whiteList = [
2026-03-29 14:30:24 +08:00
# f'/static/',
f'{appPrefix}/api/validateAuthentication',
f'{appPrefix}/api/authenticate',
# f'{appPrefix}/api/getDashboardConfiguration',
f'{appPrefix}/api/getDashboardTheme',
f'{appPrefix}/api/getDashboardVersion',
f'{appPrefix}/api/sharePeer/get',
f'{appPrefix}/api/isTotpEnabled',
f'{appPrefix}/api/locale',
2024-12-26 00:06:37 +08:00
]
2026-03-29 14:30:24 +08:00
2026-03-29 14:30:24 +08:00
if (
("username" not in session or session.get("role") != "admin")
and (f"{appPrefix}/" != request.path and f"{appPrefix}" != request.path)
and not request.path.startswith(f'{appPrefix}/client')
2026-03-31 21:32:10 +02:00
and not request.path.startswith(f'{appPrefix}/img')
and not request.path.startswith(f'{appPrefix}/json')
2026-03-31 21:32:10 +02:00
and not request.path.startswith(f'{appPrefix}/assets')
2026-03-29 14:30:24 +08:00
and request.path not in whiteList
2024-07-31 02:27:44 -04:00
):
response = Flask.make_response(app, {
"status": False,
"message": "Unauthorized access.",
"data": None
})
response.content_type = "application/json"
response.status_code = 401
return response
2021-05-04 21:26:40 -04:00
2024-08-14 01:17:47 -04:00
@app.route(f'{APP_PREFIX}/api/handshake', methods=["GET", "OPTIONS"])
2024-11-24 00:22:33 +08:00
def API_Handshake():
2024-08-10 19:03:21 -04:00
return ResponseObject(True)
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/validateAuthentication')
2024-06-18 03:16:42 +08:00
def API_ValidateAuthentication():
2024-11-24 00:22:33 +08:00
token = request.cookies.get("authToken")
if DashboardConfig.GetConfig("Server", "auth_req")[1]:
if token is None or token == "" or "username" not in session or session["username"] != token:
return ResponseObject(False, "Invalid authentication.")
2024-06-18 03:16:42 +08:00
return ResponseObject(True)
2021-08-14 17:13:16 -04:00
2024-11-24 00:22:33 +08:00
@app.get(f'{APP_PREFIX}/api/requireAuthentication')
def API_RequireAuthentication():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "auth_req")[1])
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/authenticate')
2024-06-18 03:16:42 +08:00
def API_AuthenticateLogin():
data = request.get_json()
2024-11-25 02:48:55 +08:00
if not DashboardConfig.GetConfig("Server", "auth_req")[1]:
return ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
2024-08-11 01:48:13 -04:00
if DashboardConfig.APIAccessed:
authToken = hashlib.sha256(f"{request.headers.get('wg-dashboard-apikey')}{datetime.now()}".encode()).hexdigest()
2025-05-27 16:46:44 +08:00
session['role'] = 'admin'
2024-08-11 01:48:13 -04:00
session['username'] = authToken
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
2024-08-15 16:55:34 -04:00
resp.set_cookie("authToken", authToken)
2024-08-11 01:48:13 -04:00
session.permanent = True
return resp
2024-06-18 03:16:42 +08:00
valid = bcrypt.checkpw(data['password'].encode("utf-8"),
DashboardConfig.GetConfig("Account", "password")[1].encode("utf-8"))
totpEnabled = DashboardConfig.GetConfig("Account", "enable_totp")[1]
totpValid = False
if totpEnabled:
totpValid = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now() == data['totp']
if (valid
and data['username'] == DashboardConfig.GetConfig("Account", "username")[1]
and ((totpEnabled and totpValid) or not totpEnabled)
):
authToken = hashlib.sha256(f"{data['username']}{datetime.now()}".encode()).hexdigest()
2025-05-27 16:46:44 +08:00
session['role'] = 'admin'
2024-06-18 03:16:42 +08:00
session['username'] = authToken
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
resp.set_cookie("authToken", authToken)
session.permanent = True
2024-08-03 17:03:39 -04:00
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login success: {data['username']}")
2024-06-18 03:16:42 +08:00
return resp
2024-08-03 17:03:39 -04:00
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login failed: {data['username']}")
2024-06-18 03:16:42 +08:00
if totpEnabled:
return ResponseObject(False, "Sorry, your username, password or OTP is incorrect.")
else:
2024-06-18 03:16:42 +08:00
return ResponseObject(False, "Sorry, your username or password is incorrect.")
2021-05-13 18:00:40 -04:00
2024-08-15 16:55:34 -04:00
@app.get(f'{APP_PREFIX}/api/signout')
2024-06-18 03:16:42 +08:00
def API_SignOut():
resp = ResponseObject(True, "")
resp.delete_cookie("authToken")
session.clear()
2024-06-18 03:16:42 +08:00
return resp
2025-09-08 15:12:16 +08:00
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurations')
2024-06-18 03:16:42 +08:00
def API_getWireguardConfigurations():
2024-11-25 22:11:51 +08:00
InitWireguardConfigurationsList()
2024-06-18 03:16:42 +08:00
return ResponseObject(data=[wc for wc in WireguardConfigurations.values()])
2022-01-02 14:44:27 -05:00
2025-09-08 15:12:16 +08:00
@app.get(f'{APP_PREFIX}/api/newConfigurationTemplates')
def API_NewConfigurationTemplates():
return ResponseObject(data=NewConfigurationTemplates.GetTemplates())
@app.get(f'{APP_PREFIX}/api/newConfigurationTemplates/createTemplate')
def API_NewConfigurationTemplates_CreateTemplate():
return ResponseObject(data=NewConfigurationTemplates.CreateTemplate().model_dump())
@app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/updateTemplate')
def API_NewConfigurationTemplates_UpdateTemplate():
data = request.get_json()
template = data.get('Template', None)
if not template:
return ResponseObject(False, "Please provide template")
status, msg = NewConfigurationTemplates.UpdateTemplate(template)
return ResponseObject(status, msg)
@app.post(f'{APP_PREFIX}/api/newConfigurationTemplates/deleteTemplate')
def API_NewConfigurationTemplates_DeleteTemplate():
data = request.get_json()
template = data.get('Template', None)
if not template:
return ResponseObject(False, "Please provide template")
status, msg = NewConfigurationTemplates.DeleteTemplate(template)
return ResponseObject(status, msg)
@app.post(f'{APP_PREFIX}/api/addWireguardConfiguration')
2024-06-18 03:16:42 +08:00
def API_addWireguardConfiguration():
data = request.get_json()
2024-06-18 03:16:42 +08:00
requiredKeys = [
2024-12-04 17:50:16 +08:00
"ConfigurationName", "Address", "ListenPort", "PrivateKey", "Protocol"
2024-06-18 03:16:42 +08:00
]
for i in requiredKeys:
if i not in data.keys():
2024-06-18 03:16:42 +08:00
return ResponseObject(False, "Please provide all required parameters.")
2024-12-04 17:50:16 +08:00
if data.get("Protocol") not in ProtocolsEnabled():
return ResponseObject(False, "Please provide a valid protocol: wg / awg.")
2024-06-18 03:16:42 +08:00
# Check duplicate names, ports, address
for i in WireguardConfigurations.values():
if i.Name == data['ConfigurationName']:
return ResponseObject(False,
f"Already have a configuration with the name \"{data['ConfigurationName']}\"",
"ConfigurationName")
if str(i.ListenPort) == str(data["ListenPort"]):
return ResponseObject(False,
f"Already have a configuration with the port \"{data['ListenPort']}\"",
"ListenPort")
if i.Address == data["Address"]:
return ResponseObject(False,
f"Already have a configuration with the address \"{data['Address']}\"",
"Address")
if "Backup" in data.keys():
2024-12-04 17:50:16 +08:00
path = {
"wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
"awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
}
if (os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"])) and
os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))):
protocol = "wg"
elif (os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"])) and
os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))):
protocol = "awg"
else:
return ResponseObject(False, "Backup does not exist")
shutil.copy(
2024-12-04 17:50:16 +08:00
os.path.join(path[protocol], 'WGDashboard_Backup', data["Backup"]),
os.path.join(path[protocol], f'{data["ConfigurationName"]}.conf')
)
WireguardConfigurations[data['ConfigurationName']] = (
WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, data=data, name=data['ConfigurationName'])) if protocol == 'wg' else (
2026-02-07 03:03:35 +01:00
AmneziaConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data, name=data['ConfigurationName']))
else:
WireguardConfigurations[data['ConfigurationName']] = (
2025-08-28 16:11:01 +08:00
WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data)) if data.get('Protocol') == 'wg' else (
2026-02-07 03:03:35 +01:00
AmneziaConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data=data))
2024-06-18 03:16:42 +08:00
return ResponseObject()
2025-04-23 19:24:50 +08:00
@app.get(f'{APP_PREFIX}/api/toggleWireguardConfiguration')
2024-06-18 03:16:42 +08:00
def API_toggleWireguardConfiguration():
configurationName = request.args.get('configurationName')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
2024-06-18 03:16:42 +08:00
toggleStatus, msg = WireguardConfigurations[configurationName].toggleConfiguration()
return ResponseObject(toggleStatus, msg, WireguardConfigurations[configurationName].Status)
2024-10-04 16:58:47 +08:00
@app.post(f'{APP_PREFIX}/api/updateWireguardConfiguration')
def API_updateWireguardConfiguration():
data = request.get_json()
requiredKeys = ["Name"]
for i in requiredKeys:
if i not in data.keys():
return ResponseObject(False, "Please provide these following field: " + ", ".join(requiredKeys))
name = data.get("Name")
if name not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
2024-10-04 16:58:47 +08:00
status, msg = WireguardConfigurations[name].updateConfigurationSettings(data)
return ResponseObject(status, message=msg, data=WireguardConfigurations[name])
2024-06-18 03:16:42 +08:00
2025-08-15 21:45:09 +08:00
@app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationInfo')
def API_updateWireguardConfigurationInfo():
data = request.get_json()
name = data.get('Name')
key = data.get('Key')
value = data.get('Value')
if not all([data, key, name]):
return ResponseObject(status=False, message="Please provide configuration name, key and value")
if name not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
2025-08-16 15:40:57 +08:00
status, msg, key = WireguardConfigurations[name].updateConfigurationInfo(key, value)
2025-08-15 21:45:09 +08:00
2025-08-16 15:40:57 +08:00
return ResponseObject(status=status, message=msg, data=key)
2025-08-15 21:45:09 +08:00
2024-12-06 20:27:04 +08:00
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRawFile')
def API_GetWireguardConfigurationRawFile():
configurationName = request.args.get('configurationName')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
2024-12-06 20:27:04 +08:00
return ResponseObject(data={
"path": WireguardConfigurations[configurationName].configPath,
"content": WireguardConfigurations[configurationName].getRawConfigurationFile()
})
@app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationRawFile')
def API_UpdateWireguardConfigurationRawFile():
data = request.get_json()
configurationName = data.get('configurationName')
rawConfiguration = data.get('rawConfiguration')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide a valid configuration name")
if rawConfiguration is None or len(rawConfiguration) == 0:
return ResponseObject(False, "Please provide content")
status, err = WireguardConfigurations[configurationName].updateRawConfigurationFile(rawConfiguration)
return ResponseObject(status=status, message=err)
2024-10-25 00:19:27 +08:00
@app.post(f'{APP_PREFIX}/api/deleteWireguardConfiguration')
def API_deleteWireguardConfiguration():
data = request.get_json()
2025-03-28 00:13:38 +08:00
if "ConfigurationName" not in data.keys() or data.get("ConfigurationName") is None or data.get("ConfigurationName") not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Please provide the configuration name you want to delete", status_code=404)
rp = WireguardConfigurations.pop(data.get("ConfigurationName"))
status = rp.deleteConfiguration()
if not status:
WireguardConfigurations[data.get("ConfigurationName")] = rp
2024-10-25 00:19:27 +08:00
return ResponseObject(status)
2024-11-06 18:36:55 +08:00
@app.post(f'{APP_PREFIX}/api/renameWireguardConfiguration')
def API_renameWireguardConfiguration():
data = request.get_json()
2025-03-28 00:13:38 +08:00
keys = ["ConfigurationName", "NewConfigurationName"]
2024-11-06 18:36:55 +08:00
for k in keys:
if (k not in data.keys() or data.get(k) is None or len(data.get(k)) == 0 or
2025-03-28 00:13:38 +08:00
(k == "ConfigurationName" and data.get(k) not in WireguardConfigurations.keys())):
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Please provide the configuration name you want to rename", status_code=404)
if data.get("NewConfigurationName") in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration name already exist", status_code=400)
rc = WireguardConfigurations.pop(data.get("ConfigurationName"))
status, message = rc.renameConfiguration(data.get("NewConfigurationName"))
2024-11-06 18:36:55 +08:00
if status:
2026-02-07 03:03:35 +01:00
WireguardConfigurations[data.get("NewConfigurationName")] = (WireguardConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data.get("NewConfigurationName")) if rc.Protocol == 'wg' else AmneziaConfiguration(DashboardConfig, AllPeerJobs, AllPeerShareLinks, DashboardWebHooks, data.get("NewConfigurationName")))
else:
WireguardConfigurations[data.get("ConfigurationName")] = rc
2024-11-06 18:36:55 +08:00
return ResponseObject(status, message)
2024-12-30 20:30:09 +08:00
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRealtimeTraffic')
def API_getWireguardConfigurationRealtimeTraffic():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
2024-12-30 20:30:09 +08:00
return ResponseObject(data=WireguardConfigurations[configurationName].getRealtimeTrafficUsage())
2024-10-15 00:30:20 +08:00
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationBackup')
def API_getWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
2024-10-15 00:30:20 +08:00
return ResponseObject(data=WireguardConfigurations[configurationName].getBackups())
2024-10-25 00:19:27 +08:00
@app.get(f'{APP_PREFIX}/api/getAllWireguardConfigurationBackup')
def API_getAllWireguardConfigurationBackup():
data = {
"ExistingConfigurations": {},
"NonExistingConfigurations": {}
}
existingConfiguration = WireguardConfigurations.keys()
for i in existingConfiguration:
b = WireguardConfigurations[i].getBackups(True)
if len(b) > 0:
data['ExistingConfigurations'][i] = WireguardConfigurations[i].getBackups(True)
2024-12-04 17:50:16 +08:00
for protocol in ProtocolsEnabled():
directory = os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup')
2025-09-11 10:38:30 +08:00
if os.path.exists(directory):
files = [(file, os.path.getctime(os.path.join(directory, file)))
for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
files.sort(key=lambda x: x[1], reverse=True)
for f, ct in files:
if RegexMatch(r"^(.+)_(\d+)\.(conf)$", f):
s = re.search(r"^(.+)_(\d+)\.(conf)$", f)
2025-09-11 10:38:30 +08:00
name = s.group(1)
if name not in existingConfiguration:
if name not in data['NonExistingConfigurations'].keys():
data['NonExistingConfigurations'][name] = []
date = s.group(2)
d = {
"protocol": protocol,
"filename": f,
"backupDate": date,
"content": open(os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup', f), 'r').read()
}
if f.replace(".conf", ".sql") in list(os.listdir(directory)):
d['database'] = True
d['databaseContent'] = open(os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read()
data['NonExistingConfigurations'][name].append(d)
2024-10-25 00:19:27 +08:00
return ResponseObject(data=data)
@app.get(f'{APP_PREFIX}/api/createWireguardConfigurationBackup')
def API_createWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
2024-12-06 20:27:04 +08:00
return ResponseObject(status=WireguardConfigurations[configurationName].backupConfigurationFile()[0],
data=WireguardConfigurations[configurationName].getBackups())
@app.post(f'{APP_PREFIX}/api/deleteWireguardConfigurationBackup')
def API_deleteWireguardConfigurationBackup():
data = request.get_json()
2025-04-07 18:45:12 +08:00
if ("ConfigurationName" not in data.keys() or
"BackupFileName" not in data.keys() or
len(data['ConfigurationName']) == 0 or
len(data['BackupFileName']) == 0):
return ResponseObject(False,
2025-04-19 02:54:47 +08:00
"Please provide configurationName and backupFileName in body", status_code=400)
2025-04-07 18:45:12 +08:00
configurationName = data['ConfigurationName']
backupFileName = data['BackupFileName']
if configurationName not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
2025-04-19 02:54:47 +08:00
status = WireguardConfigurations[configurationName].deleteBackup(backupFileName)
return ResponseObject(status=status, message=(None if status else 'Backup file does not exist'),
status_code = (200 if status else 404))
2024-12-26 00:06:37 +08:00
@app.get(f'{APP_PREFIX}/api/downloadWireguardConfigurationBackup')
def API_downloadWireguardConfigurationBackup():
configurationName = os.path.basename(request.args.get('configurationName'))
backupFileName = os.path.basename(request.args.get('backupFileName'))
2024-12-26 00:06:37 +08:00
if configurationName is None or configurationName not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
2024-12-26 00:06:37 +08:00
status, zip = WireguardConfigurations[configurationName].downloadBackup(backupFileName)
if not status:
current_app.logger.error(f"Failed to download a requested backup.\nConfiguration Name: {configurationName}\nBackup File Name: {backupFileName}")
return ResponseObject(False, "Internal server error", status_code=500)
return send_file(os.path.join('download', zip), as_attachment=True)
2024-12-26 00:06:37 +08:00
@app.post(f'{APP_PREFIX}/api/restoreWireguardConfigurationBackup')
def API_restoreWireguardConfigurationBackup():
data = request.get_json()
2025-04-07 18:45:12 +08:00
if ("ConfigurationName" not in data.keys() or
"BackupFileName" not in data.keys() or
len(data['ConfigurationName']) == 0 or
len(data['BackupFileName']) == 0):
return ResponseObject(False,
2025-04-19 02:54:47 +08:00
"Please provide ConfigurationName and BackupFileName in body", status_code=400)
2025-04-07 18:45:12 +08:00
configurationName = data['ConfigurationName']
backupFileName = data['BackupFileName']
if configurationName not in WireguardConfigurations.keys():
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
status = WireguardConfigurations[configurationName].restoreBackup(backupFileName)
return ResponseObject(status=status, message=(None if status else 'Restore backup failed'))
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/getDashboardConfiguration')
2024-06-18 03:16:42 +08:00
def API_getDashboardConfiguration():
return ResponseObject(data=DashboardConfig.toJson())
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/updateDashboardConfigurationItem')
2024-06-18 03:16:42 +08:00
def API_updateDashboardConfigurationItem():
2020-12-26 23:42:41 -05:00
data = request.get_json()
2024-06-18 03:16:42 +08:00
if "section" not in data.keys() or "key" not in data.keys() or "value" not in data.keys():
return ResponseObject(False, "Invalid request.")
valid, msg = DashboardConfig.SetConfig(
data["section"], data["key"], data['value'])
if not valid:
2025-08-17 18:58:28 +08:00
return ResponseObject(False, msg)
2024-09-06 16:31:54 +08:00
if data['section'] == "Server":
if data['key'] == 'wg_conf_path':
WireguardConfigurations.clear()
2024-09-06 16:31:54 +08:00
WireguardConfigurations.clear()
2025-08-17 18:58:28 +08:00
InitWireguardConfigurationsList()
2024-11-02 14:26:47 +06:00
return ResponseObject(True, data=DashboardConfig.GetConfig(data["section"], data["key"])[1])
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/getDashboardAPIKeys')
def API_getDashboardAPIKeys():
if DashboardConfig.GetConfig('Server', 'dashboard_api_key'):
return ResponseObject(data=DashboardConfig.DashboardAPIKeys)
2024-09-22 21:50:30 +08:00
return ResponseObject(False, "WGDashboard API Keys function is disabled")
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/newDashboardAPIKey')
def API_newDashboardAPIKey():
data = request.get_json()
if DashboardConfig.GetConfig('Server', 'dashboard_api_key'):
try:
2025-04-19 02:54:47 +08:00
if data['NeverExpire']:
expiredAt = None
else:
expiredAt = datetime.strptime(data['ExpiredAt'], '%Y-%m-%d %H:%M:%S')
DashboardConfig.createAPIKeys(expiredAt)
return ResponseObject(True, data=DashboardConfig.DashboardAPIKeys)
except Exception as e:
return ResponseObject(False, str(e))
return ResponseObject(False, "Dashboard API Keys function is disbaled")
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/deleteDashboardAPIKey')
def API_deleteDashboardAPIKey():
data = request.get_json()
if DashboardConfig.GetConfig('Server', 'dashboard_api_key'):
if len(data['Key']) > 0 and len(list(filter(lambda x : x.Key == data['Key'], DashboardConfig.DashboardAPIKeys))) > 0:
DashboardConfig.deleteAPIKey(data['Key'])
return ResponseObject(True, data=DashboardConfig.DashboardAPIKeys)
2025-04-19 02:54:47 +08:00
else:
return ResponseObject(False, "API Key does not exist", status_code=404)
return ResponseObject(False, "Dashboard API Keys function is disbaled")
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/updatePeerSettings/<configName>')
2024-06-18 03:16:42 +08:00
def API_updatePeerSettings(configName):
2021-04-02 20:48:00 -04:00
data = request.get_json()
id = data['id']
2024-06-18 03:16:42 +08:00
if len(id) > 0 and configName in WireguardConfigurations.keys():
name = data['name']
private_key = data['private_key']
dns_addresses = data['DNS']
allowed_ip = data['allowed_ip']
endpoint_allowed_ip = data['endpoint_allowed_ip']
preshared_key = data['preshared_key']
mtu = data['mtu']
keepalive = data['keepalive']
2026-02-07 03:03:35 +01:00
notes = data.get('notes', '')
2024-06-18 03:16:42 +08:00
wireguardConfig = WireguardConfigurations[configName]
foundPeer, peer = wireguardConfig.searchPeer(id)
if foundPeer:
2024-12-03 02:34:45 +08:00
if wireguardConfig.Protocol == 'wg':
2026-02-07 03:03:35 +01:00
status, msg = peer.updatePeer(name,
private_key,
preshared_key,
dns_addresses,
allowed_ip,
endpoint_allowed_ip,
mtu,
keepalive,
notes)
2025-08-28 16:11:01 +08:00
else:
2026-02-07 03:03:35 +01:00
status, msg = peer.updatePeer(name,
private_key,
preshared_key,
dns_addresses,
allowed_ip,
endpoint_allowed_ip,
mtu,
keepalive,
notes)
wireguardConfig.getPeers()
2025-08-28 16:11:01 +08:00
DashboardWebHooks.RunWebHook('peer_updated', {
"configuration": wireguardConfig.Name,
"peers": [id]
})
return ResponseObject(status, msg)
2024-12-03 02:34:45 +08:00
2024-06-18 03:16:42 +08:00
return ResponseObject(False, "Peer does not exist")
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/resetPeerData/<configName>')
2024-08-08 23:27:13 -04:00
def API_resetPeerData(configName):
data = request.get_json()
id = data['id']
type = data['type']
if len(id) == 0 or configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration/Peer does not exist")
wgc = WireguardConfigurations.get(configName)
foundPeer, peer = wgc.searchPeer(id)
if not foundPeer:
return ResponseObject(False, "Configuration/Peer does not exist")
2025-03-07 00:52:51 +08:00
resetStatus = peer.resetDataUsage(type)
if resetStatus:
wgc.restrictPeers([id])
wgc.allowAccessPeers([id])
return ResponseObject(status=resetStatus)
2024-06-18 03:16:42 +08:00
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/deletePeers/<configName>')
2024-06-18 03:16:42 +08:00
def API_deletePeers(configName: str) -> ResponseObject:
data = request.get_json()
peers = data['peers']
if configName in WireguardConfigurations.keys():
if len(peers) == 0:
return ResponseObject(False, "Please specify one or more peers", status_code=400)
2024-06-18 03:16:42 +08:00
configuration = WireguardConfigurations.get(configName)
status, msg = configuration.deletePeers(peers, AllPeerJobs, AllPeerShareLinks)
# Delete Assignment
for p in peers:
assignments = DashboardClients.DashboardClientsPeerAssignment.GetAssignedClients(configName, p)
for c in assignments:
DashboardClients.DashboardClientsPeerAssignment.UnassignClients(c.AssignmentID)
return ResponseObject(status, msg)
2021-04-02 20:48:00 -04:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
2021-12-26 02:26:39 +03:00
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/restrictPeers/<configName>')
2024-06-18 03:16:42 +08:00
def API_restrictPeers(configName: str) -> ResponseObject:
2021-04-02 20:48:00 -04:00
data = request.get_json()
2024-06-18 03:16:42 +08:00
peers = data['peers']
if configName in WireguardConfigurations.keys():
if len(peers) == 0:
2024-09-22 21:50:30 +08:00
return ResponseObject(False, "Please specify one or more peers")
2024-06-18 03:16:42 +08:00
configuration = WireguardConfigurations.get(configName)
status, msg = configuration.restrictPeers(peers)
return ResponseObject(status, msg)
2025-04-19 02:54:47 +08:00
return ResponseObject(False, "Configuration does not exist", status_code=404)
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/sharePeer/create')
2024-08-06 10:17:14 -04:00
def API_sharePeer_create():
data: dict[str, str] = request.get_json()
Configuration = data.get('Configuration')
Peer = data.get('Peer')
ExpireDate = data.get('ExpireDate')
if Configuration is None or Peer is None:
2024-09-22 21:50:30 +08:00
return ResponseObject(False, "Please specify configuration and peers")
2024-08-06 10:17:14 -04:00
activeLink = AllPeerShareLinks.getLink(Configuration, Peer)
if len(activeLink) > 0:
return ResponseObject(True,
"This peer is already sharing. Please view data for shared link.",
data=activeLink[0]
)
status, message = AllPeerShareLinks.addLink(Configuration, Peer, datetime.strptime(ExpireDate, "%Y-%m-%d %H:%M:%S"))
2024-08-06 10:17:14 -04:00
if not status:
return ResponseObject(status, message)
return ResponseObject(data=AllPeerShareLinks.getLinkByID(message))
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/sharePeer/update')
2024-08-06 10:17:14 -04:00
def API_sharePeer_update():
data: dict[str, str] = request.get_json()
ShareID: str = data.get("ShareID")
ExpireDate: str = data.get("ExpireDate")
if not all([ShareID, ExpireDate]):
2024-08-06 10:17:14 -04:00
return ResponseObject(False, "Please specify ShareID")
if len(AllPeerShareLinks.getLinkByID(ShareID)) == 0:
return ResponseObject(False, "ShareID does not exist")
status, message = AllPeerShareLinks.updateLinkExpireDate(ShareID, datetime.strptime(ExpireDate, "%Y-%m-%d %H:%M:%S"))
2024-08-06 10:17:14 -04:00
if not status:
return ResponseObject(status, message)
return ResponseObject(data=AllPeerShareLinks.getLinkByID(ShareID))
2024-06-18 03:16:42 +08:00
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/sharePeer/get')
2024-08-07 00:37:05 -04:00
def API_sharePeer_get():
data = request.args
ShareID = data.get("ShareID")
if ShareID is None or len(ShareID) == 0:
return ResponseObject(False, "Please provide ShareID")
link = AllPeerShareLinks.getLinkByID(ShareID)
if len(link) == 0:
return ResponseObject(False, "This link is either expired to invalid")
l = link[0]
if l.Configuration not in WireguardConfigurations.keys():
return ResponseObject(False, "The peer you're looking for does not exist")
c = WireguardConfigurations.get(l.Configuration)
fp, p = c.searchPeer(l.Peer)
if not fp:
return ResponseObject(False, "The peer you're looking for does not exist")
return ResponseObject(data=p.downloadPeer())
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/allowAccessPeers/<configName>')
2024-06-18 03:16:42 +08:00
def API_allowAccessPeers(configName: str) -> ResponseObject:
data = request.get_json()
2024-06-18 03:16:42 +08:00
peers = data['peers']
if configName in WireguardConfigurations.keys():
if len(peers) == 0:
2024-09-22 21:50:30 +08:00
return ResponseObject(False, "Please specify one or more peers")
2024-06-18 03:16:42 +08:00
configuration = WireguardConfigurations.get(configName)
status, msg = configuration.allowAccessPeers(peers)
return ResponseObject(status, msg)
2024-06-18 03:16:42 +08:00
return ResponseObject(False, "Configuration does not exist")
2021-08-14 17:13:16 -04:00
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/addPeers/<configName>')
2024-06-18 03:16:42 +08:00
def API_addPeers(configName):
if configName in WireguardConfigurations.keys():
2025-07-02 18:45:43 +08:00
data: dict = request.get_json()
try:
2025-07-02 18:45:43 +08:00
bulkAdd: bool = data.get("bulkAdd", False)
bulkAddAmount: int = data.get('bulkAddAmount', 0)
preshared_key_bulkAdd: bool = data.get('preshared_key_bulkAdd', False)
2025-02-08 16:40:20 +08:00
public_key: str = data.get('public_key', "")
allowed_ips: list[str] = data.get('allowed_ips', [])
2025-02-16 17:42:32 +08:00
allowed_ips_validation: bool = data.get('allowed_ips_validation', True)
2024-08-08 23:27:13 -04:00
endpoint_allowed_ip: str = data.get('endpoint_allowed_ip', DashboardConfig.GetConfig("Peers", "peer_endpoint_allowed_ip")[1])
dns_addresses: str = data.get('DNS', DashboardConfig.GetConfig("Peers", "peer_global_DNS")[1])
mtu: int = data.get('mtu', None)
keep_alive: int = data.get('keepalive', None)
2026-02-07 03:03:35 +01:00
notes: str = data.get('notes', '')
2025-02-08 16:40:20 +08:00
preshared_key: str = data.get('preshared_key', "")
if type(mtu) is not int or mtu < 0 or mtu > 1460:
2025-09-17 11:50:36 +08:00
default: str = DashboardConfig.GetConfig("Peers", "peer_mtu")[1]
if default.isnumeric():
2025-09-17 11:50:36 +08:00
try:
mtu = int(default)
except Exception as e:
mtu = 0
else:
mtu = 0
if type(keep_alive) is not int or keep_alive < 0:
default = DashboardConfig.GetConfig("Peers", "peer_keep_alive")[1]
if default.isnumeric():
2025-09-17 11:50:36 +08:00
try:
keep_alive = int(default)
except Exception as e:
keep_alive = 0
else:
keep_alive = 0
config = WireguardConfigurations.get(configName)
if not config.getStatus():
config.toggleConfiguration()
2025-02-16 23:07:16 +08:00
ipStatus, availableIps = config.getAvailableIP(-1)
ipCountStatus, numberOfAvailableIPs = config.getNumberOfAvailableIP()
2025-02-16 17:42:32 +08:00
defaultIPSubnet = list(availableIps.keys())[0]
if bulkAdd:
if type(preshared_key_bulkAdd) is not bool:
preshared_key_bulkAdd = False
if type(bulkAddAmount) is not int or bulkAddAmount < 1:
return ResponseObject(False, "Please specify amount of peers you want to add")
2025-02-16 17:42:32 +08:00
if not ipStatus:
return ResponseObject(False, "No more available IP can assign")
2025-02-16 17:42:32 +08:00
if len(availableIps.keys()) == 0:
return ResponseObject(False, "This configuration does not have any IP address available")
2025-02-16 23:07:16 +08:00
if bulkAddAmount > sum(list(numberOfAvailableIPs.values())):
return ResponseObject(False,
2025-02-16 23:07:16 +08:00
f"The maximum number of peers can add is {sum(list(numberOfAvailableIPs.values()))}")
keyPairs = []
2025-02-16 23:07:16 +08:00
addedCount = 0
for subnet in availableIps.keys():
for ip in availableIps[subnet]:
newPrivateKey = GenerateWireguardPrivateKey()[1]
addedCount += 1
keyPairs.append({
"private_key": newPrivateKey,
"id": GenerateWireguardPublicKey(newPrivateKey)[1],
"preshared_key": (GenerateWireguardPrivateKey()[1] if preshared_key_bulkAdd else ""),
"allowed_ip": ip,
"name": f"BulkPeer_{(addedCount + 1)}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"DNS": dns_addresses,
"endpoint_allowed_ip": endpoint_allowed_ip,
"mtu": mtu,
"keepalive": keep_alive,
2026-02-07 03:03:35 +01:00
"notes": ""
2025-02-16 23:07:16 +08:00
})
if addedCount == bulkAddAmount:
break
if addedCount == bulkAddAmount:
break
if len(keyPairs) == 0 or (bulkAdd and len(keyPairs) != bulkAddAmount):
return ResponseObject(False, "Generating key pairs by bulk failed")
2025-09-15 21:48:00 +08:00
status, addedPeers, message = config.addPeers(keyPairs)
return ResponseObject(status=status, message=message, data=addedPeers)
else:
if config.searchPeer(public_key)[0] is True:
return ResponseObject(False, f"This peer already exist")
name = data.get("name", "")
private_key = data.get("private_key", "")
2025-04-19 02:54:47 +08:00
if len(public_key) == 0:
if len(private_key) == 0:
private_key = GenerateWireguardPrivateKey()[1]
public_key = GenerateWireguardPublicKey(private_key)[1]
else:
public_key = GenerateWireguardPublicKey(private_key)[1]
else:
if len(private_key) > 0:
genPub = GenerateWireguardPublicKey(private_key)[1]
# Check if provided pubkey match provided private key
if public_key != genPub:
return ResponseObject(False, "Provided Public Key does not match provided Private Key")
if len(allowed_ips) == 0:
2025-02-16 17:42:32 +08:00
if ipStatus:
2025-02-16 23:07:16 +08:00
for subnet in availableIps.keys():
for ip in availableIps[subnet]:
allowed_ips = [ip]
break
break
else:
return ResponseObject(False, "No more available IP can assign")
2025-02-08 16:40:20 +08:00
2025-02-16 17:42:32 +08:00
if allowed_ips_validation:
2025-02-08 16:40:20 +08:00
for i in allowed_ips:
2025-02-16 17:42:32 +08:00
found = False
2025-02-16 23:07:16 +08:00
for subnet in availableIps.keys():
try:
network = ipaddress.ip_network(subnet, False)
ap = ipaddress.ip_network(i)
except ValueError as e:
return ResponseObject(False, str(e))
2025-02-16 23:07:16 +08:00
if network.version == ap.version and ap.subnet_of(network):
2025-02-16 17:42:32 +08:00
found = True
2025-02-16 23:07:16 +08:00
2025-02-16 17:42:32 +08:00
if not found:
2025-02-08 16:40:20 +08:00
return ResponseObject(False, f"This IP is not available: {i}")
2025-09-15 21:48:00 +08:00
status, addedPeers, message = config.addPeers([
2024-11-06 20:58:53 +08:00
{
"name": name,
"id": public_key,
"private_key": private_key,
"allowed_ip": ','.join(allowed_ips),
"preshared_key": preshared_key,
"endpoint_allowed_ip": endpoint_allowed_ip,
"DNS": dns_addresses,
"mtu": mtu,
2024-12-03 02:34:45 +08:00
"keepalive": keep_alive,
2026-02-07 03:03:35 +01:00
"notes": notes
2024-11-06 20:58:53 +08:00
}]
)
2025-09-15 21:48:00 +08:00
return ResponseObject(status=status, message=message, data=addedPeers)
except Exception as e:
2025-09-15 21:48:00 +08:00
app.logger.error("Add peers failed", e)
return ResponseObject(False, f"Add peers failed.")
2024-06-18 03:16:42 +08:00
return ResponseObject(False, "Configuration does not exist")
2024-09-24 22:54:18 +08:00
@app.get(f"{APP_PREFIX}/api/downloadPeer/<configName>")
2024-06-18 03:16:42 +08:00
def API_downloadPeer(configName):
data = request.args
if configName not in WireguardConfigurations.keys():
2024-09-22 21:50:30 +08:00
return ResponseObject(False, "Configuration does not exist")
2024-06-18 03:16:42 +08:00
configuration = WireguardConfigurations[configName]
peerFound, peer = configuration.searchPeer(data['id'])
if len(data['id']) == 0 or not peerFound:
2024-09-22 21:50:30 +08:00
return ResponseObject(False, "Peer does not exist")
2024-06-18 03:16:42 +08:00
return ResponseObject(data=peer.downloadPeer())
2024-09-24 22:54:18 +08:00
@app.get(f"{APP_PREFIX}/api/downloadAllPeers/<configName>")
2024-06-18 03:16:42 +08:00
def API_downloadAllPeers(configName):
if configName not in WireguardConfigurations.keys():
2024-09-22 21:50:30 +08:00
return ResponseObject(False, "Configuration does not exist")
2024-06-18 03:16:42 +08:00
configuration = WireguardConfigurations[configName]
peerData = []
untitledPeer = 0
for i in configuration.Peers:
file = i.downloadPeer()
2025-01-16 18:36:06 +08:00
if file["fileName"] == "UntitledPeer":
2024-06-18 03:16:42 +08:00
file["fileName"] = str(untitledPeer) + "_" + file["fileName"]
untitledPeer += 1
peerData.append(file)
return ResponseObject(data=peerData)
2024-09-24 22:54:18 +08:00
@app.get(f"{APP_PREFIX}/api/getAvailableIPs/<configName>")
2024-06-18 03:16:42 +08:00
def API_getAvailableIPs(configName):
2024-11-25 22:11:51 +08:00
if configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
status, ips = WireguardConfigurations.get(configName).getAvailableIP()
2024-06-18 03:16:42 +08:00
return ResponseObject(status=status, data=ips)
2025-02-16 17:42:32 +08:00
@app.get(f"{APP_PREFIX}/api/getNumberOfAvailableIPs/<configName>")
def API_getNumberOfAvailableIPs(configName):
if configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
status, ips = WireguardConfigurations.get(configName).getNumberOfAvailableIP()
return ResponseObject(status=status, data=ips)
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationInfo')
2024-06-18 03:16:42 +08:00
def API_getConfigurationInfo():
configurationName = request.args.get("configurationName")
if not configurationName or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide configuration name")
return ResponseObject(data={
"configurationInfo": WireguardConfigurations[configurationName],
"configurationPeers": WireguardConfigurations[configurationName].getPeersList(),
"configurationRestrictedPeers": WireguardConfigurations[configurationName].getRestrictedPeersList()
})
2025-09-07 17:04:22 +08:00
@app.get(f'{APP_PREFIX}/api/getPeerHistoricalEndpoints')
def API_GetPeerHistoricalEndpoints():
configurationName = request.args.get("configurationName")
id = request.args.get('id')
if not configurationName or not id:
return ResponseObject(False, "Please provide configurationName and id")
fp, p = WireguardConfigurations.get(configurationName).searchPeer(id)
if fp:
result = p.getEndpoints()
geo = {}
try:
r = requests.post(f"http://ip-api.com/batch?fields=city,country,lat,lon,query",
data=json.dumps([x['endpoint'] for x in result]))
d = r.json()
except Exception as e:
return ResponseObject(data=result, message="Failed to request IP address geolocation. " + str(e))
return ResponseObject(data={
"endpoints": p.getEndpoints(),
"geolocation": d
})
return ResponseObject(False, "Peer does not exist")
2025-08-29 16:55:33 +08:00
@app.get(f'{APP_PREFIX}/api/getPeerSessions')
def API_GetPeerSessions():
configurationName = request.args.get("configurationName")
id = request.args.get('id')
try:
startDate = request.args.get('startDate', None)
endDate = request.args.get('endDate', None)
if startDate is None:
endDate = None
else:
startDate = datetime.strptime(startDate, "%Y-%m-%d")
if endDate:
endDate = datetime.strptime(endDate, "%Y-%m-%d")
if startDate > endDate:
return ResponseObject(False, "startDate must be smaller than endDate")
except Exception as e:
return ResponseObject(False, "Dates are invalid")
if not configurationName or not id:
return ResponseObject(False, "Please provide configurationName and id")
fp, p = WireguardConfigurations.get(configurationName).searchPeer(id)
if fp:
return ResponseObject(data=p.getSessions(startDate, endDate))
return ResponseObject(False, "Peer does not exist")
2025-09-01 17:16:03 +08:00
@app.get(f'{APP_PREFIX}/api/getPeerTraffics')
def API_GetPeerTraffics():
configurationName = request.args.get("configurationName")
id = request.args.get('id')
try:
interval = request.args.get('interval', 30)
startDate = request.args.get('startDate', None)
endDate = request.args.get('endDate', None)
if type(interval) is str:
if not interval.isdigit():
return ResponseObject(False, "Interval must be integers in minutes")
interval = int(interval)
if startDate is None:
endDate = None
else:
startDate = datetime.strptime(startDate, "%Y-%m-%d")
if endDate:
endDate = datetime.strptime(endDate, "%Y-%m-%d")
if startDate > endDate:
return ResponseObject(False, "startDate must be smaller than endDate")
except Exception as e:
return ResponseObject(False, "Dates are invalid" + e)
if not configurationName or not id:
return ResponseObject(False, "Please provide configurationName and id")
fp, p = WireguardConfigurations.get(configurationName).searchPeer(id)
if fp:
return ResponseObject(data=p.getTraffics(interval, startDate, endDate))
return ResponseObject(False, "Peer does not exist")
2025-12-02 11:00:54 +08:00
@app.get(f'{APP_PREFIX}/api/getPeerTrackingTableCounts')
def API_GetPeerTrackingTableCounts():
configurationName = request.args.get("configurationName")
if configurationName and configurationName not in WireguardConfigurations.keys():
2025-12-02 11:00:54 +08:00
return ResponseObject(False, "Configuration does not exist")
if configurationName:
c = WireguardConfigurations.get(configurationName)
return ResponseObject(data={
"TrafficTrackingTableSize": c.getTransferTableSize(),
"HistoricalTrackingTableSize": c.getHistoricalEndpointTableSize()
})
d = {}
for i in WireguardConfigurations.keys():
c = WireguardConfigurations.get(i)
d[i] = {
"TrafficTrackingTableSize": c.getTransferTableSize(),
"HistoricalTrackingTableSize": c.getHistoricalEndpointTableSize()
}
return ResponseObject(data=d)
2025-12-02 11:00:54 +08:00
@app.get(f'{APP_PREFIX}/api/downloadPeerTrackingTable')
def API_DownloadPeerTackingTable():
configurationName = request.args.get("configurationName")
table = request.args.get('table')
if configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
if table not in ['TrafficTrackingTable', 'HistoricalTrackingTable']:
return ResponseObject(False, "Table does not exist")
c = WireguardConfigurations.get(configurationName)
return ResponseObject(
data=c.downloadTransferTable() if table == 'TrafficTrackingTable'
else c.downloadHistoricalEndpointTable())
@app.post(f'{APP_PREFIX}/api/deletePeerTrackingTable')
def API_DeletePeerTrackingTable():
data = request.get_json()
configurationName = data.get('configurationName')
table = data.get('table')
if not configurationName or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
if not table or table not in ['TrafficTrackingTable', 'HistoricalTrackingTable']:
return ResponseObject(False, "Table does not exist")
c = WireguardConfigurations.get(configurationName)
return ResponseObject(
status=c.deleteTransferTable() if table == 'TrafficTrackingTable'
else c.deleteHistoryEndpointTable())
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/getDashboardTheme')
2024-06-18 03:16:42 +08:00
def API_getDashboardTheme():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "dashboard_theme")[1])
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/getDashboardVersion')
def API_getDashboardVersion():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "version")[1])
@app.post(f'{APP_PREFIX}/api/PeerScheduleJob')
@require_fields('Configuration', 'Peer', 'Field', 'Operator', 'Value', 'Action')
def API_savePeerScheduleJob():
data = request.json
configuration = WireguardConfigurations.get(data['Configuration'])
2025-04-19 02:54:47 +08:00
if configuration is None:
return ResponseObject(False, "Configuration does not exist", status_code=404)
peerKey = unquote(data['Peer'])
found, _ = configuration.searchPeer(peerKey)
if not found:
return ResponseObject(False, "Peer does not exist", status_code=404)
jobID = data.get('JobID', str(uuid4()))
if len(AllPeerJobs.searchJobById(jobID)) > 0:
return ResponseObject(False, "Job already exists", status_code=409)
success, result = AllPeerJobs.saveJob(PeerJob(
jobID, data['Configuration'], peerKey, data['Field'], data['Operator'], data['Value'],
datetime.now(), data.get('ExpireDate'), data['Action']))
if success:
return ResponseObject(success, data=result)
return ResponseObject(success, message=result)
@app.put(f'{APP_PREFIX}/api/PeerScheduleJob')
@require_fields('JobID', 'Configuration', 'Peer', 'Field', 'Operator', 'Value', 'Action')
def API_updatePeerScheduleJob():
data = request.json
configuration = WireguardConfigurations.get(data['Configuration'])
if configuration is None:
return ResponseObject(False, "Configuration does not exist", status_code=404)
peerKey = unquote(data['Peer'])
found, _ = configuration.searchPeer(peerKey)
if not found:
return ResponseObject(False, "Peer does not exist", status_code=404)
existing = AllPeerJobs.searchJobById(data['JobID'])
if len(existing) == 0:
return ResponseObject(False, "Job does not exist", status_code=404)
success, result = AllPeerJobs.saveJob(PeerJob(
data['JobID'], data['Configuration'], peerKey, data['Field'], data['Operator'], data['Value'],
datetime.now(), data.get('ExpireDate'), data['Action']))
if success:
return ResponseObject(success, data=result)
return ResponseObject(success, message=result)
@app.delete(f'{APP_PREFIX}/api/PeerScheduleJob')
@require_fields('JobID', 'Configuration', 'Peer')
def API_deletePeerScheduleJob():
data = request.json
configuration = WireguardConfigurations.get(data['Configuration'])
2025-04-19 02:54:47 +08:00
if configuration is None:
return ResponseObject(False, "Configuration does not exist", status_code=404)
peerKey = unquote(data['Peer'])
success, result = AllPeerJobs.deleteJob(PeerJob(
data['JobID'], data['Configuration'], peerKey, data.get('Field', ''),
data.get('Operator', ''), data.get('Value', ''),
datetime.now(), data.get('ExpireDate'), data.get('Action', '')))
if success:
return ResponseObject(success, message="Job deleted successfully")
return ResponseObject(success, message=result)
@app.get(f'{APP_PREFIX}/api/PeerScheduleJobLogs/<configName>')
2024-07-29 18:40:07 -04:00
def API_getPeerScheduleJobLogs(configName):
if configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
data = request.args.get("requestAll")
requestAll = False
if data is not None and data == "true":
requestAll = True
return ResponseObject(data=AllPeerJobs.getPeerJobLogs(configName))
2024-07-29 18:40:07 -04:00
'''
Tools
'''
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/ping/getAllPeersIpAddress')
2024-06-18 03:16:42 +08:00
def API_ping_getAllPeersIpAddress():
ips = {}
for c in WireguardConfigurations.values():
cips = {}
for p in c.Peers:
allowed_ip = p.allowed_ip.replace(" ", "").split(",")
parsed = []
for x in allowed_ip:
try:
ip = ipaddress.ip_network(x, strict=False)
except ValueError as e:
2025-07-02 18:45:43 +08:00
app.logger.error(f"Failed to parse IP address of {p.id} - {c.Name}")
host = list(ip.hosts())
if len(host) == 1:
parsed.append(str(host[0]))
2024-06-18 03:16:42 +08:00
endpoint = p.endpoint.replace(" ", "").replace("(none)", "")
if len(p.name) > 0:
cips[f"{p.name} - {p.id}"] = {
"allowed_ips": parsed,
"endpoint": endpoint
}
2021-08-14 17:13:16 -04:00
else:
2024-06-18 03:16:42 +08:00
cips[f"{p.id}"] = {
"allowed_ips": parsed,
"endpoint": endpoint
}
ips[c.Name] = cips
return ResponseObject(data=ips)
import requests
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/ping/execute')
2024-06-18 03:16:42 +08:00
def API_ping_execute():
if "ipAddress" in request.args.keys() and "count" in request.args.keys():
ip = request.args['ipAddress']
count = request.args['count']
try:
if ip is not None and len(ip) > 0 and count is not None and count.isnumeric():
result = ping(ip, count=int(count), source=None)
data = {
2024-06-18 03:16:42 +08:00
"address": result.address,
"is_alive": result.is_alive,
"min_rtt": result.min_rtt,
"avg_rtt": result.avg_rtt,
"max_rtt": result.max_rtt,
"package_sent": result.packets_sent,
"package_received": result.packets_received,
"package_loss": result.packet_loss,
"geo": None
}
try:
r = requests.get(f"http://ip-api.com/json/{result.address}?field=city")
data['geo'] = r.json()
except Exception as e:
pass
return ResponseObject(data=data)
2024-06-18 03:16:42 +08:00
return ResponseObject(False, "Please specify an IP Address (v4/v6)")
except Exception as exp:
return ResponseObject(False, exp)
return ResponseObject(False, "Please provide ipAddress and count")
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/traceroute/execute')
2024-06-18 03:16:42 +08:00
def API_traceroute_execute():
if "ipAddress" in request.args.keys() and len(request.args.get("ipAddress")) > 0:
ipAddress = request.args.get('ipAddress')
try:
tracerouteResult = traceroute(ipAddress, timeout=1, max_hops=64)
2024-06-18 03:16:42 +08:00
result = []
for hop in tracerouteResult:
if len(result) > 1:
skipped = False
for i in range(result[-1]["hop"] + 1, hop.distance):
result.append(
{
"hop": i,
"ip": "*",
"avg_rtt": "*",
"min_rtt": "*",
"max_rtt": "*"
}
)
skip = True
if skipped: continue
result.append(
{
"hop": hop.distance,
"ip": hop.address,
"avg_rtt": hop.avg_rtt,
"min_rtt": hop.min_rtt,
"max_rtt": hop.max_rtt
})
try:
r = requests.post(f"http://ip-api.com/batch?fields=city,country,lat,lon,query",
data=json.dumps([x['ip'] for x in result]))
d = r.json()
for i in range(len(result)):
result[i]['geo'] = d[i]
return ResponseObject(data=result)
except Exception as e:
app.logger.error(f"Failed to gather the geolocation data: {e}")
2025-02-12 20:07:37 +08:00
return ResponseObject(data=result, message="Failed to request IP address geolocation")
except Exception as e:
app.logger.error(f"Failed to execute the traceroute: {e}")
return ResponseObject(data=[], message="Failed to traceroute the given parameter")
2024-06-18 03:16:42 +08:00
else:
return ResponseObject(False, "Please provide ipAddress")
2021-09-03 17:32:51 -04:00
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/getDashboardUpdate')
2024-08-17 00:31:46 -04:00
def API_getDashboardUpdate():
2024-08-19 21:30:47 -04:00
import urllib.request as req
2024-08-17 00:31:46 -04:00
try:
2025-09-11 10:38:30 +08:00
r = req.urlopen("https://api.github.com/repos/WGDashboard/WGDashboard/releases/latest", timeout=5).read()
2024-08-17 00:31:46 -04:00
data = dict(json.loads(r))
tagName = data.get('tag_name')
htmlUrl = data.get('html_url')
if tagName is not None and htmlUrl is not None:
if version.parse(tagName) > version.parse(DashboardConfig.DashboardVersion):
return ResponseObject(message=f"{tagName} is now available for update!", data=htmlUrl)
2024-08-17 00:31:46 -04:00
else:
return ResponseObject(message="You're on the latest version")
return ResponseObject(False)
2025-02-12 20:07:37 +08:00
except Exception as e:
2024-08-19 21:30:47 -04:00
return ResponseObject(False, f"Request to GitHub API failed.")
2021-09-03 17:32:51 -04:00
2024-06-18 03:16:42 +08:00
'''
Sign Up
'''
2022-03-24 02:10:52 -04:00
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/isTotpEnabled')
2024-06-18 03:16:42 +08:00
def API_isTotpEnabled():
2024-08-05 15:39:11 -04:00
return (
ResponseObject(data=DashboardConfig.GetConfig("Account", "enable_totp")[1] and DashboardConfig.GetConfig("Account", "totp_verified")[1]))
2022-03-21 22:33:19 -04:00
2022-03-24 02:10:52 -04:00
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/api/Welcome_GetTotpLink')
2024-06-18 03:16:42 +08:00
def API_Welcome_GetTotpLink():
2024-08-05 15:39:11 -04:00
if not DashboardConfig.GetConfig("Account", "totp_verified")[1]:
DashboardConfig.SetConfig("Account", "totp_key", pyotp.random_base32(), True)
2024-06-18 03:16:42 +08:00
return ResponseObject(
data=pyotp.totp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).provisioning_uri(
issuer_name="WGDashboard"))
return ResponseObject(False)
2022-03-24 02:10:52 -04:00
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/Welcome_VerifyTotpLink')
2024-06-18 03:16:42 +08:00
def API_Welcome_VerifyTotpLink():
2023-11-28 16:37:16 -05:00
data = request.get_json()
2024-08-05 15:39:11 -04:00
totp = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now()
if totp == data['totp']:
DashboardConfig.SetConfig("Account", "totp_verified", "true")
DashboardConfig.SetConfig("Account", "enable_totp", "true")
return ResponseObject(totp == data['totp'])
2023-11-28 16:37:16 -05:00
2024-09-24 22:54:18 +08:00
@app.post(f'{APP_PREFIX}/api/Welcome_Finish')
2024-06-18 03:16:42 +08:00
def API_Welcome_Finish():
2022-04-23 00:34:11 -04:00
data = request.get_json()
2024-06-18 03:16:42 +08:00
if DashboardConfig.GetConfig("Other", "welcome_session")[1]:
if data["username"] == "":
return ResponseObject(False, "Username cannot be blank.")
2022-04-23 00:34:11 -04:00
2024-06-18 03:16:42 +08:00
if data["newPassword"] == "" or len(data["newPassword"]) < 8:
return ResponseObject(False, "Password must be at least 8 characters")
2022-04-21 15:11:01 -04:00
2024-06-18 03:16:42 +08:00
updateUsername, updateUsernameErr = DashboardConfig.SetConfig("Account", "username", data["username"])
updatePassword, updatePasswordErr = DashboardConfig.SetConfig("Account", "password",
{
"newPassword": data["newPassword"],
2024-08-02 17:27:28 -04:00
"repeatNewPassword": data["repeatNewPassword"],
2024-06-18 03:16:42 +08:00
"currentPassword": "admin"
})
2024-08-05 15:39:11 -04:00
if not updateUsername or not updatePassword:
return ResponseObject(False, f"{updateUsernameErr},{updatePasswordErr}".strip(","))
2021-12-26 02:26:39 +03:00
2024-06-18 03:16:42 +08:00
DashboardConfig.SetConfig("Other", "welcome_session", False)
return ResponseObject()
class Locale:
def __init__(self):
self.localePath = './static/locales/'
self.activeLanguages = {}
with open(os.path.join(f"{self.localePath}supported_locales.json"), "r") as f:
self.activeLanguages = sorted(json.loads(''.join(f.readlines())), key=lambda x : x['lang_name'])
def getLanguage(self) -> dict | None:
currentLanguage = DashboardConfig.GetConfig("Server", "dashboard_language")[1]
if currentLanguage == "en":
return None
if os.path.exists(os.path.join(f"{self.localePath}{currentLanguage}.json")):
with open(os.path.join(f"{self.localePath}{currentLanguage}.json"), "r") as f:
return dict(json.loads(''.join(f.readlines())))
else:
return None
def updateLanguage(self, lang_id):
if not os.path.exists(os.path.join(f"{self.localePath}{lang_id}.json")):
DashboardConfig.SetConfig("Server", "dashboard_language", "en-US")
else:
DashboardConfig.SetConfig("Server", "dashboard_language", lang_id)
Locale = Locale()
@app.get(f'{APP_PREFIX}/api/locale')
def API_Locale_CurrentLang():
return ResponseObject(data=Locale.getLanguage())
@app.get(f'{APP_PREFIX}/api/locale/available')
def API_Locale_Available():
return ResponseObject(data=Locale.activeLanguages)
@app.post(f'{APP_PREFIX}/api/locale/update')
def API_Locale_Update():
data = request.get_json()
if 'lang_id' not in data.keys():
return ResponseObject(False, "Please specify a lang_id")
Locale.updateLanguage(data['lang_id'])
return ResponseObject(data=Locale.getLanguage())
2024-09-09 23:43:55 +08:00
2025-01-08 18:09:05 +08:00
@app.get(f'{APP_PREFIX}/api/email/ready')
def API_Email_Ready():
return ResponseObject(EmailSender.is_ready())
2025-01-08 18:09:05 +08:00
@app.post(f'{APP_PREFIX}/api/email/send')
def API_Email_Send():
data = request.get_json()
2025-07-25 18:06:42 +08:00
if "Receiver" not in data.keys() or "Subject" not in data.keys():
return ResponseObject(False, "Please at least specify receiver and subject")
2025-01-13 16:47:15 +08:00
body = data.get('Body', '')
subject = data.get('Subject','')
2025-01-16 00:44:22 +08:00
download = None
2025-04-19 02:54:47 +08:00
if ("ConfigurationName" in data.keys()
and "Peer" in data.keys()):
2025-01-13 16:47:15 +08:00
if data.get('ConfigurationName') in WireguardConfigurations.keys():
configuration = WireguardConfigurations.get(data.get('ConfigurationName'))
attachmentName = ""
if configuration is not None:
fp, p = configuration.searchPeer(data.get('Peer'))
if fp:
template = Template(body)
download = p.downloadPeer()
body = template.render(peer=p.toJson(), configurationFile=download)
2025-09-07 22:12:22 +08:00
subject = Template(data.get('Subject', '')).render(peer=p.toJson(), configurationFile=download)
2025-01-13 16:47:15 +08:00
if data.get('IncludeAttachment', False):
u = str(uuid4())
attachmentName = f'{u}.conf'
2025-04-19 02:54:47 +08:00
with open(os.path.join('./attachments', attachmentName,), 'w+') as f:
f.write(download['file'])
2025-01-13 16:47:15 +08:00
2025-09-07 22:12:22 +08:00
s, m = EmailSender.send(data.get('Receiver'), subject, body,
2025-04-19 02:54:47 +08:00
data.get('IncludeAttachment', False), (attachmentName if download else ''))
2025-01-08 18:09:05 +08:00
return ResponseObject(s, m)
2025-09-07 22:12:22 +08:00
@app.post(f'{APP_PREFIX}/api/email/preview')
2025-01-16 00:44:22 +08:00
def API_Email_PreviewBody():
data = request.get_json()
2025-09-07 22:12:22 +08:00
subject = data.get('Subject', '')
2025-01-16 00:44:22 +08:00
body = data.get('Body', '')
2025-09-07 22:12:22 +08:00
2025-01-16 00:44:22 +08:00
if ("ConfigurationName" not in data.keys()
or "Peer" not in data.keys() or data.get('ConfigurationName') not in WireguardConfigurations.keys()):
return ResponseObject(False, "Please specify configuration and peer")
configuration = WireguardConfigurations.get(data.get('ConfigurationName'))
fp, p = configuration.searchPeer(data.get('Peer'))
if not fp:
return ResponseObject(False, "Peer does not exist")
try:
template = Template(body)
download = p.downloadPeer()
2025-09-07 22:12:22 +08:00
return ResponseObject(data={
"Body": Template(body).render(peer=p.toJson(), configurationFile=download),
"Subject": Template(subject).render(peer=p.toJson(), configurationFile=download)
})
2025-01-16 00:44:22 +08:00
except Exception as e:
return ResponseObject(False, message=str(e))
2024-11-27 18:26:13 +08:00
@app.get(f'{APP_PREFIX}/api/systemStatus')
def API_SystemStatus():
return ResponseObject(data=SystemStatus)
2024-11-27 18:26:13 +08:00
2024-12-04 17:50:16 +08:00
@app.get(f'{APP_PREFIX}/api/protocolsEnabled')
def API_ProtocolsEnabled():
return ResponseObject(data=ProtocolsEnabled())
2024-11-27 18:26:13 +08:00
2025-08-02 21:58:09 +08:00
'''
OIDC Controller
'''
@app.get(f'{APP_PREFIX}/api/oidc/toggle')
def API_OIDC_Toggle():
data = request.args
if not data.get('mode'):
return ResponseObject(False, "Please provide mode")
mode = data.get('mode')
if mode == 'Client':
DashboardConfig.SetConfig("OIDC", "client_enable",
not DashboardConfig.GetConfig("OIDC", "client_enable")[1])
elif mode == 'Admin':
DashboardConfig.SetConfig("OIDC", "admin_enable",
not DashboardConfig.GetConfig("OIDC", "admin_enable")[1])
else:
return ResponseObject(False, "Mode does not exist")
return ResponseObject()
@app.get(f'{APP_PREFIX}/api/oidc/status')
def API_OIDC_Status():
data = request.args
if not data.get('mode'):
return ResponseObject(False, "Please provide mode")
mode = data.get('mode')
if mode == 'Client':
return ResponseObject(data=DashboardConfig.GetConfig("OIDC", "client_enable")[1])
elif mode == 'Admin':
return ResponseObject(data=DashboardConfig.GetConfig("OIDC", "admin_enable")[1])
return ResponseObject(False, "Mode does not exist")
2025-07-10 23:39:21 +08:00
'''
Client Controller
'''
2025-09-05 16:39:59 +08:00
@app.get(f'{APP_PREFIX}/api/clients/toggleStatus')
def API_Clients_ToggleStatus():
DashboardConfig.SetConfig("Clients", "enable",
not DashboardConfig.GetConfig("Clients", "enable")[1])
return ResponseObject(data=DashboardConfig.GetConfig("Clients", "enable")[1])
2025-07-10 23:39:21 +08:00
@app.get(f'{APP_PREFIX}/api/clients/allClients')
def API_Clients_AllClients():
return ResponseObject(data=DashboardClients.GetAllClients())
2025-07-18 21:42:39 +08:00
@app.get(f'{APP_PREFIX}/api/clients/allClientsRaw')
def API_Clients_AllClientsRaw():
return ResponseObject(data=DashboardClients.GetAllClientsRaw())
2025-07-10 23:39:21 +08:00
@app.post(f'{APP_PREFIX}/api/clients/assignClient')
def API_Clients_AssignClient():
data = request.get_json()
configurationName = data.get('ConfigurationName')
id = data.get('Peer')
client = data.get('ClientID')
if not all([configurationName, id, client]):
return ResponseObject(False, "Please provide all required fields")
2025-08-11 17:29:15 +08:00
if not DashboardClients.GetClient(client):
return ResponseObject(False, "Client does not exist")
2025-07-10 23:39:21 +08:00
status, data = DashboardClients.AssignClient(configurationName, id, client)
if not status:
return ResponseObject(status, message="Client already assiged to this peer")
return ResponseObject(data=data)
@app.post(f'{APP_PREFIX}/api/clients/unassignClient')
def API_Clients_UnassignClient():
data = request.get_json()
assignmentID = data.get('AssignmentID')
if not assignmentID:
return ResponseObject(False, "Please provide AssignmentID")
return ResponseObject(status=DashboardClients.UnassignClient(assignmentID))
@app.get(f'{APP_PREFIX}/api/clients/assignedClients')
def API_Clients_AssignedClients():
data = request.args
configurationName = data.get('ConfigurationName')
peerID = data.get('Peer')
if not all([configurationName, id]):
return ResponseObject(False, "Please provide all required fields")
return ResponseObject(
data=DashboardClients.GetAssignedPeerClients(configurationName, peerID))
@app.get(f'{APP_PREFIX}/api/clients/allConfigurationsPeers')
def API_Clients_AllConfigurationsPeers():
c = {}
for (key, val) in WireguardConfigurations.items():
c[key] = list(map(lambda x : {
"id": x.id,
"name": x.name
}, val.Peers))
return ResponseObject(
data=c
)
2025-07-21 02:29:33 +08:00
@app.get(f'{APP_PREFIX}/api/clients/assignedPeers')
def API_Clients_AssignedPeers():
data = request.args
clientId = data.get("ClientID")
if not clientId:
return ResponseObject(False, "Please provide ClientID")
2025-08-11 17:29:15 +08:00
if not DashboardClients.GetClient(clientId):
return ResponseObject(False, "Client does not exist")
d = DashboardClients.GetClientAssignedPeersGrouped(clientId)
2025-07-21 02:29:33 +08:00
if d is None:
return ResponseObject(False, "Client does not exist")
return ResponseObject(data=d)
2025-07-10 23:39:21 +08:00
@app.post(f'{APP_PREFIX}/api/clients/generatePasswordResetLink')
def API_Clients_GeneratePasswordResetLink():
data = request.get_json()
clientId = data.get("ClientID")
if not clientId:
return ResponseObject(False, "Please provide ClientID")
2025-08-11 17:29:15 +08:00
if not DashboardClients.GetClient(clientId):
return ResponseObject(False, "Client does not exist")
token = DashboardClients.GenerateClientPasswordResetToken(clientId)
if token:
return ResponseObject(data=token)
return ResponseObject(False, "Failed to generate link")
2025-07-10 23:39:21 +08:00
2025-08-11 14:35:30 +08:00
@app.post(f'{APP_PREFIX}/api/clients/updateProfileName')
def API_Clients_UpdateProfile():
data = request.get_json()
clientId = data.get("ClientID")
if not clientId:
return ResponseObject(False, "Please provide ClientID")
2025-08-11 17:29:15 +08:00
if not DashboardClients.GetClient(clientId):
return ResponseObject(False, "Client does not exist")
2025-08-11 14:35:30 +08:00
value = data.get('Name')
return ResponseObject(status=DashboardClients.UpdateClientProfile(clientId, value))
2025-08-11 17:29:15 +08:00
@app.post(f'{APP_PREFIX}/api/clients/deleteClient')
def API_Clients_DeleteClient():
data = request.get_json()
clientId = data.get("ClientID")
if not clientId:
return ResponseObject(False, "Please provide ClientID")
if not DashboardClients.GetClient(clientId):
return ResponseObject(False, "Client does not exist")
2025-08-15 21:45:09 +08:00
return ResponseObject(status=DashboardClients.DeleteClient(clientId))
2025-08-22 18:26:31 +08:00
@app.get(f'{APP_PREFIX}/api/webHooks/getWebHooks')
def API_WebHooks_GetWebHooks():
return ResponseObject(data=DashboardWebHooks.GetWebHooks())
@app.get(f'{APP_PREFIX}/api/webHooks/createWebHook')
def API_WebHooks_createWebHook():
return ResponseObject(data=DashboardWebHooks.CreateWebHook().model_dump(
exclude={'CreationDate'}
))
@app.post(f'{APP_PREFIX}/api/webHooks/updateWebHook')
def API_WebHooks_UpdateWebHook():
data = request.get_json()
status, msg = DashboardWebHooks.UpdateWebHook(data)
return ResponseObject(status, msg)
2025-08-25 16:06:41 +08:00
@app.post(f'{APP_PREFIX}/api/webHooks/deleteWebHook')
def API_WebHooks_DeleteWebHook():
data = request.get_json()
status, msg = DashboardWebHooks.DeleteWebHook(data)
return ResponseObject(status, msg)
2025-08-26 00:41:37 +08:00
@app.get(f'{APP_PREFIX}/api/webHooks/getWebHookSessions')
def API_WebHooks_GetWebHookSessions():
webhookID = request.args.get('WebHookID')
if not webhookID:
return ResponseObject(False, "Please provide WebHookID")
webHook = DashboardWebHooks.SearchWebHookByID(webhookID)
if not webHook:
return ResponseObject(False, "Webhook does not exist")
return ResponseObject(data=DashboardWebHooks.GetWebHookSessions(webHook))
2025-08-25 16:06:41 +08:00
2025-07-10 23:39:21 +08:00
'''
Index Page
'''
2024-09-24 22:54:18 +08:00
@app.get(f'{APP_PREFIX}/')
2024-06-18 03:16:42 +08:00
def index():
return render_template('index.html', APP_PREFIX=APP_PREFIX)
2021-10-18 02:24:09 +03:00
if __name__ == "__main__":
2024-08-05 15:39:11 -04:00
startThreads()
2025-08-20 15:26:07 +08:00
DashboardPlugins.startThreads()
app.run(host=app_ip, debug=False, port=app_port)