Adjusted some API endpoint
Some checks failed
Qodana / qodana (push) Has been cancelled

This commit is contained in:
Donald Zou
2025-04-19 02:54:47 +08:00
parent 66f4507dee
commit 6681303450
48 changed files with 191 additions and 156 deletions

View File

@@ -50,12 +50,13 @@ app.json = CustomJsonEncoder(app)
'''
Response Object
'''
def ResponseObject(status=True, message=None, data=None) -> Flask.response_class:
def ResponseObject(status=True, message=None, data=None, status_code = 200) -> Flask.response_class:
response = Flask.make_response(app, {
"status": status,
"message": message,
"data": data
})
response.status_code = status_code
response.content_type = "application/json"
return response
@@ -115,18 +116,19 @@ class PeerJobs:
def searchJob(self, Configuration: str, Peer: str):
return list(filter(lambda x: x.Configuration == Configuration and x.Peer == Peer, self.Jobs))
def searchJobById(self, JobID):
return list(filter(lambda x: x.JobID == JobID, self.Jobs))
def saveJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]:
try:
with self.jobdb:
jobdbCursor = self.jobdb.cursor()
if (len(str(Job.CreationDate))) == 0:
if len(self.searchJobById(Job.JobID)) == 0:
jobdbCursor.execute('''
INSERT INTO PeerJobs VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S','now'), NULL, ?)
''', (Job.JobID, Job.Configuration, Job.Peer, Job.Field, Job.Operator, Job.Value, Job.Action,))
JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}")
else:
currentJob = jobdbCursor.execute('SELECT * FROM PeerJobs WHERE JobID = ?', (Job.JobID, )).fetchone()
if currentJob is not None:
@@ -135,9 +137,9 @@ class PeerJobs:
''', (Job.Field, Job.Operator, Job.Value, Job.Action, Job.JobID))
JobLogger.log(Job.JobID,
Message=f"Job is updated from if {currentJob['Field']} {currentJob['Operator']} {currentJob['value']} then {currentJob['Action']}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}")
self.jobdb.commit()
self.__getJobs()
return True, list(
filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID,
self.Jobs))
@@ -1887,15 +1889,22 @@ class DashboardConfig:
value = self.generatePassword(value["newPassword"]).decode("utf-8")
else:
value = self.generatePassword(value).decode("utf-8")
if section == "Email" and key == "email_template":
value = value.encode('unicode_escape').decode('utf-8')
if section == "Server" and key == "wg_conf_path":
if not os.path.exists(value):
return False, "Path does not exist"
if section not in self.__config:
self.__config[section] = {}
if key not in self.__config[section].keys() or value != self.__config[section][key]:
if init:
self.__config[section] = {}
else:
return False, "Section does not exist"
if ((key not in self.__config[section].keys() and init) or
(key in self.__config[section].keys() and value != self.__config[section][key] and not init)):
if type(value) is bool:
if value:
self.__config[section][key] = "true"
@@ -1908,6 +1917,8 @@ class DashboardConfig:
else:
self.__config[section][key] = value
return self.SaveConfig(), ""
else:
return False, f"{key} does not exist under {section}"
return True, ""
def SaveConfig(self) -> bool:
@@ -1925,14 +1936,18 @@ class DashboardConfig:
if key not in self.__config[section]:
return False, None
if section == "Email" and key == "email_template":
return True, self.__config[section][key].encode('utf-8').decode('unicode_escape')
if section == "WireGuardConfiguration" and key == "autostart":
return True, list(filter(lambda x: len(x) > 0, self.__config[section][key].split("||")))
if self.__config[section][key] in ["1", "yes", "true", "on"]:
return True, True
if self.__config[section][key] in ["0", "no", "false", "off"]:
return True, False
if section == "WireGuardConfiguration" and key == "autostart":
return True, list(filter(lambda x: len(x) > 0, self.__config[section][key].split("||")))
return True, self.__config[section][key]
@@ -2169,7 +2184,7 @@ def API_toggleWireguardConfiguration():
configurationName = request.args.get('configurationName')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide a valid configuration name")
return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
toggleStatus, msg = WireguardConfigurations[configurationName].toggleConfiguration()
return ResponseObject(toggleStatus, msg, WireguardConfigurations[configurationName].Status)
@@ -2182,7 +2197,7 @@ def API_updateWireguardConfiguration():
return ResponseObject(False, "Please provide these following field: " + ", ".join(requiredKeys))
name = data.get("Name")
if name not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(False, "Configuration does not exist", status_code=404)
status, msg = WireguardConfigurations[name].updateConfigurationSettings(data)
@@ -2193,7 +2208,7 @@ def API_GetWireguardConfigurationRawFile():
configurationName = request.args.get('configurationName')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide a valid configuration name")
return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
return ResponseObject(data={
"path": WireguardConfigurations[configurationName].configPath,
@@ -2219,7 +2234,7 @@ def API_UpdateWireguardConfigurationRawFile():
def API_deleteWireguardConfiguration():
data = request.get_json()
if "ConfigurationName" not in data.keys() or data.get("ConfigurationName") is None or data.get("ConfigurationName") not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide the configuration name you want to delete")
return ResponseObject(False, "Please provide the configuration name you want to delete", status_code=404)
status = WireguardConfigurations[data.get("ConfigurationName")].deleteConfiguration()
if status:
WireguardConfigurations.pop(data.get("ConfigurationName"))
@@ -2232,7 +2247,7 @@ def API_renameWireguardConfiguration():
for k in keys:
if (k not in data.keys() or data.get(k) is None or len(data.get(k)) == 0 or
(k == "ConfigurationName" and data.get(k) not in WireguardConfigurations.keys())):
return ResponseObject(False, "Please provide the configuration name you want to rename")
return ResponseObject(False, "Please provide the configuration name you want to rename", status_code=404)
status, message = WireguardConfigurations[data.get("ConfigurationName")].renameConfiguration(data.get("NewConfigurationName"))
if status:
@@ -2244,14 +2259,14 @@ def API_renameWireguardConfiguration():
def API_getWireguardConfigurationRealtimeTraffic():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(data=WireguardConfigurations[configurationName].getRealtimeTrafficUsage())
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationBackup')
def API_getWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(data=WireguardConfigurations[configurationName].getBackups())
@app.get(f'{APP_PREFIX}/api/getAllWireguardConfigurationBackup')
@@ -2297,7 +2312,7 @@ def API_getAllWireguardConfigurationBackup():
def API_createWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(status=WireguardConfigurations[configurationName].backupConfigurationFile()[0],
data=WireguardConfigurations[configurationName].getBackups())
@@ -2309,22 +2324,24 @@ def API_deleteWireguardConfigurationBackup():
len(data['ConfigurationName']) == 0 or
len(data['BackupFileName']) == 0):
return ResponseObject(False,
"Please provide configurationName and backupFileName in body")
"Please provide configurationName and backupFileName in body", status_code=400)
configurationName = data['ConfigurationName']
backupFileName = data['BackupFileName']
if configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(WireguardConfigurations[configurationName].deleteBackup(backupFileName))
status = WireguardConfigurations[configurationName].deleteBackup(backupFileName)
return ResponseObject(status=status, message=(None if status else 'Backup file does not exist'),
status_code = (200 if status else 404))
@app.get(f'{APP_PREFIX}/api/downloadWireguardConfigurationBackup')
def API_downloadWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
backupFileName = request.args.get('backupFileName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(False, "Configuration does not exist", status_code=404)
status, zip = WireguardConfigurations[configurationName].downloadBackup(backupFileName)
return ResponseObject(status, data=zip)
return ResponseObject(status, data=zip, status_code=(200 if status else 404))
@app.post(f'{APP_PREFIX}/api/restoreWireguardConfigurationBackup')
def API_restoreWireguardConfigurationBackup():
@@ -2334,13 +2351,14 @@ def API_restoreWireguardConfigurationBackup():
len(data['ConfigurationName']) == 0 or
len(data['BackupFileName']) == 0):
return ResponseObject(False,
"Please provide configurationName and backupFileName in body")
"Please provide ConfigurationName and BackupFileName in body", status_code=400)
configurationName = data['ConfigurationName']
backupFileName = data['BackupFileName']
if configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(WireguardConfigurations[configurationName].restoreBackup(backupFileName))
return ResponseObject(False, "Configuration does not exist", status_code=404)
status = WireguardConfigurations[configurationName].restoreBackup(backupFileName)
return ResponseObject(status=status, message=(None if status else 'Restore backup failed'))
@app.get(f'{APP_PREFIX}/api/getDashboardConfiguration')
def API_getDashboardConfiguration():
@@ -2354,7 +2372,7 @@ def API_updateDashboardConfigurationItem():
valid, msg = DashboardConfig.SetConfig(
data["section"], data["key"], data['value'])
if not valid:
return ResponseObject(False, msg)
return ResponseObject(False, msg, status_code=404)
if data['section'] == "Server":
if data['key'] == 'wg_conf_path':
WireguardConfigurations.clear()
@@ -2373,7 +2391,7 @@ def API_newDashboardAPIKey():
data = request.get_json()
if DashboardConfig.GetConfig('Server', 'dashboard_api_key'):
try:
if data['neverExpire']:
if data['NeverExpire']:
expiredAt = None
else:
expiredAt = datetime.strptime(data['ExpiredAt'], '%Y-%m-%d %H:%M:%S')
@@ -2390,6 +2408,8 @@ def API_deleteDashboardAPIKey():
if len(data['Key']) > 0 and len(list(filter(lambda x : x.Key == data['Key'], DashboardConfig.DashboardAPIKeys))) > 0:
DashboardConfig.deleteAPIKey(data['Key'])
return ResponseObject(True, data=DashboardConfig.DashboardAPIKeys)
else:
return ResponseObject(False, "API Key does not exist", status_code=404)
return ResponseObject(False, "Dashboard API Keys function is disbaled")
@app.post(f'{APP_PREFIX}/api/updatePeerSettings/<configName>')
@@ -2457,7 +2477,7 @@ def API_restrictPeers(configName: str) -> ResponseObject:
return ResponseObject(False, "Please specify one or more peers")
configuration = WireguardConfigurations.get(configName)
return configuration.restrictPeers(peers)
return ResponseObject(False, "Configuration does not exist")
return ResponseObject(False, "Configuration does not exist", status_code=404)
@app.post(f'{APP_PREFIX}/api/sharePeer/create')
def API_sharePeer_create():
@@ -2549,10 +2569,6 @@ def API_addPeers(configName):
mtu = int(DashboardConfig.GetConfig("Peers", "peer_MTU")[1])
if type(keep_alive) is not int or keep_alive < 0:
keep_alive = int(DashboardConfig.GetConfig("Peers", "peer_keep_alive")[1])
if len(dns_addresses) == 0:
dns_addresses = DashboardConfig.GetConfig("Peers", "peer_global_DNS")[1]
if len(endpoint_allowed_ip) == 0:
endpoint_allowed_ip = DashboardConfig.GetConfig("Peers", "peer_endpoint_allowed_ip")[1]
config = WireguardConfigurations.get(configName)
if not config.getStatus():
config.toggleConfiguration()
@@ -2603,10 +2619,26 @@ def API_addPeers(configName):
return ResponseObject(False, f"This peer already exist")
name = data.get("name", "")
private_key = data.get("private_key", "")
if len(public_key) == 0:
if len(private_key) == 0:
private_key = GenerateWireguardPrivateKey()[1]
public_key = GenerateWireguardPublicKey(private_key)[1]
else:
public_key = GenerateWireguardPublicKey(private_key)[1]
else:
if len(private_key) > 0:
genPub = GenerateWireguardPublicKey(private_key)[1]
# Check if provided pubkey match provided private key
if public_key != genPub:
return ResponseObject(False, "Provided Public Key does not match provided Private Key")
# if len(public_key) == 0 and len(private_key) == 0:
# private_key = GenerateWireguardPrivateKey()[1]
# public_key = GenerateWireguardPublicKey(private_key)[1]
# elif len(public_key) == 0 and len(private_key) > 0:
# public_key = GenerateWireguardPublicKey(private_key)[1]
if len(public_key) == 0 and len(private_key) == 0:
private_key = GenerateWireguardPrivateKey()[1]
public_key = GenerateWireguardPublicKey(private_key)[1]
if len(allowed_ips) == 0:
if ipStatus:
@@ -2710,19 +2742,22 @@ def API_getDashboardTheme():
def API_getDashboardVersion():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "version")[1])
@app.post(f'{APP_PREFIX}/api/savePeerScheduleJob/')
@app.post(f'{APP_PREFIX}/api/savePeerScheduleJob')
def API_savePeerScheduleJob():
data = request.json
if "Job" not in data.keys() not in WireguardConfigurations.keys():
if "Job" not in data.keys():
return ResponseObject(False, "Please specify job")
job: dict = data['Job']
if "Peer" not in job.keys() or "Configuration" not in job.keys():
return ResponseObject(False, "Please specify peer and configuration")
configuration = WireguardConfigurations.get(job['Configuration'])
if configuration is None:
return ResponseObject(False, "Configuration does not exist")
f, fp = configuration.searchPeer(job['Peer'])
if not f:
return ResponseObject(False, "Peer does not exist")
s, p = AllPeerJobs.saveJob(PeerJob(
job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'],
job['CreationDate'], job['ExpireDate'], job['Action']))
@@ -2730,15 +2765,17 @@ def API_savePeerScheduleJob():
return ResponseObject(s, data=p)
return ResponseObject(s, message=p)
@app.post(f'{APP_PREFIX}/api/deletePeerScheduleJob/')
@app.post(f'{APP_PREFIX}/api/deletePeerScheduleJob')
def API_deletePeerScheduleJob():
data = request.json
if "Job" not in data.keys() not in WireguardConfigurations.keys():
if "Job" not in data.keys():
return ResponseObject(False, "Please specify job")
job: dict = data['Job']
if "Peer" not in job.keys() or "Configuration" not in job.keys():
return ResponseObject(False, "Please specify peer and configuration")
configuration = WireguardConfigurations.get(job['Configuration'])
if configuration is None:
return ResponseObject(False, "Configuration does not exist")
f, fp = configuration.searchPeer(job['Peer'])
if not f:
return ResponseObject(False, "Peer does not exist")
@@ -2817,7 +2854,6 @@ def API_ping_execute():
try:
if ip is not None and len(ip) > 0 and count is not None and count.isnumeric():
result = ping(ip, count=int(count), source=None)
data = {
"address": result.address,
"is_alive": result.is_alive,
@@ -2829,8 +2865,6 @@ def API_ping_execute():
"package_loss": result.packet_loss,
"geo": None
}
try:
r = requests.get(f"http://ip-api.com/json/{result.address}?field=city")
data['geo'] = r.json()
@@ -3008,7 +3042,8 @@ def API_Email_Send():
return ResponseObject(False, "Please at least specify receiver")
body = data.get('Body', '')
download = None
if "ConfigurationName" in data.keys() and "Peer" in data.keys():
if ("ConfigurationName" in data.keys()
and "Peer" in data.keys()):
if data.get('ConfigurationName') in WireguardConfigurations.keys():
configuration = WireguardConfigurations.get(data.get('ConfigurationName'))
attachmentName = ""
@@ -3020,12 +3055,13 @@ def API_Email_Send():
body = template.render(peer=p.toJson(), configurationFile=download)
if data.get('IncludeAttachment', False):
u = str(uuid4())
with open(os.path.join('./attachments', download['fileName'],), 'w+') as f:
f.write(download['file'])
attachmentName = f'{u}.conf'
with open(os.path.join('./attachments', attachmentName,), 'w+') as f:
f.write(download['file'])
s, m = EmailSender.send(data.get('Receiver'), data.get('Subject', ''), body,
data.get('IncludeAttachment', False), (download.get('fileName', '') if download else ''))
data.get('IncludeAttachment', False), (attachmentName if download else ''))
return ResponseObject(s, m)
@app.post(f'{APP_PREFIX}/api/email/previewBody')