add WireGuard status processing and caching functionality

This commit is contained in:
Eduardo Silva
2026-01-07 11:07:45 -03:00
parent bf0ada3d89
commit 41fbf442f2
7 changed files with 179 additions and 2 deletions

View File

@@ -1,3 +1,10 @@
from django.contrib import admin
from .models import WireguardStatusCache
# Register your models here.
class WireguardStatusCacheAdmin(admin.ModelAdmin):
list_display = ('cache_type', 'processing_time_ms', 'created', 'updated')
readonly_fields = ('uuid', 'created', 'updated')
admin.site.register(WireguardStatusCache, WireguardStatusCacheAdmin)

View File

@@ -0,0 +1,20 @@
# Generated by Django 5.2.9 on 2026-01-07 13:28
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='wireguardstatuscache',
name='uuid',
field=models.UUIDField(default=uuid.uuid4, editable=False, unique=True),
),
]

View File

@@ -1,10 +1,13 @@
import uuid
from django.db import models
class WireguardStatusCache(models.Model):
cache_type = models.CharField(choices=(('master', 'Master'), ('cluster', 'Cluster')), max_length=16)
data = models.JSONField()
processing_time_ms = models.PositiveIntegerField()
uuid = models.UUIDField(unique=True)
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)

View File

@@ -2,6 +2,7 @@ import base64
import datetime
import os
import subprocess
import time
import uuid
import pytz
@@ -17,6 +18,7 @@ from django.shortcuts import get_object_or_404, redirect
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from api.models import WireguardStatusCache
from user_manager.models import AuthenticationToken, UserAcl
from vpn_invite.models import InviteSettings, PeerInvite
from wgwadmlibrary.tools import create_peer_invite, get_peer_invite_data, send_email, user_allowed_peers, \
@@ -208,12 +210,153 @@ def api_instance_info(request):
}
return JsonResponse(data)
def func_process_wireguard_status():
# Query WireGuard status from the system and construct the data dictionary
commands = {
'latest-handshakes': "wg show all latest-handshakes | expand | tr -s ' '",
'allowed-ips': "wg show all allowed-ips | expand | tr -s ' '",
'transfer': "wg show all transfer | expand | tr -s ' '",
'endpoints': "wg show all endpoints | expand | tr -s ' '",
}
data = {}
for key, command in commands.items():
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = process.communicate()
if process.returncode != 0:
return JsonResponse({'error': stderr}, status=400)
current_interface = None
for line in stdout.strip().split('\n'):
parts = line.split()
if len(parts) >= 3:
interface, peer, value = parts[0], parts[1], " ".join(parts[2:])
current_interface = interface
elif len(parts) == 2 and current_interface:
peer, value = parts
else:
continue
if interface not in data:
data[interface] = {}
if peer not in data[interface]:
data[interface][peer] = {
'allowed-ips': [],
'latest-handshakes': '',
'transfer': {'tx': 0, 'rx': 0},
'endpoints': '',
}
if key == 'allowed-ips':
data[interface][peer]['allowed-ips'].append(value)
elif key == 'transfer':
rx, tx = value.split()[-2:]
data[interface][peer]['transfer'] = {'tx': int(tx), 'rx': int(rx)}
elif key == 'endpoints':
data[interface][peer]['endpoints'] = value
else:
data[interface][peer][key] = value
return data
def func_apply_enhanced_filter(data: dict, user_acl: UserAcl):
# Remove peers and instances that are not allowed for the user
if user_acl.enable_enhanced_filter:
pass
else:
pass
return data
def func_get_wireguard_status():
if settings.WIREGUARD_STATUS_CACHE_ENABLED:
cache_entry = WireguardStatusCache.objects.filter(cache_type='master').order_by('-created').first()
if cache_entry:
data = cache_entry.data
data['cache_information'] = {
'processing_time_ms': cache_entry.processing_time_ms,
'created': cache_entry.created.isoformat(),
'cache_type': cache_entry.cache_type,
'cache_hit': True,
'cache_enabled': True,
'cache_uuid': str(cache_entry.uuid),
}
data['status'] = 'success'
data['message'] = 'WireGuard status retrieved from cache'
else:
data = {
'status': 'error',
'message': 'No cache entry found',
'cache_information': {
'cache_hit': False,
'cache_enabled': True,
}
}
else:
data = func_process_wireguard_status()
data['cache_information'] = {
'cache_hit': False,
'cache_enabled': False,
}
data['status'] = 'success'
data['message'] = 'WireGuard status retrieved without cache'
return data
def cron_refresh_wireguard_status_cache(request):
data = {'status': 'success'}
WireguardStatusCache.objects.filter(created__lt=timezone.now() - timezone.timedelta(seconds=settings.WIREGUARD_STATUS_CACHE_MAX_AGE)).delete()
if not settings.WIREGUARD_STATUS_CACHE_ENABLED:
return JsonResponse(data)
start_time = time.monotonic()
data = func_process_wireguard_status()
end_time = time.monotonic()
processing_time_ms = int((end_time - start_time) * 1000)
WireguardStatusCache.objects.create(data=data, processing_time_ms=processing_time_ms, cache_type='master')
return JsonResponse(data)
@require_http_methods(["GET"])
def wireguard_status(request):
user_acl = None
enhanced_filter = False
filter_peer_list = []
if request.user.is_authenticated:
user_acl = get_object_or_404(UserAcl, user=request.user)
if user_acl.enable_enhanced_filter and user_acl.peer_groups.count() > 0:
enhanced_filter = True
elif request.GET.get('key'):
api_key = get_api_key('api')
if api_key and api_key == request.GET.get('key'):
pass
else:
return HttpResponseForbidden()
elif request.GET.get('rrdkey'):
api_key = get_api_key('rrdkey')
if api_key and api_key == request.GET.get('rrdkey'):
pass
else:
return HttpResponseForbidden()
else:
return HttpResponseForbidden()
data = func_get_wireguard_status()
return JsonResponse(data)
@require_http_methods(["GET"])
def legacy_wireguard_status(request):
user_acl = None
enhanced_filter = False
filter_peer_list = []
if request.user.is_authenticated:
user_acl = get_object_or_404(UserAcl, user=request.user)
if user_acl.enable_enhanced_filter and user_acl.peer_groups.count() > 0: