mirror of
https://github.com/donaldzou/WGDashboard.git
synced 2025-10-04 00:06:18 +00:00
Updated how available IP is generated
This commit is contained in:
132
src/dashboard.py
132
src/dashboard.py
@@ -10,6 +10,7 @@ from json import JSONEncoder
|
||||
from flask_cors import CORS
|
||||
from icmplib import ping, traceroute
|
||||
from flask.json.provider import DefaultJSONProvider
|
||||
from itertools import islice
|
||||
from Utilities import (
|
||||
RegexMatch, GetRemoteEndpoint, StringToBoolean,
|
||||
ValidateIPAddressesWithRange, ValidateDNSAddress,
|
||||
@@ -1107,41 +1108,69 @@ class WireguardConfiguration:
|
||||
return False, str(e)
|
||||
return True, None
|
||||
|
||||
def getAvailableIP(self, all: bool = False) -> tuple[bool, list[str]] | tuple[bool, None]:
|
||||
def getNumberOfAvailableIP(self):
|
||||
if len(self.Address) < 0:
|
||||
return False, None
|
||||
address = self.Address.split(',')
|
||||
existedAddress = []
|
||||
availableAddress = []
|
||||
for p in self.Peers:
|
||||
if len(p.allowed_ip) > 0:
|
||||
add = p.allowed_ip.split(',')
|
||||
for i in add:
|
||||
a, c = i.split('/')
|
||||
existedAddress = set()
|
||||
availableAddress = {}
|
||||
for p in self.Peers + self.getRestrictedPeersList():
|
||||
peerAllowedIP = p.allowed_ip.split(',')
|
||||
for pip in peerAllowedIP:
|
||||
ppip = pip.strip().split('/')
|
||||
if len(ppip) == 2:
|
||||
try:
|
||||
existedAddress.append(ipaddress.ip_address(a.replace(" ", "")))
|
||||
except ValueError as error:
|
||||
check = ipaddress.ip_network(ppip[0])
|
||||
existedAddress.add(check)
|
||||
except Exception as e:
|
||||
print(f"[WGDashboard] Error: {self.Name} peer {p.id} have invalid ip")
|
||||
for p in self.getRestrictedPeersList():
|
||||
if len(p.allowed_ip) > 0:
|
||||
add = p.allowed_ip.split(',')
|
||||
for i in add:
|
||||
a, c = i.split('/')
|
||||
existedAddress.append(ipaddress.ip_address(a.replace(" ", "")))
|
||||
for i in address:
|
||||
addressSplit, cidr = i.split('/')
|
||||
existedAddress.append(ipaddress.ip_address(addressSplit.replace(" ", "")))
|
||||
for i in address:
|
||||
network = ipaddress.ip_network(i.replace(" ", ""), False)
|
||||
count = 0
|
||||
for h in network.hosts():
|
||||
if h not in existedAddress:
|
||||
availableAddress.append(ipaddress.ip_network(h).compressed)
|
||||
count += 1
|
||||
if not all:
|
||||
if network.version == 6 and count > 255:
|
||||
break
|
||||
|
||||
configurationAddresses = self.Address.split(',')
|
||||
for ca in configurationAddresses:
|
||||
ca = ca.strip()
|
||||
caSplit = ca.split('/')
|
||||
try:
|
||||
if len(caSplit) == 2:
|
||||
network = ipaddress.ip_network(ca, False)
|
||||
existedAddress.add(ipaddress.ip_network(caSplit[0]))
|
||||
availableAddress[ca] = network.num_addresses
|
||||
for p in existedAddress:
|
||||
if p.subnet_of(network):
|
||||
availableAddress[ca] -= 1
|
||||
|
||||
# map(lambda iph : ipaddress.ip_network(iph).compressed, network.hosts())
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print(f"[WGDashboard] Error: Failed to parse IP address {ca} from {self.Name}")
|
||||
return True, availableAddress
|
||||
|
||||
def getAvailableIP(self, threshold = 255) -> tuple[bool, list[str]] | tuple[bool, None]:
|
||||
if len(self.Address) < 0:
|
||||
return False, None
|
||||
existedAddress = set()
|
||||
availableAddress = {}
|
||||
for p in self.Peers + self.getRestrictedPeersList():
|
||||
peerAllowedIP = p.allowed_ip.split(',')
|
||||
for pip in peerAllowedIP:
|
||||
ppip = pip.strip().split('/')
|
||||
if len(ppip) == 2:
|
||||
try:
|
||||
check = ipaddress.ip_network(ppip[0])
|
||||
existedAddress.add(pip)
|
||||
except Exception as e:
|
||||
print(f"[WGDashboard] Error: {self.Name} peer {p.id} have invalid ip")
|
||||
configurationAddresses = self.Address.split(',')
|
||||
for ca in configurationAddresses:
|
||||
ca = ca.strip()
|
||||
caSplit = ca.split('/')
|
||||
try:
|
||||
if len(caSplit) == 2:
|
||||
network = ipaddress.ip_network(ca, False)
|
||||
existedAddress.add(ipaddress.ip_network(caSplit[0]).compressed)
|
||||
availableAddress[ca] = list(islice(filter(lambda ip : ip not in existedAddress,
|
||||
map(lambda iph : ipaddress.ip_network(iph).compressed, network.hosts())), threshold))
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print(f"[WGDashboard] Error: Failed to parse IP address {ca} from {self.Name}")
|
||||
return True, availableAddress
|
||||
|
||||
def getRealtimeTrafficUsage(self):
|
||||
@@ -2491,7 +2520,7 @@ def API_addPeers(configName):
|
||||
|
||||
public_key: str = data.get('public_key', "")
|
||||
allowed_ips: list[str] = data.get('allowed_ips', [])
|
||||
override_allowed_ips: bool = data.get('override_allowed_ips', False)
|
||||
allowed_ips_validation: bool = data.get('allowed_ips_validation', True)
|
||||
|
||||
endpoint_allowed_ip: str = data.get('endpoint_allowed_ip', DashboardConfig.GetConfig("Peers", "peer_endpoint_allowed_ip")[1])
|
||||
dns_addresses: str = data.get('DNS', DashboardConfig.GetConfig("Peers", "peer_global_DNS")[1])
|
||||
@@ -2508,21 +2537,24 @@ def API_addPeers(configName):
|
||||
if len(endpoint_allowed_ip) == 0:
|
||||
endpoint_allowed_ip = DashboardConfig.GetConfig("Peers", "peer_endpoint_allowed_ip")[1]
|
||||
config = WireguardConfigurations.get(configName)
|
||||
# if not bulkAdd and (len(public_key) == 0 or len(allowed_ips) == 0):
|
||||
# return ResponseObject(False, "Please provide at least public_key and allowed_ips")
|
||||
if not config.getStatus():
|
||||
config.toggleConfiguration()
|
||||
availableIps = config.getAvailableIP()
|
||||
ipStatus, availableIps = config.getAvailableIP()
|
||||
defaultIPSubnet = list(availableIps.keys())[0]
|
||||
if bulkAdd:
|
||||
if type(preshared_key_bulkAdd) is not bool:
|
||||
preshared_key_bulkAdd = False
|
||||
if type(bulkAddAmount) is not int or bulkAddAmount < 1:
|
||||
return ResponseObject(False, "Please specify amount of peers you want to add")
|
||||
if not availableIps[0]:
|
||||
if not ipStatus:
|
||||
return ResponseObject(False, "No more available IP can assign")
|
||||
if bulkAddAmount > len(availableIps[1]):
|
||||
if len(availableIps.keys()) == 0:
|
||||
return ResponseObject(False, "This configuration does not have any IP address available")
|
||||
|
||||
|
||||
if bulkAddAmount > len(availableIps[defaultIPSubnet]):
|
||||
return ResponseObject(False,
|
||||
f"The maximum number of peers can add is {len(availableIps[1])}")
|
||||
f"The maximum number of peers can add is {len(availableIps[defaultIPSubnet])}")
|
||||
keyPairs = []
|
||||
for i in range(bulkAddAmount):
|
||||
newPrivateKey = GenerateWireguardPrivateKey()[1]
|
||||
@@ -2530,7 +2562,7 @@ def API_addPeers(configName):
|
||||
"private_key": newPrivateKey,
|
||||
"id": GenerateWireguardPublicKey(newPrivateKey)[1],
|
||||
"preshared_key": (GenerateWireguardPrivateKey()[1] if preshared_key_bulkAdd else ""),
|
||||
"allowed_ip": availableIps[1][i],
|
||||
"allowed_ip": availableIps[defaultIPSubnet][i],
|
||||
"name": f"BulkPeer #{(i + 1)}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
|
||||
"DNS": dns_addresses,
|
||||
"endpoint_allowed_ip": endpoint_allowed_ip,
|
||||
@@ -2554,14 +2586,19 @@ def API_addPeers(configName):
|
||||
public_key = GenerateWireguardPublicKey(private_key)[1]
|
||||
|
||||
if len(allowed_ips) == 0:
|
||||
if availableIps[0]:
|
||||
allowed_ips = [availableIps[1][0]]
|
||||
if ipStatus:
|
||||
allowed_ips = [availableIps[defaultIPSubnet][0]]
|
||||
else:
|
||||
return ResponseObject(False, "No more available IP can assign")
|
||||
|
||||
if not override_allowed_ips:
|
||||
if allowed_ips_validation:
|
||||
for i in allowed_ips:
|
||||
if i not in availableIps[1]:
|
||||
found = False
|
||||
for key in availableIps.keys():
|
||||
if i in availableIps[key]:
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
return ResponseObject(False, f"This IP is not available: {i}")
|
||||
|
||||
status, result = config.addPeers([
|
||||
@@ -2580,7 +2617,7 @@ def API_addPeers(configName):
|
||||
)
|
||||
return ResponseObject(status=status, message=result['message'], data=result['peers'])
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print(e, str(e.__traceback__))
|
||||
return ResponseObject(False, "Add peers failed. Please see data for specific issue")
|
||||
|
||||
return ResponseObject(False, "Configuration does not exist")
|
||||
@@ -2618,6 +2655,13 @@ def API_getAvailableIPs(configName):
|
||||
status, ips = WireguardConfigurations.get(configName).getAvailableIP()
|
||||
return ResponseObject(status=status, data=ips)
|
||||
|
||||
@app.get(f"{APP_PREFIX}/api/getNumberOfAvailableIPs/<configName>")
|
||||
def API_getNumberOfAvailableIPs(configName):
|
||||
if configName not in WireguardConfigurations.keys():
|
||||
return ResponseObject(False, "Configuration does not exist")
|
||||
status, ips = WireguardConfigurations.get(configName).getNumberOfAvailableIP()
|
||||
return ResponseObject(status=status, data=ips)
|
||||
|
||||
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationInfo')
|
||||
def API_getConfigurationInfo():
|
||||
configurationName = request.args.get("configurationName")
|
||||
|
Reference in New Issue
Block a user