Peer display filtering for restricted user

This commit is contained in:
Eduardo Silva 2025-01-21 11:56:05 -03:00
parent 9834a3e825
commit 64cd174639
5 changed files with 94 additions and 16 deletions

View File

@ -2,3 +2,5 @@
<a href="/user/list/" class="btn {% if request.path == '/user/list/' %}btn-outline-primary{% else %}btn-primary{% endif %}">List Users</a>
<a href="/user/peer-group/list/" class="btn {% if request.path == '/user/peer-group/list/' %}btn-outline-primary{% else %}btn-primary{% endif %}">List Peer Groups</a>
<a href="/user/peer-group/manage/" class="btn btn-primary">Add Peer Group</a>
<br><br><br><h5>Warning:</h5>
<p>The limitation of Peer Groups for users is implemented. However, in some places, information about other peers may still leak, such as in Console -> wg show or through the endpoint /api/wireguard_status/. In the next version, we will add a fix for this.</p>

View File

@ -56,11 +56,12 @@
</div>
{% if add_peer_enabled %}
<a class="btn btn-primary" href="/peer/manage/?instance={{ current_instance.uuid}}">Create Peer</a>
<a class="btn btn-outline-primary disabled" href="/peer/import_peers/?instance={{ current_instance.uuid}}" title='teste'>Import peers</a>
{% else %}
<a class="btn btn-primary disabled" href="">Create Peer</a>
{% endif %}
{% comment %}<a class="btn btn-outline-primary disabled" href="/peer/import_peers/?instance={{ current_instance.uuid}}" title='teste'>Import peers</a>{% endcomment %}
</div>
</div>

View File

@ -1,5 +1,53 @@
import ipaddress, re
import subprocess
from wireguard.models import Peer, WireGuardInstance
from user_manager.models import UserAcl
def user_has_access_to_instance(user_acl: UserAcl, instance: WireGuardInstance):
if user_acl.peer_groups.all():
if user_acl.peer_groups.filter(server_instance=instance).exists():
return True
else:
return True
return False
def user_has_access_to_peer(user_acl: UserAcl, peer: Peer):
if user_acl.peer_groups.all():
if user_acl.peer_groups.filter(peer=peer).exists():
return True
if user_acl.peer_groups.filter(server_instance=peer.wireguard_instance).exists():
return True
else:
return True
return False
def user_allowed_instances(user_acl: UserAcl):
if not user_acl.peer_groups.exists():
return WireGuardInstance.objects.all().order_by('instance_id')
instances_from_groups = WireGuardInstance.objects.filter(peergroup__in=user_acl.peer_groups.all())
instances_from_peers = WireGuardInstance.objects.filter(peer__peergroup__in=user_acl.peer_groups.all())
return instances_from_groups.union(instances_from_peers).order_by('instance_id')
def user_allowed_peers(user_acl: UserAcl, instance: WireGuardInstance):
if not user_acl.peer_groups.exists():
return Peer.objects.filter(wireguard_instance=instance).order_by('name')
peers_from_direct = Peer.objects.filter(
wireguard_instance=instance,
peergroup__in=user_acl.peer_groups.all()
)
peers_from_instance = Peer.objects.filter(
wireguard_instance=instance,
wireguard_instance__peergroup__in=user_acl.peer_groups.filter(server_instance=instance)
)
return peers_from_direct.union(peers_from_instance).order_by('name')
def is_valid_ip_or_hostname(value):

View File

