"""HTTP API views: API-key protected endpoints, RouterFleet SSO, WireGuard status, cron hooks and peer invites."""
import base64
import datetime
import hmac
import os
import subprocess
import uuid

import pytz
import requests
from django.conf import settings
from django.contrib import auth
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
from django.http import HttpResponseForbidden
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.utils import timezone
from django.views.decorators.http import require_http_methods

from user_manager.models import AuthenticationToken, UserAcl
from vpn_invite.models import InviteSettings, PeerInvite
from wgwadmlibrary.tools import create_peer_invite, get_peer_invite_data, send_email, user_allowed_peers, \
    user_has_access_to_peer
from wireguard.models import Peer, PeerStatus, WebadminSettings, WireGuardInstance

# Maps the logical API name to the file that stores its key.
_API_KEY_FILES = {
    'api': '/etc/wireguard/api_key',
    'routerfleet': '/etc/wireguard/routerfleet_key',
    'rrdkey': '/app_secrets/rrdtool_key',
}

# User levels that RouterFleet may request for auto-created users.
_ALLOWED_DEFAULT_USER_LEVELS = (10, 20, 30, 40, 50)

# Seconds to wait for the update server before giving up.
_UPDATE_CHECK_TIMEOUT = 10


def get_api_key(api_name):
    """Return the API key stored for *api_name*, or ``None``.

    ``None`` is returned when *api_name* is unknown, when the key file is
    missing, or when its content is not a UUID in canonical string form.
    """
    api_file_path = _API_KEY_FILES.get(api_name)
    if api_file_path is None or not os.path.isfile(api_file_path):
        return None

    with open(api_file_path, 'r') as api_file:
        api_file_content = api_file.read().strip()

    try:
        parsed_key = str(uuid.UUID(api_file_content))
    except ValueError:
        return None

    # Only accept the canonical representation (lowercase, hyphenated).
    if parsed_key != api_file_content:
        return None
    return parsed_key


def _has_valid_api_key(request, api_name, param='key'):
    """Return True when GET parameter *param* matches the key stored for *api_name*."""
    provided_key = request.GET.get(param)
    if not provided_key:
        return False
    expected_key = get_api_key(api_name)
    if not expected_key:
        return False
    # Constant-time comparison; bytes are used because compare_digest rejects non-ASCII str.
    return hmac.compare_digest(expected_key.encode('utf-8'), provided_key.encode('utf-8'))


