Add API endpoints for listing peers and retrieving peer details

This commit is contained in:
Eduardo Silva
2026-02-11 16:05:20 -03:00
parent dc7fee2de8
commit 8c6a5262d3
2 changed files with 187 additions and 15 deletions

View File

@@ -1,8 +1,9 @@
from django.urls import path
from .views_api import api_v2_manage_peer
from .views_api import api_v2_manage_peer, api_v2_peer_list, api_v2_peer_detail
urlpatterns = [
path('manage_peer/', api_v2_manage_peer, name='api_v2_manage_peer'),
path('peer_list/', api_v2_peer_list, name='api_v2_peer_list'),
path('peer_detail/', api_v2_peer_detail, name='api_v2_peer_detail'),
]

View File

@@ -29,20 +29,21 @@ def api_doc(*, summary: str, auth: str, params: list, returns: list, examples: O
def wrapper(*args, **kwargs):
return view_func(*args, **kwargs)
wrapper.__dict__.update(getattr(view_func, "__dict__", {}))
wrapper.api_doc = view_func.api_doc
return wrapper
return decorator
def validate_api_key(request, wireguard_instance: WireGuardInstance):
def validate_api_key(request, wireguard_instance: WireGuardInstance | None = None):
"""
Validates the API key and checks whether it can manage the given instance.
Validates the API token and optionally validates access to a given WireGuard instance.
Rule:
- If ApiKey.allowed_instances is empty => key can manage any instance.
- Otherwise, wireguard_instance must be included in ApiKey.allowed_instances.
Rules:
- token must exist and be enabled
- if ApiKey.allowed_instances is empty => key can access any instance
- otherwise, wireguard_instance must be included in ApiKey.allowed_instances
Notes:
- If wireguard_instance is None, only validates the token (no instance scoping).
"""
token = request.headers.get("token")
if not token:
@@ -53,13 +54,13 @@ def validate_api_key(request, wireguard_instance: WireGuardInstance):
except ApiKey.DoesNotExist:
return None, "Invalid API key."
if api_key.allowed_instances.exists():
if not api_key.allowed_instances.filter(uuid=wireguard_instance.uuid).exists():
return None, "This API key is not allowed to manage the requested instance."
if wireguard_instance is not None:
if api_key.allowed_instances.exists():
if not api_key.allowed_instances.filter(uuid=wireguard_instance.uuid).exists():
return None, "This API key is not allowed to access the requested instance."
return api_key, ""
def _parse_ipv4_cidrs(value) -> Tuple[Optional[List[Tuple[str, int]]], Optional[str]]:
"""
Parses a list of CIDR strings into [(allowed_ip, netmask), ...].
@@ -161,6 +162,7 @@ def _get_wireguard_instance(instance_name: str) -> Optional[WireGuardInstance]:
return
@csrf_exempt
@api_doc(
summary="Create / Update / Delete a WireGuard peer (and optionally reload the interface)",
auth="Header token: <ApiKey.token>",
@@ -227,7 +229,6 @@ def _get_wireguard_instance(instance_name: str) -> Optional[WireGuardInstance]:
}
}
)
@csrf_exempt
def api_v2_manage_peer(request):
if request.method not in ("POST", "PUT", "DELETE"):
return JsonResponse({"status": "error", "error_message": "Method not allowed."}, status=405)
@@ -443,3 +444,173 @@ def api_v2_manage_peer(request):
},
status=200
)
@csrf_exempt
@api_doc(
summary="List peers for a specific instance (required)",
auth="Header token: <ApiKey.token>",
params=[
{"name": "instance", "in": "json", "type": "string", "required": True, "example": "wg2",
"description": "Required. Target instance name in the format wg{instance_id} (e.g. wg0, wg1)."},
],
returns=[
{"status": 200, "body": {"status": "success", "instance": "wg2", "peers": [{"uuid": "...", "public_key": "..."}]}},
{"status": 400, "body": {"status": "error", "error_message": "Invalid or missing WireGuard instance."}},
{"status": 403, "body": {"status": "error", "error_message": "Invalid API key."}},
],
examples={
"list_wg2": {"method": "POST", "json": {"instance": "wg2"}},
}
)
def api_v2_peer_list(request):
if request.method not in ("POST", "GET"):
return JsonResponse({"status": "error", "error_message": "Method not allowed."}, status=405)
payload = {}
if request.method == "POST":
try:
payload = json.loads(request.body.decode("utf-8")) if request.body else {}
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid JSON body."}, status=400)
else:
payload = request.GET.dict()
try:
wireguard_instance = WireGuardInstance.objects.get(
instance_id=int(str(payload.get("instance")).replace("wg", ""))
)
except Exception:
wireguard_instance = None
if not wireguard_instance:
return JsonResponse({"status": "error", "error_message": "Invalid or missing WireGuard instance."}, status=400)
api_key, api_error = validate_api_key(request, wireguard_instance=wireguard_instance)
if not api_key:
return JsonResponse({"status": "error", "error_message": api_error}, status=403)
peer_qs = (
Peer.objects
.filter(wireguard_instance=wireguard_instance)
.prefetch_related("peerallowedip_set")
.order_by("sort_order", "name", "public_key")
)
peers = []
for current_peer in peer_qs:
peers.append({
"uuid": str(current_peer.uuid),
"name": current_peer.name or "",
"public_key": current_peer.public_key,
"suspended": bool(current_peer.suspended),
"suspend_reason": current_peer.suspend_reason or "",
"disabled_by_schedule": bool(current_peer.disabled_by_schedule),
"main_addresses": current_peer.main_addresses,
})
return JsonResponse(
{
"status": "success",
"instance": f"wg{wireguard_instance.instance_id}",
"peers": peers,
},
status=200
)
@csrf_exempt
@api_doc(
summary="Peer details for a specific instance (required) by peer_uuid or peer_public_key",
auth="Header token: <ApiKey.token>",
params=[
{"name": "instance", "in": "json", "type": "string", "required": True, "example": "wg2",
"description": "Required. Target instance name in the format wg{instance_id} (e.g. wg0, wg1)."},
{"name": "peer_uuid", "in": "json", "type": "string", "required": False,
"description": "Peer UUID selector."},
{"name": "peer_public_key", "in": "json", "type": "string", "required": False,
"description": "Peer public key selector."},
],
returns=[
{"status": 200, "body": {"status": "success", "peer": {"uuid": "...", "name": "...", "public_key": "..."}}},
{"status": 400, "body": {"status": "error", "error_message": "Missing peer selector (peer_uuid or peer_public_key)."}},
{"status": 404, "body": {"status": "error", "error_message": "Peer not found."}},
{"status": 403, "body": {"status": "error", "error_message": "Invalid API key."}},
],
examples={
"detail_by_uuid": {"method": "POST", "json": {"instance": "wg2", "peer_uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}},
"detail_by_public_key": {"method": "POST", "json": {"instance": "wg2", "peer_public_key": "BASE64PUBLICKEY..."}},
}
)
def api_v2_peer_detail(request):
if request.method not in ("POST", "GET"):
return JsonResponse({"status": "error", "error_message": "Method not allowed."}, status=405)
payload = {}
if request.method == "POST":
try:
payload = json.loads(request.body.decode("utf-8")) if request.body else {}
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid JSON body."}, status=400)
else:
payload = request.GET.dict()
try:
wireguard_instance = WireGuardInstance.objects.get(
instance_id=int(str(payload.get("instance")).replace("wg", ""))
)
except Exception:
wireguard_instance = None
if not wireguard_instance:
return JsonResponse({"status": "error", "error_message": "Invalid or missing WireGuard instance."}, status=400)
api_key, api_error = validate_api_key(request, wireguard_instance=wireguard_instance)
if not api_key:
return JsonResponse({"status": "error", "error_message": api_error}, status=403)
selector_peer_uuid = payload.get("peer_uuid")
selector_peer_public_key = payload.get("peer_public_key")
if not selector_peer_uuid and not selector_peer_public_key:
return JsonResponse(
{"status": "error", "error_message": "Missing peer selector (peer_uuid or peer_public_key)."},
status=400,
)
peer_qs = (
Peer.objects
.filter(wireguard_instance=wireguard_instance)
.select_related("routing_template", "wireguard_instance")
.prefetch_related("peerallowedip_set")
)
if selector_peer_uuid:
current_peer = peer_qs.filter(uuid=selector_peer_uuid).first()
else:
current_peer = peer_qs.filter(public_key=selector_peer_public_key).first()
if not current_peer:
return JsonResponse({"status": "error", "error_message": "Peer not found."}, status=404)
peer_data = {
"uuid": str(current_peer.uuid),
"name": current_peer.name or "",
"public_key": current_peer.public_key,
"pre_shared_key": current_peer.pre_shared_key,
"private_key": current_peer.private_key or "",
"persistent_keepalive": int(current_peer.persistent_keepalive),
"routing_template_uuid": str(current_peer.routing_template.uuid) if current_peer.routing_template else "",
"suspended": bool(current_peer.suspended),
"suspend_reason": current_peer.suspend_reason or "",
"disabled_by_schedule": bool(current_peer.disabled_by_schedule),
"enabled": bool(current_peer.enabled),
"main_addresses": current_peer.main_addresses,
"announced_networks": current_peer.announced_networks,
"client_routes": current_peer.client_routes,
"instance": f"wg{wireguard_instance.instance_id}",
"instance_uuid": str(wireguard_instance.uuid),
}
return JsonResponse({"status": "success", "peer": peer_data}, status=200)