refactor func_process_wireguard_status to improve data handling and parsing

This commit is contained in:
Eduardo Silva
2026-01-12 19:10:14 -03:00
parent 61b35601f3
commit e430ff6b0e
3 changed files with 78 additions and 85 deletions

View File

@@ -4,6 +4,7 @@ import os
import subprocess
import time
import uuid
from typing import Dict, Any
import pytz
import requests
@@ -211,55 +212,49 @@ def api_instance_info(request):
return JsonResponse(data)
def func_process_wireguard_status():
# Query WireGuard status from the system and construct the data dictionary
commands = {
'latest-handshakes': "wg show all latest-handshakes | expand | tr -s ' '",
'allowed-ips': "wg show all allowed-ips | expand | tr -s ' '",
'transfer': "wg show all transfer | expand | tr -s ' '",
'endpoints': "wg show all endpoints | expand | tr -s ' '",
}
def func_process_wireguard_status() -> Dict[str, Any]:
command = "wg show all dump"
data = {}
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = process.communicate()
for key, command in commands.items():
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = process.communicate()
if process.returncode != 0:
return {"message": stderr, "status": "error"}
if process.returncode != 0:
return JsonResponse({'error': stderr}, status=400)
data: Dict[str, Dict[str, Dict[str, Any]]] = {}
current_interface = None
for line in stdout.strip().split('\n'):
parts = line.split()
if len(parts) >= 3:
interface, peer, value = parts[0], parts[1], " ".join(parts[2:])
current_interface = interface
elif len(parts) == 2 and current_interface:
peer, value = parts
else:
continue
# wg dump format is tab-separated.
# There are two kinds of lines:
# - Interface line: interface \t private_key \t public_key \t listen_port \t fwmark
# - Peer line: interface \t peer_public_key \t preshared_key \t endpoint \t allowed_ips \t latest_handshake \t transfer_rx \t transfer_tx \t persistent_keepalive
for line in stdout.strip().split("\n"):
parts = line.split("\t")
if len(parts) < 5:
continue
if interface not in data:
data[interface] = {}
interface = parts[0]
if peer not in data[interface]:
data[interface][peer] = {
'allowed-ips': [],
'latest-handshakes': '',
'transfer': {'tx': 0, 'rx': 0},
'endpoints': '',
}
# Peer lines are expected to have at least 9 fields
if len(parts) < 9:
# interface line, ignore
continue
if key == 'allowed-ips':
data[interface][peer]['allowed-ips'].append(value)
elif key == 'transfer':
rx, tx = value.split()[-2:]
data[interface][peer]['transfer'] = {'tx': int(tx), 'rx': int(rx)}
elif key == 'endpoints':
data[interface][peer]['endpoints'] = value
else:
data[interface][peer][key] = value
peer_public_key = parts[1]
endpoint = parts[3]
allowed_ips_raw = parts[4]
latest_handshake = parts[5]
transfer_rx = parts[6]
transfer_tx = parts[7]
if interface not in data:
data[interface] = {}
data[interface][peer_public_key] = {
"allowed-ips": [ip for ip in allowed_ips_raw.split(",") if ip] if allowed_ips_raw else [],
"latest-handshakes": latest_handshake or "",
"transfer": {"tx": int(transfer_tx or 0), "rx": int(transfer_rx or 0)},
"endpoints": endpoint or "",
}
return data