mirror of
https://github.com/donaldzou/WGDashboard.git
synced 2025-10-04 08:16:17 +00:00
Completed override peer settings
This commit is contained in:
@@ -59,6 +59,15 @@ def ValidateDNSAddress(addresses) -> tuple[bool, str]:
|
||||
return False, f"{address} does not appear to be an valid DNS address"
|
||||
return True, ""
|
||||
|
||||
def ValidateEndpointAllowedIPs(IPs) -> tuple[bool, str] | tuple[bool, None]:
|
||||
ips = IPs.replace(" ", "").split(",")
|
||||
for ip in ips:
|
||||
try:
|
||||
ipaddress.ip_network(ip, strict=False)
|
||||
except ValueError as e:
|
||||
return False, str(e)
|
||||
return True, None
|
||||
|
||||
def GenerateWireguardPublicKey(privateKey: str) -> tuple[bool, str] | tuple[bool, None]:
|
||||
try:
|
||||
publicKey = subprocess.check_output(f"wg pubkey", input=privateKey.encode(), shell=True,
|
||||
|
@@ -1,6 +1,9 @@
|
||||
"""
|
||||
WireGuard Configuration
|
||||
"""
|
||||
from typing import Any
|
||||
|
||||
import jinja2
|
||||
import sqlalchemy, random, shutil, configparser, ipaddress, os, subprocess, time, re, uuid, psutil, traceback
|
||||
from zipfile import ZipFile
|
||||
from datetime import datetime, timedelta
|
||||
@@ -11,7 +14,8 @@ from .DashboardConfig import DashboardConfig
|
||||
from .Peer import Peer
|
||||
from .PeerJobs import PeerJobs
|
||||
from .PeerShareLinks import PeerShareLinks
|
||||
from .Utilities import StringToBoolean, GenerateWireguardPublicKey, RegexMatch
|
||||
from .Utilities import StringToBoolean, GenerateWireguardPublicKey, RegexMatch, ValidateDNSAddress, \
|
||||
ValidateEndpointAllowedIPs
|
||||
from .WireguardConfigurationInfo import WireguardConfigurationInfo
|
||||
|
||||
|
||||
@@ -1106,19 +1110,34 @@ class WireguardConfiguration:
|
||||
except Exception as e:
|
||||
return False
|
||||
|
||||
def updateConfigurationInfo(self, key: str, value) -> tuple[bool, str] | tuple[bool, None]:
|
||||
def updateConfigurationInfo(self, key: str, value: str | dict[str, str]) -> tuple[bool, Any, str] | tuple[
|
||||
bool, str, None] | tuple[bool, None, None]:
|
||||
if key == "Description":
|
||||
self.configurationInfo.Description = value
|
||||
elif key == "OverridePeerSettings":
|
||||
for key in value.keys():
|
||||
if key == "DNS" and value.get("DNS"):
|
||||
pass
|
||||
|
||||
for (key, val) in value.items():
|
||||
status, msg = self.__validateOverridePeerSettings(key, jinja2.Template(val).render(configuration=self.toJson()))
|
||||
if not status:
|
||||
return False, msg, key
|
||||
self.configurationInfo.OverridePeerSettings = (
|
||||
self.configurationInfo.OverridePeerSettings.model_validate(value))
|
||||
else:
|
||||
return False, "Key does not exist"
|
||||
return False, "Key does not exist", None
|
||||
|
||||
self.storeConfigurationInfo()
|
||||
return True, None
|
||||
return True, None, None
|
||||
|
||||
def __validateOverridePeerSettings(self, key: str, value: str | int) -> tuple[bool, None] | tuple[bool, str]:
|
||||
status = True
|
||||
msg = None
|
||||
print(value)
|
||||
if key == "DNS" and value:
|
||||
status, msg = ValidateDNSAddress(value)
|
||||
elif key == "EndpointAllowedIPs" and value:
|
||||
status, msg = ValidateEndpointAllowedIPs(value)
|
||||
elif key == "ListenPort" and value:
|
||||
if not value.isnumeric() or not (1 <= int(value) <= 65535):
|
||||
status = False
|
||||
msg = "Listen Port must be >= 1 and <= 65535"
|
||||
return status, msg
|
||||
|
Reference in New Issue
Block a user