2025-04-15 14:22:40 -03:00

256 lines
12 KiB
Python

import ipaddress
import subprocess
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.http import Http404
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.translation import gettext_lazy as _
from user_manager.models import UserAcl
from wgwadmlibrary.tools import check_sort_order_conflict, deduplicate_sort_order, default_sort_peers, \
user_allowed_instances, user_allowed_peers, user_has_access_to_instance, user_has_access_to_peer
from wireguard.models import Peer, PeerAllowedIP, WireGuardInstance
from wireguard_peer.forms import PeerAllowedIPForm, PeerForm
def generate_peer_default(wireguard_instance):
private_key = subprocess.check_output('wg genkey', shell=True).decode('utf-8').strip()
public_key = subprocess.check_output(f'echo {private_key} | wg pubkey', shell=True).decode('utf-8').strip()
pre_shared_key = subprocess.check_output('wg genpsk', shell=True).decode('utf-8').strip()
address = wireguard_instance.address
netmask = wireguard_instance.netmask
cidr_network = f"{address}/{netmask}"
network = ipaddress.ip_network(cidr_network, strict=False)
# the code below can be an issue for larger networks, for now it's fine, but it should be optimized in the future
used_ips = set(WireGuardInstance.objects.all().values_list('address', flat=True)) | \
set(PeerAllowedIP.objects.filter(config_file='server', priority=0).values_list('allowed_ip', flat=True))
free_ip_address = None
for ip in network.hosts():
if str(ip) not in used_ips:
free_ip_address = str(ip)
break
return {
'name': '',
'public_key': public_key,
'pre_shared_key': pre_shared_key,
'persistent_keepalive': 25,
'private_key': private_key,
'wireguard_instance': wireguard_instance,
'allowed_ip': free_ip_address,
}
@login_required
def view_wireguard_peer_list(request):
page_title = _('WireGuard Peer List')
user_acl = get_object_or_404(UserAcl, user=request.user)
wireguard_instances = user_allowed_instances(user_acl)
if wireguard_instances:
if request.GET.get('uuid'):
current_instance = get_object_or_404(WireGuardInstance, uuid=request.GET.get('uuid'))
else:
current_instance = wireguard_instances.first()
if current_instance not in wireguard_instances:
raise Http404
default_sort_peers(current_instance)
peer_list = user_allowed_peers(user_acl, current_instance)
else:
current_instance = None
peer_list = None
add_peer_enabled = False
if current_instance:
if user_has_access_to_instance(user_acl, current_instance):
add_peer_enabled = True
context = {'page_title': page_title, 'wireguard_instances': wireguard_instances, 'current_instance': current_instance, 'peer_list': peer_list, 'add_peer_enabled': add_peer_enabled, 'user_acl': user_acl}
return render(request, 'wireguard/wireguard_peer_list.html', context)
@login_required
def view_wireguard_peer_sort(request):
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=30).exists():
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
# check if the current sort order is duplicated with another peer
if check_sort_order_conflict(peer):
deduplicate_sort_order(peer.wireguard_instance)
redirect_url = f'/peer/list/?uuid={peer.wireguard_instance.uuid}#peer-{peer.public_key}'
direction = request.GET.get('direction')
sort_order_changed = False
if direction == 'up':
previous_peer = Peer.objects.filter(wireguard_instance=peer.wireguard_instance, sort_order__lt=peer.sort_order).order_by('-sort_order').first()
if previous_peer:
peer.sort_order, previous_peer.sort_order = previous_peer.sort_order, peer.sort_order
peer.save()
previous_peer.save()
sort_order_changed = True
else:
messages.warning(request, 'Cannot move peer up|Peer is already at the top.')
elif direction == 'down':
next_peer = Peer.objects.filter(wireguard_instance=peer.wireguard_instance, sort_order__gt=peer.sort_order).order_by('sort_order').first()
if next_peer:
peer.sort_order, next_peer.sort_order = next_peer.sort_order, peer.sort_order
peer.save()
next_peer.save()
sort_order_changed = True
else:
messages.warning(request, 'Cannot move peer down|Peer is already at the bottom.')
if sort_order_changed:
# check if the new sort order is duplicated with another peer
if check_sort_order_conflict(peer):
deduplicate_sort_order(peer.wireguard_instance)
return redirect(redirect_url)
@login_required
def view_wireguard_peer_manage(request):
if request.method == 'POST':
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=30).exists():
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
else:
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=20).exists():
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
user_acl = get_object_or_404(UserAcl, user=request.user)
if request.GET.get('instance'):
current_instance = get_object_or_404(WireGuardInstance, uuid=request.GET.get('instance'))
if not user_has_access_to_instance(user_acl, current_instance):
raise Http404
current_peer = None
page_title = _('Create a new Peer for instance wg') + str(current_instance.instance_id)
new_peer_data = generate_peer_default(current_instance)
if new_peer_data['allowed_ip']:
new_peer = Peer.objects.create(
name=new_peer_data['name'],
public_key=new_peer_data['public_key'],
pre_shared_key=new_peer_data['pre_shared_key'],
persistent_keepalive=new_peer_data['persistent_keepalive'],
private_key=new_peer_data['private_key'],
wireguard_instance=current_instance,
)
PeerAllowedIP.objects.create(
config_file='server',
peer=new_peer,
allowed_ip=new_peer_data['allowed_ip'],
priority=0,
netmask=32,
)
messages.success(request, _('Peer created|Peer created successfully.'))
new_peer.wireguard_instance.pending_changes = True
new_peer.wireguard_instance.save()
return redirect('/peer/manage/?peer=' + str(new_peer.uuid))
else:
messages.warning(request, _('Error creating peer|No available IP address found for peer creation.'))
return redirect('/peer/list/')
elif request.GET.get('peer'):
current_peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
if not user_has_access_to_peer(user_acl, current_peer):
raise Http404
current_instance = current_peer.wireguard_instance
if request.GET.get('action') == 'delete':
if request.GET.get('confirmation') == 'delete':
current_peer.wireguard_instance.pending_changes = True
current_peer.wireguard_instance.save()
current_peer.delete()
messages.success(request, _('Peer deleted|Peer deleted successfully.'))
return redirect('/peer/list/?uuid=' + str(current_instance.uuid))
else:
messages.warning(request, _('Error deleting peer|Invalid confirmation message. Type "delete" to confirm.'))
return redirect('/peer/manage/?peer=' + str(current_peer.uuid))
page_title = _('Update Peer: ')
peer_ip_list = current_peer.peerallowedip_set.filter(config_file='server').order_by('priority')
peer_client_ip_list = current_peer.peerallowedip_set.filter(config_file='client').order_by('priority')
if current_peer.name:
page_title += current_peer.name
else:
page_title += current_peer.public_key[:16] + ("..." if len(current_peer.public_key) > 16 else "")
if request.method == 'POST':
form = PeerForm(request.POST, instance=current_peer)
if form.is_valid():
form.save()
messages.success(request, _('Peer updated|Peer updated successfully.'))
current_peer.wireguard_instance.pending_changes = True
current_peer.wireguard_instance.save()
return redirect('/peer/list/?uuid=' + str(current_peer.wireguard_instance.uuid))
else:
form = PeerForm(instance=current_peer)
else:
return redirect('/peer/list/')
context = {
'page_title': page_title, 'current_instance': current_instance, 'current_peer': current_peer, 'form': form,
'peer_ip_list': peer_ip_list, 'peer_client_ip_list': peer_client_ip_list
}
return render(request, 'wireguard/wireguard_manage_peer.html', context)
def view_manage_ip_address(request):
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=30).exists():
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
user_acl = get_object_or_404(UserAcl, user=request.user)
config_file = request.GET.get('config', 'server')
if request.GET.get('peer'):
current_peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
current_ip = None
if not user_has_access_to_peer(user_acl, current_peer):
raise Http404
elif request.GET.get('ip'):
current_ip = get_object_or_404(PeerAllowedIP, uuid=request.GET.get('ip'))
current_peer = current_ip.peer
config_file = current_ip.config_file
if not user_has_access_to_peer(user_acl, current_peer):
raise Http404
if request.GET.get('action') == 'delete':
if request.GET.get('confirmation') == 'delete':
current_ip.delete()
messages.success(request, _('IP address deleted|IP address deleted successfully.'))
current_peer.wireguard_instance.pending_changes = True
current_peer.wireguard_instance.save()
return redirect('/peer/manage/?peer=' + str(current_peer.uuid))
else:
messages.warning(request, _('Error deleting IP address|Invalid confirmation message. Type "delete" to confirm.'))
return redirect('/peer/manage_ip_address/?ip=' + str(current_ip.uuid))
if config_file not in ['client', 'server']:
config_file = 'server'
if config_file == 'client':
page_title = _('Manage client route')
else:
page_title = _('Manage IP address or Network')
if request.method == 'POST':
form = PeerAllowedIPForm(request.POST or None, instance=current_ip, current_peer=current_peer, config_file=config_file)
if form.is_valid():
this_form = form.save(commit=False)
if not current_ip:
this_form.peer = current_peer
this_form.config_file = config_file
this_form.save()
current_peer.wireguard_instance.pending_changes = True
current_peer.wireguard_instance.save()
if current_ip:
messages.success(request, _('IP address updated|IP address updated successfully.'))
else:
messages.success(request, _('IP address added|IP address added successfully.'))
return redirect('/peer/manage/?peer=' + str(current_peer.uuid))
else:
form = PeerAllowedIPForm(instance=current_ip, current_peer=current_peer)
return render(request, 'wireguard/wireguard_manage_ip.html', {
'page_title': page_title, 'form': form, 'current_peer': current_peer, 'current_ip': current_ip
})