From ce7f6c6200f5da166069adfd9513ed1e94c23550 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 28 Feb 2025 16:56:13 -0300 Subject: [PATCH] peer invite apis --- api/views.py | 46 +++++++++++++++++++++++++++++++++++- wgwadmlibrary/tools.py | 53 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/api/views.py b/api/views.py index fa44e1d..bb8c63c 100644 --- a/api/views.py +++ b/api/views.py @@ -11,8 +11,9 @@ from django.conf import settings from django.utils import timezone from user_manager.models import UserAcl, AuthenticationToken +from vpn_invite.models import InviteSettings, PeerInvite from wireguard.models import WebadminSettings, Peer, PeerStatus, WireGuardInstance -from wgwadmlibrary.tools import user_allowed_peers, user_has_access_to_peer +from wgwadmlibrary.tools import user_allowed_peers, user_has_access_to_peer, get_peer_invite_data, create_peer_invite import requests import subprocess import datetime @@ -283,3 +284,46 @@ def cron_check_updates(request): return JsonResponse({'update_available': False}) return JsonResponse({'update_available': webadmin_settings.update_available}) + + + +@login_required +def api_peer_invite(request): + user_acl = get_object_or_404(UserAcl, user=request.user) + data = {'status': '', 'message': '', 'invite_data': {}} + peer_invite = PeerInvite.objects.none() + invite_settings = InviteSettings.objects.filter(name='default_settings').first() + if not invite_settings: + data['status'] = 'error' + data['message'] = 'Default settings not found' + return JsonResponse(data, status=400) + + if user_acl.user_level < invite_settings.required_user_level: + data['status'] = 'error' + data['message'] = 'Permission denied' + return JsonResponse(data, status=403) + + if request.GET.get('peer'): + peer = get_object_or_404(Peer, uuid=request.GET.get('peer')) + if not user_has_access_to_peer(user_acl, peer): + data['status'] = 'error' + data['message'] = 'Permission denied' + return JsonResponse(data, status=403) + peer_invite = create_peer_invite(peer, invite_settings) + elif request.GET.get('invite'): + peer_invite = get_object_or_404(PeerInvite, uuid=request.GET.get('invite')) + if request.GET.get('action') == 'refresh': + peer_invite.invite_expiration = timezone.now() + datetime.timedelta(minutes=invite_settings.invite_expiration) + peer_invite.save() + elif request.GET.get('action') == 'delete': + peer_invite.delete() + data['status'] = 'success' + data['message'] = 'Invite deleted' + return JsonResponse(data) + + if peer_invite: + data['status'] = 'success' + data['message'] = '' + data['invite_data'] = get_peer_invite_data(peer_invite) + + return JsonResponse(data, status=200) \ No newline at end of file diff --git a/wgwadmlibrary/tools.py b/wgwadmlibrary/tools.py index 5986f4c..818eebf 100644 --- a/wgwadmlibrary/tools.py +++ b/wgwadmlibrary/tools.py @@ -1,8 +1,12 @@ import ipaddress, re import subprocess +from django.utils import timezone +from datetime import timedelta +from vpn_invite.models import PeerInvite, InviteSettings from wireguard.models import Peer, WireGuardInstance from user_manager.models import UserAcl from django.db.models import Max +import random def user_has_access_to_instance(user_acl: UserAcl, instance: WireGuardInstance): @@ -113,4 +117,51 @@ def check_sort_order_conflict(peer: Peer): peers = Peer.objects.filter(wireguard_instance=peer.wireguard_instance, sort_order=peer.sort_order).exclude(uuid=peer.uuid) if peers.exists(): return True - return False \ No newline at end of file + return False + + + +def create_random_password(length, complexity): + if complexity == 'digits': + characters = '0123456789' + elif complexity == 'letters': + characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' + elif complexity == 'letters_digits': + characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' + else: + characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()' + return ''.join(random.choice(characters) for _ in range(length)) + + + +def replace_message_variables(message: str, peer_invite: PeerInvite, invite_settings: InviteSettings): + # The & at the end is to prevent the token from being concatenated with any other template text. + message = message.replace('{invite_url}', f'{invite_settings.invite_url}?token{peer_invite.uuid}&') + message = message.replace('{expire_minutes}', f'{invite_settings.invite_expiration}') + return message + + +def get_peer_invite_data(peer_invite: PeerInvite, invite_settings: InviteSettings): + data = { + # The & at the end is to prevent the token from being concatenated with any other template text. + 'url': f'{invite_settings.invite_url}?token{peer_invite.uuid}&', + 'password': peer_invite.invite_password, + 'expiration': peer_invite.invite_expiration.isoformat(), + 'email_subject': replace_message_variables(invite_settings.invite_email_subject), + 'email_body': replace_message_variables(invite_settings.invite_email_body), + 'whatsapp_body': replace_message_variables(invite_settings.invite_whatsapp_body), + 'text_body': replace_message_variables(invite_settings.invite_text_body), + } + return data + + +def create_peer_invite(peer, invite_settings): + if invite_settings.enable_random_password or not invite_settings.default_password: + password = create_random_password(invite_settings.random_password_length, invite_settings.random_password_complexity) + else: + password = invite_settings.default_password + + peer_invite = PeerInvite.objects.create( + peer=peer, password=password[32], invite_expiration=timezone.now() + timedelta(minutes=invite_settings.invite_expiration_minutes) + ) + return peer_invite