WGDashboard/src/dashboard_new.py

568 lines
21 KiB
Python
Raw Normal View History

import sqlite3
import configparser
import hashlib
import ipaddress
import json
# Python Built-in Library
import os
import secrets
import subprocess
import time
import re
import urllib.parse
import urllib.request
import urllib.error
from dataclasses import dataclass
from datetime import datetime, timedelta
from json import JSONEncoder
from operator import itemgetter
from typing import Dict, Any
import bcrypt
import flask
# PIP installed library
import ifcfg
import pyotp
from flask import Flask, request, render_template, redirect, url_for, session, jsonify, g
from flask.json.provider import JSONProvider
from flask_qrcode import QRcode
from icmplib import ping, traceroute
# Import other python files
import threading
from sqlalchemy.orm import mapped_column, declarative_base, Session
from sqlalchemy import FLOAT, INT, VARCHAR, select, MetaData, DATETIME
from sqlalchemy import create_engine, inspect
DASHBOARD_VERSION = 'v3.1'
CONFIGURATION_PATH = os.getenv('CONFIGURATION_PATH', '.')
DB_PATH = os.path.join(CONFIGURATION_PATH, 'db')
if not os.path.isdir(DB_PATH):
os.mkdir(DB_PATH)
DASHBOARD_CONF = os.path.join(CONFIGURATION_PATH, 'wg-dashboard.ini')
# WireGuard's configuration path
WG_CONF_PATH = None
# Dashboard Config Name
# Upgrade Required
UPDATE = None
# Flask App Configuration
app = Flask("WGDashboard")
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 5206928
app.secret_key = secrets.token_urlsafe(32)
# Enable QR Code Generator
QRcode(app)
'''
Classes
'''
Base = declarative_base()
class CustomJsonEncoder(JSONProvider):
def dumps(self, obj, **kwargs):
if type(obj) == WireguardConfiguration:
return obj.toJSON()
return json.dumps(obj)
def loads(self, obj, **kwargs):
return json.loads(obj, **kwargs)
app.json = CustomJsonEncoder(app)
class WireguardConfiguration:
__parser: configparser.ConfigParser = configparser.ConfigParser(strict=False)
__parser.optionxform = str
Status: bool = False
Name: str = ""
PrivateKey: str = ""
PublicKey: str = ""
ListenPort: str = ""
Address: str = ""
DNS: str = ""
Table: str = ""
MTU: str = ""
PreUp: str = ""
PostUp: str = ""
PreDown: str = ""
PostDown: str = ""
SaveConfig: bool = False
class InvalidConfigurationFileException(Exception):
def __init__(self, m):
self.message = m
def __str__(self):
return self.message
def __init__(self, name):
self.Name = name
self.__parser.read(os.path.join(WG_CONF_PATH, f'{self.Name}.conf'))
sections = self.__parser.sections()
if "Interface" not in sections:
raise self.InvalidConfigurationFileException(
"[Interface] section not found in " + os.path.join(WG_CONF_PATH, f'{self.Name}.conf'))
interfaceConfig = dict(self.__parser.items("Interface", True))
for i in dir(self):
if str(i) in interfaceConfig.keys():
if isinstance(getattr(self, i), bool):
setattr(self, i, _strToBool(interfaceConfig[i]))
else:
setattr(self, i, interfaceConfig[i])
if self.PrivateKey:
self.PublicKey = self.__getPublicKey()
self.Status = self.__getStatus()
# Create tables in database
inspector = inspect(engine)
existingTable = inspector.get_table_names()
if self.Name not in existingTable:
_createPeerModel(self.Name).__table__.create(engine)
if self.Name + "_restrict_access" not in existingTable:
_createRestrcitedPeerModel(self.Name).__table__.create(engine)
if self.Name + "_transfer" not in existingTable:
_createPeerTransferModel(self.Name).__table__.create(engine)
def __getPublicKey(self) -> str:
return subprocess.check_output(['wg', 'pubkey'], input=self.PrivateKey.encode()).decode().strip('\n')
def __getStatus(self) -> bool:
return self.Name in dict(ifcfg.interfaces().items()).keys()
def toJSON(self):
self.Status = self.__getStatus()
return self.__dict__
def iPv46RegexCheck(ip):
return re.match(
'((^\s*((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*$)|(^\s*((([0-9a-f]{1,4}:){7}([0-9a-f]{1,4}|:))|(([0-9a-f]{1,4}:){6}(:[0-9a-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9a-f]{1,4}:){5}(((:[0-9a-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9a-f]{1,4}:){4}(((:[0-9a-f]{1,4}){1,3})|((:[0-9a-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9a-f]{1,4}:){3}(((:[0-9a-f]{1,4}){1,4})|((:[0-9a-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9a-f]{1,4}:){2}(((:[0-9a-f]{1,4}){1,5})|((:[0-9a-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9a-f]{1,4}:){1}(((:[0-9a-f]{1,4}){1,6})|((:[0-9a-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9a-f]{1,4}){1,7})|((:[0-9a-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$))',
ip)
class DashboardConfig:
def __init__(self):
self.__config = configparser.ConfigParser(strict=False)
self.__config.read(DASHBOARD_CONF)
self.hiddenAttribute = ["totp_key"]
self.__default = {
"Account": {
"username": "admin",
"password": "admin",
"enable_totp": "false",
"totp_key": pyotp.random_base32()
},
"Server": {
"wg_conf_path": "/etc/wireguard",
"app_ip": "0.0.0.0",
"app_port": "10086",
"auth_req": "true",
"version": DASHBOARD_VERSION,
"dashboard_refresh_interval": "60000",
"dashboard_sort": "status",
"dashboard_theme": "dark"
},
"Peers": {
"peer_global_DNS": "1.1.1.1",
"peer_endpoint_allowed_ip": "0.0.0.0/0",
"peer_display_mode": "grid",
"remote_endpoint": ifcfg.default_interface()['inet'],
"peer_MTU": "1420",
"peer_keep_alive": "21"
},
"Other": {
"welcome_session": "true"
}
}
for section, keys in self.__default.items():
for key, value in keys.items():
exist, currentData = self.GetConfig(section, key)
if not exist:
self.SetConfig(section, key, value, True)
def __configValidation(self, key, value: Any) -> [bool, str]:
if type(value) is str and len(value) == 0:
return False, "Field cannot be empty!"
if key == "peer_global_dns":
value = value.split(",")
for i in value:
try:
ipaddress.ip_address(i)
except ValueError as e:
return False, str(e)
if key == "peer_endpoint_allowed_ip":
value = value.split(",")
for i in value:
try:
ipaddress.ip_network(i, strict=False)
except Exception as e:
return False, str(e)
if key == "wg_conf_path":
if not os.path.exists(value):
return False, f"{value} is not a valid path"
if key == "password":
if self.GetConfig("Account", "password")[0]:
if not self.__checkPassword(
value["currentPassword"], self.GetConfig("Account", "password")[1].encode("utf-8")):
return False, "Current password does not match."
if value["newPassword"] != value["repeatNewPassword"]:
return False, "New passwords does not match"
return True, ""
def generatePassword(self, plainTextPassword: str):
return bcrypt.hashpw(plainTextPassword.encode("utf-8"), bcrypt.gensalt(rounds=12))
def __checkPassword(self, plainTextPassword: str, hashedPassword: bytes):
return bcrypt.checkpw(plainTextPassword.encode("utf-8"), hashedPassword)
def SetConfig(self, section: str, key: str, value: any, init: bool = False) -> [bool, str]:
if key in self.hiddenAttribute and not init:
return False, None
if not init:
valid, msg = self.__configValidation(key, value)
if not valid:
return False, msg
if section == "Account" and key == "password":
if not init:
value = self.generatePassword(value["newPassword"]).decode("utf-8")
else:
value = self.generatePassword(value).decode("utf-8")
if section not in self.__config:
self.__config[section] = {}
if key not in self.__config[section].keys() or value != self.__config[section][key]:
if type(value) is bool:
if value:
self.__config[section][key] = "true"
else:
self.__config[section][key] = "false"
else:
self.__config[section][key] = value
return self.SaveConfig(), ""
return True, ""
def SaveConfig(self) -> bool:
try:
with open(DASHBOARD_CONF, "w+", encoding='utf-8') as configFile:
self.__config.write(configFile)
return True
except Exception as e:
return False
def GetConfig(self, section, key) -> [any, bool]:
if section not in self.__config:
return False, None
if key not in self.__config[section]:
return False, None
if self.__config[section][key] in ["1", "yes", "true", "on"]:
return True, True
if self.__config[section][key] in ["0", "no", "false", "off"]:
return True, False
return True, self.__config[section][key]
def toJSON(self) -> dict[str, dict[Any, Any]]:
the_dict = {}
for section in self.__config.sections():
the_dict[section] = {}
for key, val in self.__config.items(section):
if key not in self.hiddenAttribute:
if val in ["1", "yes", "true", "on"]:
the_dict[section][key] = True
elif val in ["0", "no", "false", "off"]:
the_dict[section][key] = False
else:
the_dict[section][key] = val
return the_dict
def ResponseObject(status=True, message=None, data=None) -> Flask.response_class:
response = Flask.make_response(app, {
"status": status,
"message": message,
"data": data
})
response.content_type = "application/json"
return response
DashboardConfig = DashboardConfig()
WireguardConfigurations: [WireguardConfiguration] = []
'''
Private Functions
'''
def _strToBool(value: str) -> bool:
return value.lower() in ("yes", "true", "t", "1", 1)
def _createPeerModel(wgConfigName):
class Peer(Base):
__tablename__ = wgConfigName
id = mapped_column(VARCHAR, primary_key=True)
private_key = mapped_column(VARCHAR)
DNS = mapped_column(VARCHAR)
endpoint_allowed_ip = mapped_column(VARCHAR)
name = mapped_column(VARCHAR)
total_receive = mapped_column(FLOAT)
total_sent = mapped_column(FLOAT)
total_data = mapped_column(FLOAT)
endpoint = mapped_column(VARCHAR)
status = mapped_column(VARCHAR)
latest_handshake = mapped_column(VARCHAR)
allowed_ip = mapped_column(VARCHAR)
cumu_receive = mapped_column(FLOAT)
cumu_sent = mapped_column(FLOAT)
cumu_data = mapped_column(FLOAT)
mtu = mapped_column(INT)
keepalive = mapped_column(INT)
remote_endpoint = mapped_column(VARCHAR)
preshared_key = mapped_column(VARCHAR)
return Peer
def _createRestrcitedPeerModel(wgConfigName):
class PeerRestricted(Base):
__tablename__ = wgConfigName + "_restrict_access"
id = mapped_column(VARCHAR, primary_key=True)
private_key = mapped_column(VARCHAR)
DNS = mapped_column(VARCHAR)
endpoint_allowed_ip = mapped_column(VARCHAR)
name = mapped_column(VARCHAR)
total_receive = mapped_column(FLOAT)
total_sent = mapped_column(FLOAT)
total_data = mapped_column(FLOAT)
endpoint = mapped_column(VARCHAR)
status = mapped_column(VARCHAR)
latest_handshake = mapped_column(VARCHAR)
allowed_ip = mapped_column(VARCHAR)
cumu_receive = mapped_column(FLOAT)
cumu_sent = mapped_column(FLOAT)
cumu_data = mapped_column(FLOAT)
mtu = mapped_column(INT)
keepalive = mapped_column(INT)
remote_endpoint = mapped_column(VARCHAR)
preshared_key = mapped_column(VARCHAR)
return PeerRestricted
def _createPeerTransferModel(wgConfigName):
class PeerTransfer(Base):
__tablename__ = wgConfigName + "_transfer"
id = mapped_column(VARCHAR, primary_key=True)
total_receive = mapped_column(FLOAT)
total_sent = mapped_column(FLOAT)
total_data = mapped_column(FLOAT)
cumu_receive = mapped_column(FLOAT)
cumu_sent = mapped_column(FLOAT)
cumu_data = mapped_column(FLOAT)
time = mapped_column(DATETIME)
return PeerTransfer
def _regexMatch(regex, text):
pattern = re.compile(regex)
return pattern.search(text) is not None
def _getConfigurationList() -> [WireguardConfiguration]:
configurations = []
for i in os.listdir(WG_CONF_PATH):
if _regexMatch("^(.{1,}).(conf)$", i):
i = i.replace('.conf', '')
try:
configurations.append(WireguardConfiguration(i))
except WireguardConfiguration.InvalidConfigurationFileException as e:
print(f"{i} have an invalid configuration file.")
return configurations
'''
API Routes
'''
@app.before_request
def auth_req():
authenticationRequired = DashboardConfig.GetConfig("Server", "auth_req")[1]
if authenticationRequired:
if ('/static/' not in request.path and "username" not in session and "/" != request.path
and "validateAuthentication" not in request.path and "authenticate" not in request.path
and "getDashboardConfiguration" not in request.path and "getDashboardTheme" not in request.path
and "isTotpEnabled" not in request.path
):
resp = Flask.make_response(app, "Not Authorized" + request.path)
resp.status_code = 401
return resp
@app.route('/api/validateAuthentication', methods=["GET"])
def API_ValidateAuthentication():
token = request.cookies.get("authToken") + ""
if token == "" or "username" not in session or session["username"] != token:
return ResponseObject(False, "Invalid authentication")
return ResponseObject(True)
@app.route('/api/authenticate', methods=['POST'])
def API_AuthenticateLogin():
data = request.get_json()
valid = bcrypt.checkpw(data['password'].encode("utf-8"),
DashboardConfig.GetConfig("Account", "password")[1].encode("utf-8"))
totpEnabled = DashboardConfig.GetConfig("Account", "enable_totp")[1]
totpValid = False
if totpEnabled:
totpValid = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now() == data['totp']
if (valid
and data['username'] == DashboardConfig.GetConfig("Account", "username")[1]
and ((totpEnabled and totpValid) or not totpEnabled)
):
authToken = hashlib.sha256(f"{data['username']}{datetime.now()}".encode()).hexdigest()
session['username'] = authToken
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
resp.set_cookie("authToken", authToken)
session.permanent = True
return resp
return ResponseObject(False, "Username, password or OTP is incorrect.")
@app.route('/api/signout')
def API_SignOut():
resp = ResponseObject(True, "")
resp.delete_cookie("authToken")
return resp
@app.route('/api/getWireguardConfigurations', methods=["GET"])
def API_getWireguardConfigurations():
WireguardConfigurations = _getConfigurationList()
return ResponseObject(data=[wc.toJSON() for wc in WireguardConfigurations])
@app.route('/api/getDashboardConfiguration', methods=["GET"])
def API_getDashboardConfiguration():
return ResponseObject(data=DashboardConfig.toJSON())
@app.route('/api/updateDashboardConfiguration', methods=["POST"])
def API_updateDashboardConfiguration():
data = request.get_json()
for section in data['DashboardConfiguration'].keys():
for key in data['DashboardConfiguration'][section].keys():
if not DashboardConfig.SetConfig(section, key, data['DashboardConfiguration'][section][key])[0]:
return ResponseObject(False, "Section or value is invalid.")
return ResponseObject()
@app.route('/api/updateDashboardConfigurationItem', methods=["POST"])
def API_updateDashboardConfigurationItem():
data = request.get_json()
if "section" not in data.keys() or "key" not in data.keys() or "value" not in data.keys():
return ResponseObject(False, "Invalid request.")
valid, msg = DashboardConfig.SetConfig(
data["section"], data["key"], data['value'])
if not valid:
return ResponseObject(False, msg)
return ResponseObject()
@app.route('/api/getDashboardTheme')
def API_getDashboardTheme():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "dashboard_theme")[1])
@app.route('/api/isTotpEnabled')
def API_isTotpEnabled():
return ResponseObject(data=DashboardConfig.GetConfig("Account", "enable_totp")[1])
@app.route('/api/Welcome_GetTotpLink')
def API_Welcome_GetTotpLink():
if DashboardConfig.GetConfig("Other", "welcome_session")[1]:
return ResponseObject(
data=pyotp.totp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).provisioning_uri(
issuer_name="WGDashboard"))
return ResponseObject(False)
@app.route('/api/Welcome_VerifyTotpLink', methods=["POST"])
def API_Welcome_VerifyTotpLink():
data = request.get_json()
if DashboardConfig.GetConfig("Other", "welcome_session")[1]:
return ResponseObject(pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now() == data['totp'])
return ResponseObject(False)
@app.route('/api/Welcome_Finish', methods=["POST"])
def API_Welcome_Finish():
data = request.get_json()
if DashboardConfig.GetConfig("Other", "welcome_session")[1]:
if data["username"] == "":
return ResponseObject(False, "Username cannot be blank.")
if data["newPassword"] == "" or len(data["newPassword"]) < 8:
return ResponseObject(False, "Password must be at least 8 characters")
updateUsername, updateUsernameErr = DashboardConfig.SetConfig("Account", "username", data["username"])
updatePassword, updatePasswordErr = DashboardConfig.SetConfig("Account", "password",
{
"newPassword": data["newPassword"],
"repeatNewPassword": data[
"repeatNewPassword"],
"currentPassword": "admin"
})
updateEnableTotp, updateEnableTotpErr = DashboardConfig.SetConfig("Account", "enable_totp", data["enable_totp"])
if not updateUsername or not updatePassword or not updateEnableTotp:
return ResponseObject(False, f"{updateUsernameErr},{updatePasswordErr},{updateEnableTotpErr}".strip(","))
DashboardConfig.SetConfig("Other", "welcome_session", False)
return ResponseObject()
@app.route('/', methods=['GET'])
def index():
"""
Index page related
@return: Template
"""
return render_template('index_new.html')
if __name__ == "__main__":
engine = create_engine("sqlite:///" + os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard.db'))
_, app_ip = DashboardConfig.GetConfig("Server", "app_ip")
_, app_port = DashboardConfig.GetConfig("Server", "app_port")
_, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path")
WireguardConfigurations = _getConfigurationList()
app.run(host=app_ip, debug=True, port=app_port)