@ -6,7 +6,8 @@ from django.contrib import messages
from django.db.models import Max
import subprocess
import ipaddress
from wgwadmlibrary.tools import user_has_access_to_peer, user_has_access_to_instance, user_allowed_instances, user_allowed_peers
from django.http import Http404
from wireguard_peer.forms import PeerAllowedIPForm, PeerForm
@ -44,22 +45,32 @@ def generate_peer_default(wireguard_instance):
@login_required
def view_wireguard_peer_list(request):
page_title = 'WireGuard Peer List'
wireguard_instances = WireGuardInstance.objects.all().order_by('instance_id')
if wireguard_instances.filter(pending_changes=True).exists():
user_acl = get_object_or_404(UserAcl, user=request.user)
wireguard_instances = user_allowed_instances(user_acl)
if WireGuardInstance.objects.filter(pending_changes=True).exists():
pending_changes_warning = True
else:
pending_changes_warning = False
if wireguard_instances:
if request.GET.get('uuid'):
current_instance = get_object_or_404(WireGuardInstance, uuid=request.GET.get('uuid'))
else:
current_instance = wireguard_instances.first()
peer_list = current_instance.peer_set.all()
if current_instance not in wireguard_instances:
raise Http404
peer_list = user_allowed_peers(user_acl, current_instance)
else:
current_instance = None
peer_list = None
context = {'page_title': page_title, 'wireguard_instances': wireguard_instances, 'current_instance': current_instance, 'peer_list': peer_list, 'pending_changes_warning': pending_changes_warning}
add_peer_enabled = False
if current_instance:
if user_has_access_to_instance(user_acl, current_instance):
add_peer_enabled = True
context = {'page_title': page_title, 'wireguard_instances': wireguard_instances, 'current_instance': current_instance, 'peer_list': peer_list, 'pending_changes_warning': pending_changes_warning, 'add_peer_enabled': add_peer_enabled}
return render(request, 'wireguard/wireguard_peer_list.html', context)
@ -71,9 +82,12 @@ def view_wireguard_peer_manage(request):
else:
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=20).exists():
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
user_acl = get_object_or_404(UserAcl, user=request.user)
if request.GET.get('instance'):
current_instance = get_object_or_404(WireGuardInstance, uuid=request.GET.get('instance'))
if not user_has_access_to_instance(user_acl, current_instance):
raise Http404
current_peer = None
page_title = 'Create a new Peer for instance wg' + str(current_instance.instance_id)
new_peer_data = generate_peer_default(current_instance)
@ -104,6 +118,8 @@ def view_wireguard_peer_manage(request):
elif request.GET.get('peer'):
current_peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
if not user_has_access_to_peer(user_acl, current_peer):
raise Http404
current_instance = current_peer.wireguard_instance
if request.GET.get('action') == 'delete':
if request.GET.get('confirmation') == 'delete':
@ -145,17 +161,21 @@ def view_manage_ip_address(request):
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=30).exists():
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
user_acl = get_object_or_404(UserAcl, user=request.user)
config_file = request.GET.get('config', 'server')
if request.GET.get('peer'):
current_peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
#page_title = 'Add new IP address for Peer ' + str(current_peer)
current_ip = None
if not user_has_access_to_peer(user_acl, current_peer):
raise Http404
elif request.GET.get('ip'):
current_ip = get_object_or_404(PeerAllowedIP, uuid=request.GET.get('ip'))
current_peer = current_ip.peer
config_file = current_ip.config_file
#page_title = 'Update IP address for Peer ' + str(current_peer)
if not user_has_access_to_peer(user_acl, current_peer):
raise Http404
if request.GET.get('action') == 'delete':
if request.GET.get('confirmation') == 'delete':

View File

@ -3,7 +3,7 @@ import re
import qrcode
import subprocess
from django.http import HttpResponse
from django.shortcuts import redirect, get_object_or_404, render
from django.shortcuts import redirect, get_object_or_404, render, Http404
from dns.views import export_dns_configuration
from firewall.tools import generate_firewall_header, generate_firewall_footer, generate_port_forward_firewall, \
@ -14,6 +14,8 @@ from firewall.models import RedirectRule
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from io import BytesIO
from wgwadmlibrary.tools import user_has_access_to_peer
def clean_command_field(command_field):
@ -161,10 +163,14 @@ def export_wireguard_configs(request):
def download_config_or_qrcode(request):
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=20).exists():
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
peer_uuid = request.GET.get('uuid')
peer = get_object_or_404(Peer, uuid=request.GET.get('uuid'))
user_acl = get_object_or_404(UserAcl, user=request.user)
if not user_has_access_to_peer(user_acl, peer):
raise Http404
format_type = request.GET.get('format', 'conf')
config_content = generate_peer_config(peer_uuid)
config_content = generate_peer_config(peer.uuid)
if format_type == 'qrcode':
qr = qrcode.QRCode(
@ -185,7 +191,8 @@ def download_config_or_qrcode(request):
else:
response = HttpResponse(config_content, content_type="text/plain")
response['Content-Disposition'] = f'attachment; filename="peer_{peer_uuid}.conf"'
peer_filename = re.sub(r'[^a-zA-Z0-9]', '_', str(peer))
response['Content-Disposition'] = f'attachment; filename="peer_{peer_filename}.conf"'
return response