add function to concatenate WireGuard status cache from master and workers

This commit is contained in:
Eduardo Silva
2026-01-13 10:37:40 -03:00
parent f32dda78a2
commit 9df88b0ee9

View File

@@ -4,7 +4,7 @@ import os
import subprocess
import time
import uuid
from typing import Dict, Any
from typing import Any, Dict
import pytz
import requests
@@ -13,6 +13,7 @@ from django.contrib import auth
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
from django.db import transaction
from django.http import HttpResponseForbidden
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect
@@ -20,6 +21,7 @@ from django.utils import timezone
from django.views.decorators.http import require_http_methods
from api.models import WireguardStatusCache
from cluster.models import ClusterSettings, WorkerStatus
from user_manager.models import AuthenticationToken, UserAcl
from vpn_invite.models import InviteSettings, PeerInvite
from wgwadmlibrary.tools import create_peer_invite, get_peer_invite_data, send_email, user_allowed_peers, \
@@ -313,6 +315,77 @@ def func_get_wireguard_status(cache_previous: int = 0):
return data
def _latest_handshake_as_int(peer_info) -> int:
try:
return int(peer_info.get("latest-handshakes", 0))
except:
return 0
@transaction.atomic
def func_concatenate_cluster_wireguard_status_cache() -> None:
start_time = time.monotonic()
cache_entries = []
combined_data: Dict[str, Dict[str, Dict[str, Any]]] = {}
master_cache = (
WireguardStatusCache.objects.filter(cache_type="master")
.order_by("-created")
.first()
)
if master_cache and isinstance(master_cache.data, dict):
cache_entries.append({"source_name": "master", "source_uuid": "", "data": master_cache.data,})
worker_statuses = (WorkerStatus.objects.filter(worker__enabled=True).select_related("worker").all())
for ws in worker_statuses:
cache_entries.append(
{
"source_name": f"worker_{ws.worker.name}",
"source_uuid": str(ws.worker.uuid),
"data": ws.wireguard_status or {},
}
)
for entry in cache_entries:
source_name = entry["source_name"]
source_uuid = entry["source_uuid"]
data = entry.get("data") or {}
if not isinstance(data, dict):
continue
for interface, peers in data.items():
if not isinstance(peers, dict):
continue
combined_data.setdefault(interface, {})
for peer_key, peer_info in peers.items():
if not isinstance(peer_info, dict):
continue
peer_info_copy = dict(peer_info)
peer_info_copy["source_name"] = source_name
peer_info_copy["source_uuid"] = source_uuid
if peer_key not in combined_data[interface]:
combined_data[interface][peer_key] = peer_info_copy
continue
existing_peer = combined_data[interface][peer_key]
new_hs = _latest_handshake_as_int(peer_info_copy)
old_hs = _latest_handshake_as_int(existing_peer)
if new_hs > old_hs:
combined_data[interface][peer_key] = peer_info_copy
processing_time_ms = int((time.monotonic() - start_time) * 1000)
WireguardStatusCache.objects.create(data=combined_data, processing_time_ms=processing_time_ms, cache_type="cluster")
return
def cron_refresh_wireguard_status_cache(request):
data = {'status': 'success'}
WireguardStatusCache.objects.filter(created__lt=timezone.now() - timezone.timedelta(seconds=settings.WIREGUARD_STATUS_CACHE_MAX_AGE)).delete()
@@ -324,6 +397,8 @@ def cron_refresh_wireguard_status_cache(request):
end_time = time.monotonic()
processing_time_ms = int((end_time - start_time) * 1000)
WireguardStatusCache.objects.create(data=data, processing_time_ms=processing_time_ms, cache_type='master')
if ClusterSettings.objects.filter(name='cluster_settings', enabled=True).exists():
func_concatenate_cluster_wireguard_status_cache()
return JsonResponse(data)