def _run_wg_show(field):
    """Run ``wg show all <field>`` and return ``(ok, stdout, stderr)``.

    The command is executed without a shell so that the exit status is the one
    reported by ``wg`` itself. Failure to start the process is reported as
    ``ok == False`` with the OS error text in ``stderr``.
    """
    try:
        completed = subprocess.run(
            ['wg', 'show', 'all', field],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except OSError as exc:
        return False, '', str(exc)
    return completed.returncode == 0, completed.stdout, completed.stderr


def routerfleet_authenticate_session(request):
    """Log the browser in using a one-time authentication token and redirect to ``/``.

    Tokens older than one minute are purged first; the token used is deleted
    after a successful login. Responds with 404 when the token does not exist.
    """
    AuthenticationToken.objects.filter(created__lt=timezone.now() - datetime.timedelta(minutes=1)).delete()
    authentication_token = get_object_or_404(AuthenticationToken, uuid=request.GET.get('token'))
    auth.login(request, authentication_token.user)
    authentication_token.delete()
    return redirect('/')


@require_http_methods(["GET"])
def routerfleet_get_user_token(request):
    """Issue a one-time authentication token for a user on behalf of RouterFleet.

    GET parameters:
        key: RouterFleet API key (403 when missing or wrong).
        username: target username.
        action: ``test`` (check that the user is an administrator) or
            ``login`` (create an authentication token).
        default_user_level: when set to an allowed level, a missing user is
            created with that level during ``login``.

    Returns a JSON document with ``status``, ``message`` and
    ``authentication_token``; HTTP 400 is used when ``status`` is ``error``.
    """
    data = {'status': '', 'message': '', 'authentication_token': ''}
    if not _has_valid_api_key(request, 'routerfleet'):
        return HttpResponseForbidden()

    try:
        default_user_level = int(request.GET.get('default_user_level'))
    except (TypeError, ValueError):
        default_user_level = 0
    if default_user_level not in _ALLOWED_DEFAULT_USER_LEVELS:
        default_user_level = 0

    username = request.GET.get('username')
    action = request.GET.get('action')

    if username:
        user = User.objects.filter(username=username).first()
        if action == 'test':
            if user is not None and UserAcl.objects.filter(user=user, user_level__gte=50).exists():
                data['status'] = 'success'
                data['message'] = 'User exists and is an administrator'
            else:
                data['status'] = 'error'
                data['message'] = f'Administrator with username {username} not found at wireguard_webadmin.'
        elif action == 'login':
            user_acl = None
            if user:
                user_acl = UserAcl.objects.filter(user=user).first()
                if user_acl is None:
                    # Without this branch the response would be a 200 with an empty status.
                    data['status'] = 'error'
                    data['message'] = 'User has no access level configured'
            elif default_user_level == 0:
                data['status'] = 'error'
                data['message'] = 'User not found'
            else:
                # Auto-provision the user with a random password; login happens via token only.
                user = User.objects.create_user(username=username, password=str(uuid.uuid4()))
                user_acl = UserAcl.objects.create(user=user, user_level=default_user_level)

            if user and user_acl:
                authentication_token = AuthenticationToken.objects.create(user=user)
                data['status'] = 'success'
                data['message'] = 'User authenticated successfully'
                data['authentication_token'] = str(authentication_token.uuid)
        else:
            data['status'] = 'error'
            data['message'] = 'Invalid action'
    else:
        data['status'] = 'error'
        data['message'] = 'No username provided'

    if data['status'] == 'error':
        return JsonResponse(data, status=400)
    return JsonResponse(data)


@login_required
def peer_info(request):
    """Return name, public key and uuid of the peer given by GET ``uuid``.

    Raises ``PermissionDenied`` when the logged-in user has no access to the peer.
    """
    peer = get_object_or_404(Peer, uuid=request.GET.get('uuid'))
    user_acl = get_object_or_404(UserAcl, user=request.user)

    if not user_has_access_to_peer(user_acl, peer):
        raise PermissionDenied

    data = {
        'name': str(peer),
        'public_key': str(peer.public_key),
        'uuid': str(peer.uuid),
    }
    return JsonResponse(data)


@require_http_methods(["GET"])
def api_peer_list(request):
    """Return peers grouped by instance (``wg<N>``), protected by the ``api`` key.

    GET ``instance`` selects a single instance (``wg0`` or ``0``); the default
    ``all`` returns every instance that has at least one peer.
    """
    if not _has_valid_api_key(request, 'api'):
        return HttpResponseForbidden()

    data = {}
    requested_instance = request.GET.get('instance', 'all')
    if requested_instance == 'all':
        peer_list = Peer.objects.all()
    else:
        peer_list = Peer.objects.filter(wireguard_instance__instance_id=requested_instance.replace('wg', ''))
    # Each peer's instance is read below; fetch it in the same query.
    peer_list = peer_list.select_related('wireguard_instance')

    for peer in peer_list:
        peer_allowed_ips = [
            {
                'ip_address': allowed_ip.allowed_ip,
                'priority': allowed_ip.priority,
                'netmask': allowed_ip.netmask,
            }
            for allowed_ip in peer.peerallowedip_set.all().filter(config_file='server')
        ]

        has_handshake = hasattr(peer, 'peerstatus') and peer.peerstatus.last_handshake
        instance_key = f'wg{peer.wireguard_instance.instance_id}'
        instance_entry = data.setdefault(instance_key, {'peers': []})
        instance_entry['peers'].append({
            'name': str(peer),
            'public_key': str(peer.public_key),
            'uuid': str(peer.uuid),
            # URL-safe base64 of the public key with the '=' padding stripped.
            'rrd_filename': base64.urlsafe_b64encode(peer.public_key.encode()).decode().replace('=', '') + '.rrd',
            'last_handshake': peer.peerstatus.last_handshake.isoformat() if has_handshake else '',
            'allowed_ips': peer_allowed_ips,
        })
    return JsonResponse(data)


@require_http_methods(["GET"])
def api_instance_info(request):
    """Return configuration details of WireGuard instances, protected by the ``api`` key.

    GET ``instance`` selects a single instance (``wg0`` or ``0``); the default is ``all``.
    """
    if not _has_valid_api_key(request, 'api'):
        return HttpResponseForbidden()

    data = {}
    requested_instance = request.GET.get('instance', 'all')
    if requested_instance == 'all':
        instances = WireGuardInstance.objects.all()
    else:
        instances = WireGuardInstance.objects.filter(instance_id=requested_instance.replace('wg', ''))

    for instance in instances:
        instance_key = f'wg{instance.instance_id}'
        data[instance_key] = {
            'name': instance.name,
            'instance_id': instance_key,
            'public_key': instance.public_key,
            'listen_port': instance.listen_port,
            'hostname': instance.hostname,
            'address': instance.address,
            'netmask': instance.netmask,
            'peer_list_refresh_interval': instance.peer_list_refresh_interval,
            'dns_primary': instance.dns_primary if instance.dns_primary else '',
            'dns_secondary': instance.dns_secondary if instance.dns_secondary else '',
            'uuid': str(instance.uuid),
        }
    return JsonResponse(data)


@require_http_methods(["GET"])
def wireguard_status(request):
    """Return live ``wg show`` data as ``{interface: {peer_public_key: {...}}}``.

    Access is granted to a logged-in user, or to a request carrying a valid
    ``key`` (api key) or ``rrdkey`` (rrd key) GET parameter. For a logged-in
    user with the enhanced filter enabled and at least one peer group, only
    the peers that user is allowed to see are included.

    Returns HTTP 400 with ``{'error': ...}`` when a ``wg`` invocation fails.
    """
    user_acl = None
    enhanced_filter = False

    if request.user.is_authenticated:
        user_acl = get_object_or_404(UserAcl, user=request.user)
        if user_acl.enable_enhanced_filter and user_acl.peer_groups.count() > 0:
            enhanced_filter = True
    elif request.GET.get('key'):
        if not _has_valid_api_key(request, 'api'):
            return HttpResponseForbidden()
    elif request.GET.get('rrdkey'):
        if not _has_valid_api_key(request, 'rrdkey', param='rrdkey'):
            return HttpResponseForbidden()
    else:
        return HttpResponseForbidden()

    # Public keys the user may see; a set keeps the per-line membership test O(1).
    filter_peer_list = set()
    if enhanced_filter:
        for server_instance in WireGuardInstance.objects.all():
            filter_peer_list.update(
                peer.public_key for peer in user_allowed_peers(user_acl, server_instance)
            )

    output = {}
    for key in ('latest-handshakes', 'allowed-ips', 'transfer', 'endpoints'):
        ok, stdout, stderr = _run_wg_show(key)
        if not ok:
            return JsonResponse({'error': stderr}, status=400)

        current_interface = None
        for line in stdout.strip().split('\n'):
            parts = line.split()
            if len(parts) >= 3:
                current_interface, peer = parts[0], parts[1]
                value = " ".join(parts[2:])
            elif len(parts) == 2 and current_interface:
                # Line without an interface column: attribute it to the last interface seen.
                peer, value = parts
            else:
                continue

            interface_peers = output.setdefault(current_interface, {})
            if enhanced_filter and peer not in filter_peer_list:
                continue

            if peer not in interface_peers:
                interface_peers[peer] = {
                    'allowed-ips': [],
                    'latest-handshakes': '',
                    'transfer': {'tx': 0, 'rx': 0},
                    'endpoints': '',
                }
            peer_data = interface_peers[peer]

            if key == 'allowed-ips':
                peer_data['allowed-ips'].append(value)
            elif key == 'transfer':
                rx, tx = value.split()[-2:]
                peer_data['transfer'] = {'tx': int(tx), 'rx': int(rx)}
            else:
                # 'latest-handshakes' and 'endpoints' are stored verbatim.
                peer_data[key] = value

    return JsonResponse(output)


@require_http_methods(["GET"])
def cron_update_peer_latest_handshake(request):
    """Persist the latest handshake time reported by ``wg`` for every known peer.

    Peers whose handshake timestamp is 0 (or unparsable) are skipped, and an
    existing ``PeerStatus`` row is only saved when the value actually changed.
    Returns HTTP 400 with ``{'error': ...}`` when the ``wg`` invocation fails.
    """
    ok, stdout, stderr = _run_wg_show('latest-handshakes')
    if not ok:
        return JsonResponse({'error': stderr}, status=400)

    for line in stdout.strip().split('\n'):
        parts = line.split()
        if len(parts) < 3:
            continue
        peer_public_key, latest_handshake = parts[1], parts[2]

        try:
            latest_handshake_timestamp = int(latest_handshake)
        except ValueError:
            continue
        if latest_handshake_timestamp <= 0:
            continue

        last_handshake_time = datetime.datetime.fromtimestamp(latest_handshake_timestamp, tz=pytz.utc)
        peer = Peer.objects.filter(public_key=peer_public_key).first()
        if not peer:
            continue

        peer_status, created = PeerStatus.objects.get_or_create(
            peer=peer,
            defaults={'last_handshake': last_handshake_time},
        )
        if not created and peer_status.last_handshake != last_handshake_time:
            peer_status.last_handshake = last_handshake_time
            peer_status.save()

    return JsonResponse({'status': 'success'})


def cron_check_updates(request):
    """Ask the update server whether a newer version exists, at most once per hour.

    The result is stored in ``WebadminSettings``. Any failure during the check
    is treated as "no update available" so the cron call never errors out.
    """
    webadmin_settings, _ = WebadminSettings.objects.get_or_create(name='webadmin_settings')

    check_is_due = (
        webadmin_settings.last_checked is None
        or timezone.now() > (webadmin_settings.last_checked + datetime.timedelta(hours=1))
    )
    if not check_is_due:
        return JsonResponse({'update_available': webadmin_settings.update_available})

    try:
        version = settings.WIREGUARD_WEBADMIN_VERSION / 10000
        url = f'https://updates.eth0.com.br/api/check_updates/?app=wireguard_webadmin&version={version}'
        # A timeout keeps an unreachable update server from blocking the worker forever.
        response = requests.get(url, timeout=_UPDATE_CHECK_TIMEOUT)
        response.raise_for_status()
        data = response.json()

        if 'update_available' in data:
            webadmin_settings.update_available = data['update_available']
            if data['update_available']:
                webadmin_settings.latest_version = float(data['current_version']) * 10000

        webadmin_settings.last_checked = timezone.now()
        webadmin_settings.save()

        response_data = {
            'update_available': webadmin_settings.update_available,
            'latest_version': webadmin_settings.latest_version,
            'current_version': settings.WIREGUARD_WEBADMIN_VERSION,
        }
        return JsonResponse(response_data)
    except Exception:
        # Deliberately broad: the update check is best-effort (network, HTTP, JSON or data errors).
        webadmin_settings.update_available = False
        webadmin_settings.save()
        return JsonResponse({'update_available': False})


@login_required
def api_peer_invite(request):
    """Create, fetch, refresh, delete or e-mail a VPN invite for a peer.

    GET parameters:
        peer: peer uuid; the existing invite is returned or a new one is created.
        invite: invite uuid; used with ``action=refresh`` or ``action=delete``.
        action: ``refresh``, ``delete`` or ``email``.
        address: recipient used by ``action=email``.

    Expired invites are purged on every call. Responds with 400 when invites
    are not configured or the e-mail could not be sent, and 403 when the user
    level is too low or the user has no access to the peer.
    """
    PeerInvite.objects.filter(invite_expiration__lt=timezone.now()).delete()
    user_acl = get_object_or_404(UserAcl, user=request.user)
    invite_settings = InviteSettings.objects.filter(name='default_settings').first()
    peer_invite = PeerInvite.objects.none()

    if not invite_settings:
        data = {'status': 'error', 'message': 'VPN Invite not configured'}
        return JsonResponse(data, status=400)

    data = {
        'status': '',
        'message': '',
        'invite_data': {},
        'whatsapp_enabled': invite_settings.invite_whatsapp_enabled,
        'email_enabled': invite_settings.invite_email_enabled,
    }

    if user_acl.user_level < invite_settings.required_user_level:
        data['status'] = 'error'
        data['message'] = 'Permission denied'
        return JsonResponse(data, status=403)

    action = request.GET.get('action')

    if request.GET.get('peer'):
        peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
        if not user_has_access_to_peer(user_acl, peer):
            data['status'] = 'error'
            data['message'] = 'Permission denied'
            return JsonResponse(data, status=403)
        peer_invite = PeerInvite.objects.filter(peer=peer).first()
        if not peer_invite:
            peer_invite = create_peer_invite(peer, invite_settings)

    elif request.GET.get('invite'):
        peer_invite = get_object_or_404(PeerInvite, uuid=request.GET.get('invite'))
        # Apply the same per-peer access rule as the 'peer' branch above.
        if not user_has_access_to_peer(user_acl, peer_invite.peer):
            data['status'] = 'error'
            data['message'] = 'Permission denied'
            return JsonResponse(data, status=403)
        if action == 'refresh':
            peer_invite.invite_expiration = timezone.now() + datetime.timedelta(minutes=invite_settings.invite_expiration)
            peer_invite.save()
        elif action == 'delete':
            peer_invite.delete()
            data['status'] = 'success'
            data['message'] = 'Invite deleted'
            return JsonResponse(data)

    if peer_invite:
        data['status'] = 'success'
        data['message'] = ''
        data['invite_data'] = get_peer_invite_data(peer_invite, invite_settings)

        if action == 'email':
            data['status'], data['message'] = send_email(
                request.GET.get('address'),
                data['invite_data']['email_subject'],
                data['invite_data']['email_body'],
            )
            if data['status'] == 'success':
                return JsonResponse(data)
            return JsonResponse(data, status=400)
    elif action == 'email':
        data['status'] = 'error'
        data['message'] = 'Invite not found'
        return JsonResponse(data)

    return JsonResponse(data, status=200)