Finished create configuration from backup

This commit is contained in:
Donald Zou
2024-10-25 23:34:07 +08:00
parent 78b65a799e
commit c3a55f8b69
6 changed files with 136 additions and 95 deletions

View File

@@ -440,7 +440,7 @@ class WireguardConfiguration:
def __str__(self):
return self.message
def __init__(self, name: str = None, data: dict = None):
def __init__(self, name: str = None, data: dict = None, backup: dict = None):
print(f"[WGDashboard] Initialized Configuration: {name}")
self.__parser: configparser.ConfigParser = configparser.ConfigParser(strict=False)
@@ -463,18 +463,31 @@ class WireguardConfiguration:
self.SaveConfig: bool = True
self.Name = name
self.__configPath = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf')
if name is not None:
if data is not None and "Backup" in data.keys():
db = self.__importDatabase(
os.path.join(
DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
'WGDashboard_Backup',
data["Backup"].replace(".conf", ".sql")))
else:
self.__createDatabase()
self.__parseConfigurationFile()
self.__initPeersList()
else:
self.Name = data["ConfigurationName"]
self.__configPath = os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{self.Name}.conf')
for i in dir(self):
if str(i) in data.keys():
if isinstance(getattr(self, i), bool):
setattr(self, i, _strToBool(data[i]))
else:
setattr(self, i, str(data[i]))
self.__parser["Interface"] = {
"PrivateKey": self.PrivateKey,
"Address": self.Address,
@@ -485,13 +498,15 @@ class WireguardConfiguration:
"PostDown": self.PostDown,
"SaveConfig": "true"
}
with open(self.__configPath, "w+") as configFile:
self.__parser.write(configFile)
self.__createDatabase()
self.__initPeersList()
if "Backup" not in data.keys():
self.__createDatabase()
with open(self.__configPath, "w+") as configFile:
self.__parser.write(configFile)
self.__initPeersList()
def __initPeersList(self):
self.Peers: list[Peer] = []
@@ -588,6 +603,18 @@ class WireguardConfiguration:
):
yield line
def __importDatabase(self, sqlFilePath) -> bool:
self.__dropDatabase()
self.__createDatabase()
if not os.path.exists(sqlFilePath):
return False
with open(sqlFilePath, 'r') as f:
for l in f.readlines():
l = l.rstrip("\n")
if len(l) > 0:
sqlUpdate(l)
return True
def __getPublicKey(self) -> str:
return _generatePublicKey(self.PrivateKey)[1]
@@ -993,14 +1020,7 @@ class WireguardConfiguration:
return False
self.__parseConfigurationFile()
self.__dropDatabase()
self.__createDatabase()
if (os.path.exists(targetSQL)):
with open(targetSQL, 'r') as sqlFile:
for l in sqlFile.readlines():
l = l.rstrip('\n')
if len(l) > 0:
sqlUpdate(l)
self.__importDatabase(targetSQL)
self.__initPeersList()
return True
@@ -1414,16 +1434,13 @@ class DashboardConfig:
Private Functions
'''
def _strToBool(value: str) -> bool:
return value.lower() in ("yes", "true", "t", "1", 1)
def _regexMatch(regex, text):
pattern = re.compile(regex)
return pattern.search(text) is not None
def _getConfigurationList():
for i in os.listdir(DashboardConfig.GetConfig("Server", "wg_conf_path")[1]):
if _regexMatch("^(.{1,}).(conf)$", i):
@@ -1437,8 +1454,6 @@ def _getConfigurationList():
except WireguardConfiguration.InvalidConfigurationFileException as e:
print(f"{i} have an invalid configuration file.")
def _checkIPWithRange(ip):
ip_patterns = (
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|\/)){4}([0-9]{1,2})(,|$)",
@@ -1455,7 +1470,6 @@ def _checkIPWithRange(ip):
return result
def _checkIP(ip):
ip_patterns = (
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}",
@@ -1471,7 +1485,6 @@ def _checkIP(ip):
return result
def _checkDNS(dns):
dns = dns.replace(' ', '').split(',')
for i in dns:
@@ -1479,7 +1492,6 @@ def _checkDNS(dns):
return False, f"{i} does not appear to be an valid DNS address"
return True, ""
def _generatePublicKey(privateKey) -> tuple[bool, str] | tuple[bool, None]:
try:
publicKey = subprocess.check_output(f"wg pubkey", input=privateKey.encode(), shell=True,
@@ -1488,7 +1500,6 @@ def _generatePublicKey(privateKey) -> tuple[bool, str] | tuple[bool, None]:
except subprocess.CalledProcessError:
return False, None
def _generatePrivateKey() -> [bool, str]:
try:
publicKey = subprocess.check_output(f"wg genkey", shell=True,
@@ -1696,23 +1707,11 @@ def API_getWireguardConfigurations():
@app.route(f'{APP_PREFIX}/api/addWireguardConfiguration', methods=["POST"])
def API_addWireguardConfiguration():
data = request.get_json()
keys = [
"ConfigurationName",
"Address",
"ListenPort",
"PrivateKey",
"PublicKey",
"PresharedKey",
"PreUp",
"PreDown",
"PostUp",
"PostDown",
]
requiredKeys = [
"ConfigurationName", "Address", "ListenPort", "PrivateKey"
]
for i in keys:
if i not in data.keys() or (i in requiredKeys and len(str(data[i])) == 0):
for i in requiredKeys:
if i not in data.keys():
return ResponseObject(False, "Please provide all required parameters.")
# Check duplicate names, ports, address
@@ -1732,7 +1731,24 @@ def API_addWireguardConfiguration():
f"Already have a configuration with the address \"{data['Address']}\"",
"Address")
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data)
if "Backup" in data.keys():
if not os.path.exists(os.path.join(
DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
'WGDashboard_Backup',
data["Backup"])) or not os.path.exists(os.path.join(
DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
'WGDashboard_Backup',
data["Backup"].replace('.conf', '.sql'))):
return ResponseObject(False, "Backup file does not exist")
shutil.copy(
os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], 'WGDashboard_Backup', data["Backup"]),
os.path.join(DashboardConfig.GetConfig("Server", "wg_conf_path")[1], f'{data["ConfigurationName"]}.conf')
)
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data, name=data['ConfigurationName'])
else:
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data)
return ResponseObject()