Files
wireguard_webadmin/api_v2/views_api.py
Eduardo Silva 70ba21ec46 Merge pull request #150 from SecareLupus/main
Expanded API Coverage & Documentation
2026-03-17 14:50:24 -03:00

836 lines
36 KiB
Python

import ipaddress
import json
import re
from functools import wraps
from typing import List, Optional, Tuple
from django.db import transaction
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from api.views import func_get_wireguard_status
from dns.models import DNSSettings, StaticHost
from dns.views import export_dns_configuration
from routing_templates.models import RoutingTemplate
from wireguard.models import Peer, PeerAllowedIP, WireGuardInstance
from wireguard_peer.functions import func_create_new_peer
from wireguard_tools.functions import func_reload_wireguard_interface
from wireguard_tools.views import export_wireguard_configuration
from .models import ApiKey
def api_doc(*, summary: str, auth: str, params: list, returns: list, methods: Optional[List[str]] = None, examples: Optional[dict] = None):
def decorator(view_func):
view_func.api_doc = {
"summary": summary,
"auth": auth,
"params": params,
"returns": returns,
"methods": methods or ["POST"],
"examples": examples or {},
}
@wraps(view_func)
def wrapper(*args, **kwargs):
return view_func(*args, **kwargs)
return wrapper
return decorator
def validate_api_key(request, wireguard_instance: WireGuardInstance | None = None):
"""
Validates the API token and optionally validates access to a given WireGuard instance.
Rules:
- token must exist and be enabled
- if ApiKey.allowed_instances is empty => key can access any instance
- otherwise, wireguard_instance must be included in ApiKey.allowed_instances
Notes:
- If wireguard_instance is None, only validates the token (no instance scoping).
"""
token = request.headers.get("token")
if not token:
return None, "Missing API token."
try:
api_key = ApiKey.objects.get(token=token, enabled=True)
except ApiKey.DoesNotExist:
return None, "Invalid API key."
if wireguard_instance is not None:
if api_key.allowed_instances.exists():
if not api_key.allowed_instances.filter(uuid=wireguard_instance.uuid).exists():
return None, "This API key is not allowed to access the requested instance."
return api_key, ""
def _parse_ipv4_cidrs(value) -> Tuple[Optional[List[Tuple[str, int]]], Optional[str]]:
"""
Parses a list of CIDR strings into [(allowed_ip, netmask), ...].
Example:
["10.0.0.0/24"] => [("10.0.0.0", 24)]
"""
if value is None:
return None, None
if not isinstance(value, list):
return None, "Invalid payload: networks must be a list of CIDR strings."
pairs: List[Tuple[str, int]] = []
for item in value:
if not isinstance(item, str) or not item.strip():
return None, "Invalid payload: each network must be a non-empty string."
try:
network = ipaddress.ip_network(item.strip(), strict=False)
except Exception:
return None, f"Invalid network: {item}"
if network.version != 4:
return None, f"Only IPv4 networks are supported: {item}"
pairs.append((str(network.network_address), int(network.prefixlen)))
# De-duplicate while preserving order
seen = set()
unique: List[Tuple[str, int]] = []
for pair in pairs:
if pair not in seen:
seen.add(pair)
unique.append(pair)
return unique, None
def _sync_allowed_ips(peer: Peer, desired_pairs: Optional[List[Tuple[str, int]]], *, config_file: str) -> None:
"""
Sync PeerAllowedIP rows for a peer/config_file, only for priority >= 1.
- Adds missing (allowed_ip, netmask)
- Removes extra (allowed_ip, netmask)
- Never touches priority=0 entries (peer main address)
"""
if desired_pairs is None:
return
current_qs = PeerAllowedIP.objects.filter(peer=peer, config_file=config_file, priority__gte=1)
current_pairs = set(current_qs.values_list("allowed_ip", "netmask"))
desired_set = set(desired_pairs)
pairs_to_remove = current_pairs - desired_set
pairs_to_add = desired_set - current_pairs
for allowed_ip, netmask in pairs_to_remove:
PeerAllowedIP.objects.filter(
peer=peer,
config_file=config_file,
priority__gte=1,
allowed_ip=allowed_ip,
netmask=netmask,
).delete()
for allowed_ip, netmask in pairs_to_add:
PeerAllowedIP.objects.create(
peer=peer,
config_file=config_file,
priority=1,
allowed_ip=allowed_ip,
netmask=int(netmask),
)
def _apply_reload_or_pending_changes(*, wireguard_instance: WireGuardInstance, skip_reload: bool) -> Tuple[bool, str]:
"""
Applies changes after create/update/delete.
If skip_reload=True:
- sets pending_changes=True
Else:
- exports WireGuard configuration
- reloads the interface
"""
if skip_reload:
wireguard_instance.pending_changes = True
wireguard_instance.save(update_fields=["pending_changes", "updated"])
return True, "Changes saved. Reload skipped (pending_changes set to True)."
export_wireguard_configuration(wireguard_instance)
success, message = func_reload_wireguard_interface(wireguard_instance)
return bool(success), str(message or "")
def _get_wireguard_instance(instance_name: str) -> Optional[WireGuardInstance]:
return
def _validate_dns_hostname(hostname: str) -> Tuple[Optional[str], Optional[str]]:
if not isinstance(hostname, str):
return None, "Invalid hostname."
normalized = hostname.strip().lower()
if not normalized:
return None, "Invalid hostname."
if "://" in normalized or "/" in normalized or ":" in normalized:
return None, "Invalid hostname."
domain = normalized[2:] if normalized.startswith("*.") else normalized
labels = domain.split(".")
if len(labels) < 2:
return None, "Invalid hostname."
for label in labels:
if not label:
return None, "Invalid hostname."
if not re.match(r"^[a-z0-9-]+$", label):
return None, "Invalid hostname."
if label.startswith("-") or label.endswith("-"):
return None, "Invalid hostname."
return normalized, None
@csrf_exempt
@api_doc(
summary="Create / Update / Delete a WireGuard peer (and optionally reload the interface)",
auth="Header token: <ApiKey.token>",
methods=["POST", "PUT", "DELETE"],
params=[
{"name": "instance", "in": "json", "type": "string", "required": True, "example": "wg0",
"description": "Target instance name in the format wg{instance_id} (e.g. wg0, wg1)."},
{"name": "skip_reload", "in": "json", "type": "boolean", "required": False, "example": True,
"description": "If true, does not reload the interface and only sets wireguard_instance.pending_changes=True."},
{"name": "peer_uuid", "in": "json", "type": "string", "required": False,
"description": "Peer UUID used to select the peer for update/delete."},
{"name": "peer_public_key", "in": "json", "type": "string", "required": False,
"description": "Peer public key used to select the peer for update/delete."},
{"name": "routing_template_uuid", "in": "json", "type": "string", "required": False,
"description": "Routing template UUID (optional). Must belong to the same WireGuard instance."},
{"name": "allowed_ip", "in": "json", "type": "string", "required": False,
"example": "10.188.0.15", "description": "Optional peer main IP address. It must be an available IP within the instance subnet. If omitted, the system will automatically assign the next available IP address."},
{"name": "announced_networks", "in": "json", "type": "list[string]", "required": False,
"example": ["10.10.0.0/24"], "description": "Server announced networks (priority>=1). Will be synced."},
{"name": "client_routes", "in": "json", "type": "list[string]", "required": False,
"example": ["192.168.1.0/24"],
"description": "Client routes (priority>=1). Will be synced. Not allowed when allow_peer_custom_routes=True."},
{"name": "public_key", "in": "json", "type": "string", "required": False,
"description": "Peer public key (create/update)."},
{"name": "pre_shared_key", "in": "json", "type": "string", "required": False,
"description": "Peer pre-shared key (create/update)."},
{"name": "private_key", "in": "json", "type": "string", "required": False,
"description": "Peer private key (create/update). Optional."},
{"name": "persistent_keepalive", "in": "json", "type": "integer", "required": False,
"description": "Persistent keepalive (create/update)."},
{"name": "suspended", "in": "json", "type": "boolean", "required": False,
"description": "Suspend/unsuspend a peer (update)."},
{"name": "suspend_reason", "in": "json", "type": "string", "required": False,
"description": "Suspend reason (update). Can be cleared by sending null/empty string."},
],
returns=[
{"status": 200, "body": {"status": "success", "message": "Peer updated successfully.", "peer_uuid": "...", "public_key": "...", "main_addresses": ["..."], "reload": {"success": True, "message": "..."}}},
{"status": 201, "body": {"status": "success", "message": "Peer created successfully.", "peer_uuid": "...", "public_key": "...", "main_addresses": ["..."], "reload": {"success": True, "message": "..."}}},
{"status": 400, "body": {"status": "error", "error_message": "Invalid payload: ..."}},
{"status": 403, "body": {"status": "error", "error_message": "Invalid API key."}},
{"status": 405, "body": {"status": "error", "error_message": "Method not allowed."}},
],
examples={
"create_skip_reload": {
"method": "POST",
"json": {
"instance": "wg0",
"name": "John",
"routing_template_uuid": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"announced_networks": ["10.10.0.0/24"],
"skip_reload": True
}
},
"update_with_reload": {
"method": "PUT",
"json": {
"instance": "wg0",
"peer_uuid": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"suspended": True,
"suspend_reason": "Maintenance window",
"skip_reload": False
}
}
}
)
def api_v2_manage_peer(request):
if request.method not in ("POST", "PUT", "DELETE"):
return JsonResponse({"status": "error", "error_message": "Method not allowed."}, status=405)
try:
payload = json.loads(request.body.decode("utf-8")) if request.body else {}
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid JSON body."}, status=400)
try:
wireguard_instance = WireGuardInstance.objects.get(instance_id=int(payload.get("instance",).replace("wg", "")))
except:
wireguard_instance = None
if not wireguard_instance:
return JsonResponse({"status": "error", "error_message": "Invalid or missing WireGuard instance."}, status=400)
api_key, api_error = validate_api_key(request, wireguard_instance)
if not api_key:
return JsonResponse({"status": "error", "error_message": api_error}, status=403)
skip_reload = bool(payload.get("skip_reload", False))
# Routing template (optional) - must belong to the same instance
routing_template_uuid = payload.get("routing_template_uuid")
routing_template = None
if routing_template_uuid:
routing_template = RoutingTemplate.objects.filter(uuid=routing_template_uuid).first()
if not routing_template:
return JsonResponse({"status": "error", "error_message": "Invalid routing_template_uuid."}, status=400)
if routing_template.wireguard_instance_id != wireguard_instance.uuid:
return JsonResponse(
{"status": "error", "error_message": "routing_template_uuid does not belong to the requested instance."},
status=400
)
# Parse networks (only if provided)
announced_pairs, announced_error = _parse_ipv4_cidrs(payload.get("announced_networks"))
if announced_error:
return JsonResponse({"status": "error", "error_message": announced_error}, status=400)
client_route_pairs, client_route_error = _parse_ipv4_cidrs(payload.get("client_routes"))
if client_route_error:
return JsonResponse({"status": "error", "error_message": client_route_error}, status=400)
with transaction.atomic():
# CREATE
if request.method == "POST":
peer_name = payload.get("name", "") or ""
peer_public_key = payload.get("public_key") or None
peer_pre_shared_key = payload.get("pre_shared_key") or None
peer_private_key = payload.get("private_key") or None # optional
peer_persistent_keepalive = payload.get("persistent_keepalive")
if peer_persistent_keepalive is not None:
try:
peer_persistent_keepalive = int(peer_persistent_keepalive)
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid persistent_keepalive."}, status=400)
peer_allowed_ip = payload.get("allowed_ip") or None
create_overrides = {"name": peer_name}
if peer_public_key:
create_overrides["public_key"] = peer_public_key
if peer_pre_shared_key:
create_overrides["pre_shared_key"] = peer_pre_shared_key
if peer_private_key:
create_overrides["private_key"] = peer_private_key
if peer_persistent_keepalive is not None:
create_overrides["persistent_keepalive"] = peer_persistent_keepalive
if peer_allowed_ip:
create_overrides["allowed_ip"] = str(peer_allowed_ip).strip()
if routing_template is not None:
create_overrides["default_routing_template"] = routing_template
created_peer, create_message = func_create_new_peer(wireguard_instance=wireguard_instance, overrides=create_overrides)
if not created_peer:
return JsonResponse({"status": "error", "error_message": create_message or "Error creating peer."}, status=400)
# Enforce allow_peer_custom_routes rule:
# If template allows peer custom routes, client_routes must NOT be provided.
if routing_template is not None and routing_template.allow_peer_custom_routes:
if client_route_pairs is not None and len(client_route_pairs) > 0:
return JsonResponse(
{"status": "error", "error_message": "client_routes is not allowed when routing_template.allow_peer_custom_routes is enabled."},
status=400
)
_sync_allowed_ips(created_peer, announced_pairs, config_file="server")
_sync_allowed_ips(created_peer, client_route_pairs, config_file="client")
reload_success, reload_message = _apply_reload_or_pending_changes(
wireguard_instance=created_peer.wireguard_instance,
skip_reload=skip_reload
)
return JsonResponse(
{
"status": "success",
"message": create_message or "Peer created successfully.",
"peer_uuid": str(created_peer.uuid),
"public_key": created_peer.public_key,
"main_addresses": created_peer.main_addresses,
"reload": {"success": reload_success, "message": reload_message},
},
status=201
)
# UPDATE / DELETE: locate peer by uuid or public_key (explicit variable names)
selector_peer_uuid = payload.get("peer_uuid")
selector_peer_public_key = payload.get("peer_public_key")
peer_for_action = None
if selector_peer_uuid:
peer_for_action = Peer.objects.filter(uuid=selector_peer_uuid, wireguard_instance=wireguard_instance).first()
if not peer_for_action:
return JsonResponse({"status": "error", "error_message": "Peer not found for the provided peer_uuid in this instance."}, status=400)
elif selector_peer_public_key:
peer_for_action = Peer.objects.filter(public_key=selector_peer_public_key, wireguard_instance=wireguard_instance).first()
if not peer_for_action:
return JsonResponse({"status": "error", "error_message": "Peer not found for the provided peer_public_key in this instance."}, status=400)
else:
return JsonResponse({"status": "error", "error_message": "Missing peer selector (peer_uuid or peer_public_key)."}, status=400)
# Determine effective routing template for allow_peer_custom_routes validation
effective_routing_template = routing_template if routing_template is not None else peer_for_action.routing_template
if effective_routing_template is not None and effective_routing_template.allow_peer_custom_routes:
if client_route_pairs is not None and len(client_route_pairs) > 0:
return JsonResponse(
{"status": "error", "error_message": "client_routes is not allowed when routing_template.allow_peer_custom_routes is enabled."},
status=400
)
# UPDATE
if request.method == "PUT":
new_public_key = payload.get("public_key")
new_pre_shared_key = payload.get("pre_shared_key")
new_private_key = payload.get("private_key") # optional
new_persistent_keepalive = payload.get("persistent_keepalive")
new_suspended = payload.get("suspended")
new_suspend_reason = payload.get("suspend_reason") if "suspend_reason" in payload else None
if new_public_key:
peer_for_action.public_key = new_public_key
if new_pre_shared_key:
peer_for_action.pre_shared_key = new_pre_shared_key
if new_private_key:
peer_for_action.private_key = new_private_key
if new_persistent_keepalive is not None:
try:
peer_for_action.persistent_keepalive = int(new_persistent_keepalive)
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid persistent_keepalive."}, status=400)
if routing_template is not None:
peer_for_action.routing_template = routing_template
if new_suspended is not None:
peer_for_action.suspended = bool(new_suspended)
if "suspend_reason" in payload:
peer_for_action.suspend_reason = new_suspend_reason
peer_for_action.save()
_sync_allowed_ips(peer_for_action, announced_pairs, config_file="server")
_sync_allowed_ips(peer_for_action, client_route_pairs, config_file="client")
reload_success, reload_message = _apply_reload_or_pending_changes(
wireguard_instance=peer_for_action.wireguard_instance,
skip_reload=skip_reload
)
return JsonResponse(
{
"status": "success",
"message": "Peer updated successfully.",
"peer_uuid": str(peer_for_action.uuid),
"public_key": peer_for_action.public_key,
"main_addresses": peer_for_action.main_addresses,
"reload": {"success": reload_success, "message": reload_message},
},
status=200
)
# DELETE
deleted_uuid = str(peer_for_action.uuid)
peer_for_action.delete()
reload_success, reload_message = _apply_reload_or_pending_changes(
wireguard_instance=wireguard_instance,
skip_reload=skip_reload
)
return JsonResponse(
{
"status": "success",
"message": "Peer deleted successfully.",
"peer_uuid": deleted_uuid,
"reload": {"success": reload_success, "message": reload_message},
},
status=200
)
@csrf_exempt
@api_doc(
summary="Create / Upsert / Delete a static DNS record identified by hostname",
auth="Header token: <ApiKey.token>",
methods=["POST", "PUT", "DELETE"],
params=[
{"name": "hostname", "in": "json", "type": "string", "required": True,
"description": "DNS hostname to manage (supports wildcard like *.example.com)."},
{"name": "ip_address", "in": "json", "type": "string", "required": False,
"description": "IPv4 address for the hostname record (required for POST/PUT, ignored for DELETE)."},
{"name": "skip_reload", "in": "json", "type": "boolean", "required": False, "example": True,
"description": "If true, does not apply DNS changes immediately and only sets dns_settings.pending_changes=True."},
],
returns=[
{"status": 200, "body": {"status": "success", "message": "DNS record updated successfully.", "hostname": "example.com", "ip_address": "10.0.0.50", "apply": {"success": True, "message": "..."}}},
{"status": 200, "body": {"status": "success", "message": "DNS record deleted successfully.", "hostname": "example.com", "apply": {"success": True, "message": "..."}}},
{"status": 201, "body": {"status": "success", "message": "DNS record created successfully.", "hostname": "example.com", "ip_address": "10.0.0.50", "apply": {"success": True, "message": "..."}}},
{"status": 400, "body": {"status": "error", "error_message": "Invalid hostname."}},
{"status": 403, "body": {"status": "error", "error_message": "Invalid API key."}},
{"status": 404, "body": {"status": "error", "error_message": "DNS record not found for the provided hostname."}},
{"status": 500, "body": {"status": "error", "error_message": "DNS changes were saved but apply failed: ..."}},
{"status": 405, "body": {"status": "error", "error_message": "Method not allowed."}},
],
examples={
"create_skip_reload": {
"method": "POST",
"json": {
"hostname": "app.example.com",
"ip_address": "10.20.30.40",
"skip_reload": True
}
},
"put_upsert_apply": {
"method": "PUT",
"json": {
"hostname": "app.example.com",
"ip_address": "10.20.30.41",
"skip_reload": False
}
},
"delete_skip_reload": {
"method": "DELETE",
"json": {
"hostname": "app.example.com",
"skip_reload": True
}
}
}
)
def api_v2_manage_dns_record(request):
if request.method not in ("POST", "PUT", "DELETE"):
return JsonResponse({"status": "error", "error_message": "Method not allowed."}, status=405)
try:
payload = json.loads(request.body.decode("utf-8")) if request.body else {}
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid JSON body."}, status=400)
api_key, api_error = validate_api_key(request)
if not api_key:
return JsonResponse({"status": "error", "error_message": api_error}, status=403)
normalized_hostname, hostname_error = _validate_dns_hostname(payload.get("hostname"))
if hostname_error:
return JsonResponse({"status": "error", "error_message": hostname_error}, status=400)
skip_reload = bool(payload.get("skip_reload", False))
normalized_ip = None
if request.method in ("POST", "PUT"):
raw_ip = payload.get("ip_address")
if not isinstance(raw_ip, str) or not raw_ip.strip():
return JsonResponse({"status": "error", "error_message": "Invalid ip_address."}, status=400)
try:
ip = ipaddress.ip_address(raw_ip.strip())
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid ip_address."}, status=400)
if ip.version != 4:
return JsonResponse({"status": "error", "error_message": "Only IPv4 ip_address is supported."}, status=400)
normalized_ip = str(ip)
export_error = None
def _export_after_commit():
nonlocal export_error
try:
export_dns_configuration()
except Exception as exc:
export_error = str(exc)
with transaction.atomic():
dns_settings, dns_settings_created = DNSSettings.objects.select_for_update().get_or_create(name="dns_settings")
if request.method == "POST":
if StaticHost.objects.filter(hostname=normalized_hostname).exists():
return JsonResponse(
{"status": "error", "error_message": "DNS record already exists for the provided hostname."},
status=400,
)
record = StaticHost.objects.create(hostname=normalized_hostname, ip_address=normalized_ip)
action_message = "DNS record created successfully."
status_code = 201
elif request.method == "PUT":
record = StaticHost.objects.filter(hostname=normalized_hostname).first()
if record:
record.ip_address = normalized_ip
record.save(update_fields=["ip_address", "updated"])
action_message = "DNS record updated successfully."
status_code = 200
else:
record = StaticHost.objects.create(hostname=normalized_hostname, ip_address=normalized_ip)
action_message = "DNS record created successfully."
status_code = 201
else:
record = StaticHost.objects.filter(hostname=normalized_hostname).first()
if not record:
return JsonResponse(
{"status": "error", "error_message": "DNS record not found for the provided hostname."},
status=404,
)
record.delete()
action_message = "DNS record deleted successfully."
status_code = 200
if skip_reload:
dns_settings.pending_changes = True
dns_settings.save(update_fields=["pending_changes", "updated"])
apply_success = True
apply_message = "Changes saved. Apply skipped (pending_changes set to True)."
else:
# Mark pending inside transaction; actual export runs only after commit.
dns_settings.pending_changes = True
dns_settings.save(update_fields=["pending_changes", "updated"])
transaction.on_commit(_export_after_commit)
apply_success = True
apply_message = "DNS configuration applied successfully."
if not skip_reload and export_error:
return JsonResponse(
{
"status": "error",
"error_message": f"DNS changes were saved but apply failed: {export_error}",
},
status=500,
)
response_data = {
"status": "success",
"message": action_message,
"hostname": normalized_hostname,
"apply": {"success": apply_success, "message": apply_message},
}
if request.method in ("POST", "PUT"):
response_data["ip_address"] = normalized_ip
return JsonResponse(response_data, status=status_code)
@csrf_exempt
@api_doc(
summary="List peers for a specific instance (required)",
auth="Header token: <ApiKey.token>",
methods=["POST", "GET"],
params=[
{"name": "instance", "in": "json", "type": "string", "required": True, "example": "wg2",
"description": "Required. Target instance name in the format wg{instance_id} (e.g. wg0, wg1)."},
],
returns=[
{"status": 200, "body": {"status": "success", "instance": "wg2", "peers": [{"uuid": "...", "name": "...", "public_key": "...", "suspended": False, "suspend_reason": "...", "disabled_by_schedule": False, "main_addresses": ["..."]}]}},
{"status": 400, "body": {"status": "error", "error_message": "Invalid or missing WireGuard instance."}},
{"status": 403, "body": {"status": "error", "error_message": "Invalid API key."}},
],
examples={
"list_wg2": {"method": "POST", "json": {"instance": "wg2"}},
}
)
def api_v2_peer_list(request):
if request.method not in ("POST", "GET"):
return JsonResponse({"status": "error", "error_message": "Method not allowed."}, status=405)
payload = {}
if request.method == "POST":
try:
payload = json.loads(request.body.decode("utf-8")) if request.body else {}
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid JSON body."}, status=400)
else:
payload = request.GET.dict()
try:
wireguard_instance = WireGuardInstance.objects.get(
instance_id=int(str(payload.get("instance")).replace("wg", ""))
)
except Exception:
wireguard_instance = None
if not wireguard_instance:
return JsonResponse({"status": "error", "error_message": "Invalid or missing WireGuard instance."}, status=400)
api_key, api_error = validate_api_key(request, wireguard_instance=wireguard_instance)
if not api_key:
return JsonResponse({"status": "error", "error_message": api_error}, status=403)
peer_qs = (
Peer.objects
.filter(wireguard_instance=wireguard_instance)
.prefetch_related("peerallowedip_set")
.order_by("sort_order", "name", "public_key")
)
peers = []
for current_peer in peer_qs:
peers.append({
"uuid": str(current_peer.uuid),
"name": current_peer.name or "",
"public_key": current_peer.public_key,
"suspended": bool(current_peer.suspended),
"suspend_reason": current_peer.suspend_reason or "",
"disabled_by_schedule": bool(current_peer.disabled_by_schedule),
"main_addresses": current_peer.main_addresses,
})
return JsonResponse(
{
"status": "success",
"instance": f"wg{wireguard_instance.instance_id}",
"peers": peers,
},
status=200
)
@csrf_exempt
@api_doc(
summary="Peer details for a specific instance (required) by peer_uuid or peer_public_key",
auth="Header token: <ApiKey.token>",
methods=["POST", "GET"],
params=[
{"name": "instance", "in": "json", "type": "string", "required": True, "example": "wg2",
"description": "Required. Target instance name in the format wg{instance_id} (e.g. wg0, wg1)."},
{"name": "peer_uuid", "in": "json", "type": "string", "required": False,
"description": "Peer UUID selector."},
{"name": "peer_public_key", "in": "json", "type": "string", "required": False,
"description": "Peer public key selector."},
],
returns=[
{"status": 200, "body": {"status": "success", "peer": {"uuid": "...", "name": "...", "public_key": "...", "pre_shared_key": "...", "private_key": "...", "persistent_keepalive": 25, "routing_template_uuid": "...", "suspended": False, "suspend_reason": "...", "disabled_by_schedule": False, "enabled": True, "main_addresses": ["..."], "announced_networks": ["..."], "client_routes": ["..."], "instance": "...", "instance_uuid": "..."}}},
{"status": 400, "body": {"status": "error", "error_message": "Missing peer selector (peer_uuid or peer_public_key)."}},
{"status": 404, "body": {"status": "error", "error_message": "Peer not found."}},
{"status": 403, "body": {"status": "error", "error_message": "Invalid API key."}},
],
examples={
"detail_by_uuid": {"method": "POST", "json": {"instance": "wg2", "peer_uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}},
"detail_by_public_key": {"method": "POST", "json": {"instance": "wg2", "peer_public_key": "BASE64PUBLICKEY..."}},
}
)
def api_v2_peer_detail(request):
if request.method not in ("POST", "GET"):
return JsonResponse({"status": "error", "error_message": "Method not allowed."}, status=405)
payload = {}
if request.method == "POST":
try:
payload = json.loads(request.body.decode("utf-8")) if request.body else {}
except Exception:
return JsonResponse({"status": "error", "error_message": "Invalid JSON body."}, status=400)
else:
payload = request.GET.dict()
try:
wireguard_instance = WireGuardInstance.objects.get(
instance_id=int(str(payload.get("instance")).replace("wg", ""))
)
except Exception:
wireguard_instance = None
if not wireguard_instance:
return JsonResponse({"status": "error", "error_message": "Invalid or missing WireGuard instance."}, status=400)
api_key, api_error = validate_api_key(request, wireguard_instance=wireguard_instance)
if not api_key:
return JsonResponse({"status": "error", "error_message": api_error}, status=403)
selector_peer_uuid = payload.get("peer_uuid")
selector_peer_public_key = payload.get("peer_public_key")
if not selector_peer_uuid and not selector_peer_public_key:
return JsonResponse(
{"status": "error", "error_message": "Missing peer selector (peer_uuid or peer_public_key)."},
status=400,
)
peer_qs = (
Peer.objects
.filter(wireguard_instance=wireguard_instance)
.select_related("routing_template", "wireguard_instance")
.prefetch_related("peerallowedip_set")
)
if selector_peer_uuid:
current_peer = peer_qs.filter(uuid=selector_peer_uuid).first()
else:
current_peer = peer_qs.filter(public_key=selector_peer_public_key).first()
if not current_peer:
return JsonResponse({"status": "error", "error_message": "Peer not found."}, status=404)
peer_data = {
"uuid": str(current_peer.uuid),
"name": current_peer.name or "",
"public_key": current_peer.public_key,
"pre_shared_key": current_peer.pre_shared_key,
"private_key": current_peer.private_key or "",
"persistent_keepalive": int(current_peer.persistent_keepalive),
"routing_template_uuid": str(current_peer.routing_template.uuid) if current_peer.routing_template else "",
"suspended": bool(current_peer.suspended),
"suspend_reason": current_peer.suspend_reason or "",
"disabled_by_schedule": bool(current_peer.disabled_by_schedule),
"enabled": bool(current_peer.enabled),
"main_addresses": current_peer.main_addresses,
"announced_networks": current_peer.announced_networks,
"client_routes": current_peer.client_routes,
"instance": f"wg{wireguard_instance.instance_id}",
"instance_uuid": str(wireguard_instance.uuid),
}
return JsonResponse({"status": "success", "peer": peer_data}, status=200)
@csrf_exempt
@api_doc(
summary="Get WireGuard status (dump) for all interfaces/peers",
auth="Header token: <ApiKey.token>",
methods=["POST", "GET"],
params=[],
returns=[
{"status": 200, "body": {"status": "success", "message": "...", "wg0": { "...": "..." }, "cache_information": { "..." }}},
{"status": 403, "body": {"status": "error", "error_message": "Invalid API key."}},
{"status": 405, "body": {"status": "error", "error_message": "Method not allowed."}},
],
examples={
"get_latest_status": {
"method": "GET",
"json": {}
}
}
)
def api_v2_wireguard_status(request):
if request.method not in ("POST", "GET"):
return JsonResponse({"status": "error", "error_message": "Method not allowed."}, status=405)
api_key, api_error = validate_api_key(request)
if not api_key:
return JsonResponse({"status": "error", "error_message": api_error}, status=403)
data = func_get_wireguard_status()
return JsonResponse(data)