From e57bce8495b711b945c798c36a94b9930afe87e0 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 12 Mar 2026 09:22:00 -0300 Subject: [PATCH] add gatekeeper management views, forms, and templates --- gatekeeper/forms.py | 248 ++++++++++++++++ gatekeeper/urls.py | 28 ++ gatekeeper/views.py | 336 +++++++++++++++++++++- requirements.txt | 1 + templates/gatekeeper/gatekeeper_list.html | 244 ++++++++++++++++ wireguard_webadmin/urls.py | 1 + 6 files changed, 857 insertions(+), 1 deletion(-) create mode 100644 gatekeeper/forms.py create mode 100644 gatekeeper/urls.py create mode 100644 templates/gatekeeper/gatekeeper_list.html diff --git a/gatekeeper/forms.py b/gatekeeper/forms.py new file mode 100644 index 0000000..953d4eb --- /dev/null +++ b/gatekeeper/forms.py @@ -0,0 +1,248 @@ +import pyotp +from crispy_forms.helper import FormHelper +from crispy_forms.layout import Layout, Submit, HTML, Div, Field +from django import forms +from django.utils.translation import gettext_lazy as _ + +from gatekeeper.models import GatekeeperUser, GatekeeperGroup, AuthMethod, AuthMethodAllowedDomain, \ + AuthMethodAllowedEmail + + +class GatekeeperUserForm(forms.ModelForm): + class Meta: + model = GatekeeperUser + fields = ['username', 'email', 'password', 'totp_secret'] + labels = { + 'username': _('Username'), + 'email': _('Email'), + 'password': _('Password'), + 'totp_secret': _('TOTP Secret'), + } + + def __init__(self, *args, **kwargs): + cancel_url = kwargs.pop('cancel_url', '#') + super().__init__(*args, **kwargs) + + self.helper = FormHelper() + self.helper.layout = Layout( + Div( + Div('username', css_class='col-md-6'), + Div('email', css_class='col-md-6'), + css_class='row' + ), + Div( + Div(Field('password', type='password'), css_class='col-md-6'), + Div('totp_secret', css_class='col-md-6'), + css_class='row' + ), + Div( + Div( + Submit('submit', _('Save'), css_class='btn btn-primary'), + HTML(f'{_("Cancel")}'), + css_class='col-12 d-flex justify-content-end gap-2 mt-3' + ), + css_class='row' + ) + ) + + +class GatekeeperGroupForm(forms.ModelForm): + class Meta: + model = GatekeeperGroup + fields = ['name', 'users'] + labels = { + 'name': _('Group Name'), + 'users': _('Members'), + } + + def __init__(self, *args, **kwargs): + cancel_url = kwargs.pop('cancel_url', '#') + super().__init__(*args, **kwargs) + + self.helper = FormHelper() + self.helper.layout = Layout( + Div( + Div('name', css_class='col-md-12'), + css_class='row' + ), + Div( + Div('users', css_class='col-md-12'), + css_class='row' + ), + Div( + Div( + Submit('submit', _('Save'), css_class='btn btn-primary'), + HTML(f'{_("Cancel")}'), + css_class='col-12 d-flex justify-content-end gap-2 mt-3' + ), + css_class='row' + ) + ) + + +class AuthMethodForm(forms.ModelForm): + totp_pin = forms.CharField( + label=_('TOTP Validation PIN'), + max_length=6, + required=False, + help_text=_('Enter a 6-digit PIN generated by your authenticator app to validate the secret.') + ) + + class Meta: + model = AuthMethod + fields = [ + 'name', 'auth_type', 'totp_secret', + 'oidc_provider', 'oidc_client_id', 'oidc_client_secret' + ] + labels = { + 'name': _('Name'), + 'auth_type': _('Authentication Type'), + 'totp_secret': _('Global TOTP Secret'), + 'oidc_provider': _('OIDC Provider URL'), + 'oidc_client_id': _('OIDC Client ID'), + 'oidc_client_secret': _('OIDC Client Secret'), + } + + def __init__(self, *args, **kwargs): + cancel_url = kwargs.pop('cancel_url', '#') + super().__init__(*args, **kwargs) + + if self.instance and self.instance.pk: + self.fields['auth_type'].disabled = True + + self.helper = FormHelper() + self.helper.layout = Layout( + Div( + Div('name', css_class='col-md-6'), + Div('auth_type', css_class='col-md-6'), + css_class='row' + ), + Div( + Div('totp_secret', css_class='col-md-6'), + Div('totp_pin', css_class='col-md-6'), + css_class='row' + ), + Div( + Div('oidc_provider', css_class='col-md-12'), + css_class='row' + ), + Div( + Div('oidc_client_id', css_class='col-md-6'), + Div('oidc_client_secret', css_class='col-md-6'), + css_class='row' + ), + Div( + Div( + Submit('submit', _('Save'), css_class='btn btn-primary'), + HTML(f'{_("Cancel")}'), + css_class='col-12 d-flex justify-content-end gap-2 mt-3' + ), + css_class='row' + ) + ) + + def clean(self): + cleaned_data = super().clean() + auth_type = cleaned_data.get('auth_type') + totp_secret = cleaned_data.get('totp_secret') + oidc_provider = cleaned_data.get('oidc_provider') + oidc_client_id = cleaned_data.get('oidc_client_id') + oidc_client_secret = cleaned_data.get('oidc_client_secret') + + if auth_type == 'local_password': + if totp_secret: + self.add_error('totp_secret', _('TOTP secret must be empty for Local Password authentication.')) + if cleaned_data.get('totp_pin'): + self.add_error('totp_pin', _('TOTP validation PIN must be empty for Local Password authentication.')) + if oidc_provider or oidc_client_id or oidc_client_secret: + self.add_error(None, _('OIDC fields must be empty for Local Password authentication.')) + + existing_local = AuthMethod.objects.filter(auth_type='local_password') + if self.instance and self.instance.pk: + existing_local = existing_local.exclude(pk=self.instance.pk) + if existing_local.exists(): + self.add_error('auth_type', _('Only one Local Password authentication method can be configured.')) + elif auth_type == 'totp': + if oidc_provider or oidc_client_id or oidc_client_secret: + self.add_error(None, _('OIDC fields must be empty for TOTP authentication.')) + if not totp_secret: + self.add_error('totp_secret', _('TOTP secret is required for TOTP authentication.')) + else: + totp_pin = cleaned_data.get('totp_pin') + if not totp_pin: + self.add_error('totp_pin', _('Please provide a PIN to validate the TOTP secret.')) + else: + try: + totp = pyotp.TOTP(totp_secret) + if not totp.verify(totp_pin): + self.add_error('totp_pin', _('Invalid TOTP PIN.')) + except Exception: + self.add_error('totp_secret', _('Invalid TOTP secret format. Must be a valid Base32 string.')) + elif auth_type == 'oidc': + if totp_secret: + self.add_error('totp_secret', _('TOTP secret must be empty for OIDC authentication.')) + if cleaned_data.get('totp_pin'): + self.add_error('totp_pin', _('TOTP validation PIN must be empty for OIDC authentication.')) + + return cleaned_data + +class AuthMethodAllowedDomainForm(forms.ModelForm): + class Meta: + model = AuthMethodAllowedDomain + fields = ['auth_method', 'domain'] + labels = { + 'auth_method': _('Authentication Method'), + 'domain': _('Domain'), + } + + def __init__(self, *args, **kwargs): + cancel_url = kwargs.pop('cancel_url', '#') + super().__init__(*args, **kwargs) + + self.helper = FormHelper() + self.helper.layout = Layout( + Div( + Div('auth_method', css_class='col-md-6'), + Div('domain', css_class='col-md-6'), + css_class='row' + ), + Div( + Div( + Submit('submit', _('Save'), css_class='btn btn-primary'), + HTML(f'{_("Cancel")}'), + css_class='col-12 d-flex justify-content-end gap-2 mt-3' + ), + css_class='row' + ) + ) + + +class AuthMethodAllowedEmailForm(forms.ModelForm): + class Meta: + model = AuthMethodAllowedEmail + fields = ['auth_method', 'email'] + labels = { + 'auth_method': _('Authentication Method'), + 'email': _('Email'), + } + + def __init__(self, *args, **kwargs): + cancel_url = kwargs.pop('cancel_url', '#') + super().__init__(*args, **kwargs) + + self.helper = FormHelper() + self.helper.layout = Layout( + Div( + Div('auth_method', css_class='col-md-6'), + Div('email', css_class='col-md-6'), + css_class='row' + ), + Div( + Div( + Submit('submit', _('Save'), css_class='btn btn-primary'), + HTML(f'{_("Cancel")}'), + css_class='col-12 d-flex justify-content-end gap-2 mt-3' + ), + css_class='row' + ) + ) diff --git a/gatekeeper/urls.py b/gatekeeper/urls.py new file mode 100644 index 0000000..61c66fb --- /dev/null +++ b/gatekeeper/urls.py @@ -0,0 +1,28 @@ +from django.urls import path + +from gatekeeper import views + +urlpatterns = [ + # Main Dashboard / List + path('', views.view_gatekeeper_list, name='gatekeeper_list'), + + # Gatekeeper Users + path('user/manage/', views.view_manage_gatekeeper_user, name='manage_gatekeeper_user'), + path('user/delete/', views.view_delete_gatekeeper_user, name='delete_gatekeeper_user'), + + # Gatekeeper Groups + path('group/manage/', views.view_manage_gatekeeper_group, name='manage_gatekeeper_group'), + path('group/delete/', views.view_delete_gatekeeper_group, name='delete_gatekeeper_group'), + + # Auth Methods + path('auth_method/manage/', views.view_manage_auth_method, name='manage_gatekeeper_auth_method'), + path('auth_method/delete/', views.view_delete_auth_method, name='delete_gatekeeper_auth_method'), + + # Auth Method Allowed Domains + path('domain/manage/', views.view_manage_auth_domain, name='manage_gatekeeper_domain'), + path('domain/delete/', views.view_delete_auth_domain, name='delete_gatekeeper_domain'), + + # Auth Method Allowed Emails + path('email/manage/', views.view_manage_auth_email, name='manage_gatekeeper_email'), + path('email/delete/', views.view_delete_auth_email, name='delete_gatekeeper_email'), +] diff --git a/gatekeeper/views.py b/gatekeeper/views.py index 60f00ef..43b010a 100644 --- a/gatekeeper/views.py +++ b/gatekeeper/views.py @@ -1 +1,335 @@ -# Create your views here. +from django.contrib import messages +from django.contrib.auth.decorators import login_required +from django.shortcuts import render, get_object_or_404, redirect +from django.urls import reverse +from django.utils.translation import gettext as _ + +from gatekeeper.forms import GatekeeperUserForm, GatekeeperGroupForm, AuthMethodForm, AuthMethodAllowedDomainForm, \ + AuthMethodAllowedEmailForm +from gatekeeper.models import GatekeeperUser, GatekeeperGroup, AuthMethod, AuthMethodAllowedDomain, \ + AuthMethodAllowedEmail +from user_manager.models import UserAcl + + +@login_required +def view_gatekeeper_list(request): + """Main list view containing tabs for Users, Groups, and Auth Methods""" + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=20).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + users = GatekeeperUser.objects.all().order_by('username') + groups = GatekeeperGroup.objects.all().order_by('name') + auth_methods = AuthMethod.objects.all().order_by('name') + auth_domains = AuthMethodAllowedDomain.objects.all().order_by('domain') + auth_emails = AuthMethodAllowedEmail.objects.all().order_by('email') + + tab = request.GET.get('tab', 'users') + + context = { + 'users': users, + 'groups': groups, + 'auth_methods': auth_methods, + 'auth_domains': auth_domains, + 'auth_emails': auth_emails, + 'active_tab': tab, + } + return render(request, 'gatekeeper/gatekeeper_list.html', context) + + +@login_required +def view_manage_gatekeeper_user(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + + if obj_uuid: + obj = get_object_or_404(GatekeeperUser, uuid=obj_uuid) + title = _('Edit Gatekeeper User') + else: + obj = None + title = _('Create Gatekeeper User') + + cancel_url = reverse('gatekeeper_list') + '?tab=users' + + if request.method == 'POST': + form = GatekeeperUserForm(request.POST, instance=obj, cancel_url=cancel_url) + if form.is_valid(): + form.save() + messages.success(request, _('Gatekeeper User saved successfully.')) + return redirect(cancel_url) + else: + form = GatekeeperUserForm(instance=obj, cancel_url=cancel_url) + + context = { + 'form': form, + 'title': title, + 'page_title': title, + } + return render(request, 'generic_form.html', context) + + +@login_required +def view_delete_gatekeeper_user(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + obj = get_object_or_404(GatekeeperUser, uuid=obj_uuid) + + cancel_url = reverse('gatekeeper_list') + '?tab=users' + + if request.method == 'POST': + obj.delete() + messages.success(request, _('Gatekeeper User deleted successfully.')) + return redirect(cancel_url) + + context = { + 'object': obj, + 'title': _('Delete Gatekeeper User'), + 'cancel_url': cancel_url, + 'text': _('Are you sure you want to delete the user "%(username)s"?') % {'username': obj.username} + } + return render(request, 'generic_delete_confirmation.html', context) + + +@login_required +def view_manage_gatekeeper_group(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + + if obj_uuid: + obj = get_object_or_404(GatekeeperGroup, uuid=obj_uuid) + title = _('Edit Gatekeeper Group') + else: + obj = None + title = _('Create Gatekeeper Group') + + cancel_url = reverse('gatekeeper_list') + '?tab=groups' + + if request.method == 'POST': + form = GatekeeperGroupForm(request.POST, instance=obj, cancel_url=cancel_url) + if form.is_valid(): + form.save() + messages.success(request, _('Gatekeeper Group saved successfully.')) + return redirect(cancel_url) + else: + form = GatekeeperGroupForm(instance=obj, cancel_url=cancel_url) + + context = { + 'form': form, + 'title': title, + 'page_title': title, + } + return render(request, 'generic_form.html', context) + + +@login_required +def view_delete_gatekeeper_group(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + obj = get_object_or_404(GatekeeperGroup, uuid=obj_uuid) + + cancel_url = reverse('gatekeeper_list') + '?tab=groups' + + if request.method == 'POST': + obj.delete() + messages.success(request, _('Gatekeeper Group deleted successfully.')) + return redirect(cancel_url) + + context = { + 'object': obj, + 'title': _('Delete Gatekeeper Group'), + 'cancel_url': cancel_url, + 'text': _('Are you sure you want to delete the group "%(name)s"?') % {'name': obj.name} + } + return render(request, 'generic_delete_confirmation.html', context) + + +@login_required +def view_manage_auth_method(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + + if obj_uuid: + obj = get_object_or_404(AuthMethod, uuid=obj_uuid) + title = _('Edit Authentication Method') + else: + obj = None + title = _('Create Authentication Method') + + cancel_url = reverse('gatekeeper_list') + '?tab=auth_methods' + + if request.method == 'POST': + form = AuthMethodForm(request.POST, instance=obj, cancel_url=cancel_url) + if form.is_valid(): + form.save() + messages.success(request, _('Authentication Method saved successfully.')) + return redirect(cancel_url) + else: + form = AuthMethodForm(instance=obj, cancel_url=cancel_url) + + form_description = { + 'size': '', + 'content': _(''' +
Authentication Types
+

Select how users will authenticate through this method.

+ + ''') + } + + context = { + 'form': form, + 'title': title, + 'page_title': title, + 'form_description': form_description, + } + return render(request, 'generic_form.html', context) + + +@login_required +def view_delete_auth_method(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + obj = get_object_or_404(AuthMethod, uuid=obj_uuid) + + cancel_url = reverse('gatekeeper_list') + '?tab=auth_methods' + + if request.method == 'POST': + obj.delete() + messages.success(request, _('Authentication Method deleted successfully.')) + return redirect(cancel_url) + + context = { + 'object': obj, + 'title': _('Delete Authentication Method'), + 'cancel_url': cancel_url, + 'text': _('Are you sure you want to delete the authentication method "%(name)s"?') % {'name': obj.name} + } + return render(request, 'generic_delete_confirmation.html', context) + + +@login_required +def view_manage_auth_domain(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + + if obj_uuid: + obj = get_object_or_404(AuthMethodAllowedDomain, uuid=obj_uuid) + title = _('Edit Allowed Domain') + else: + obj = None + title = _('Add Allowed Domain') + + cancel_url = reverse('gatekeeper_list') + '?tab=allowed_identities' + + if request.method == 'POST': + form = AuthMethodAllowedDomainForm(request.POST, instance=obj, cancel_url=cancel_url) + if form.is_valid(): + form.save() + messages.success(request, _('Allowed Domain saved successfully.')) + return redirect(cancel_url) + else: + form = AuthMethodAllowedDomainForm(instance=obj, cancel_url=cancel_url) + + context = { + 'form': form, + 'title': title, + 'page_title': title, + } + return render(request, 'generic_form.html', context) + + +@login_required +def view_delete_auth_domain(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + obj = get_object_or_404(AuthMethodAllowedDomain, uuid=obj_uuid) + + cancel_url = reverse('gatekeeper_list') + '?tab=allowed_identities' + + if request.method == 'POST': + obj.delete() + messages.success(request, _('Allowed Domain deleted successfully.')) + return redirect(cancel_url) + + context = { + 'object': obj, + 'title': _('Delete Allowed Domain'), + 'cancel_url': cancel_url, + 'text': _('Are you sure you want to delete the allowed domain "%(domain)s"?') % {'domain': obj.domain} + } + return render(request, 'generic_delete_confirmation.html', context) + + +@login_required +def view_manage_auth_email(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + + if obj_uuid: + obj = get_object_or_404(AuthMethodAllowedEmail, uuid=obj_uuid) + title = _('Edit Allowed Email') + else: + obj = None + title = _('Add Allowed Email') + + cancel_url = reverse('gatekeeper_list') + '?tab=allowed_identities' + + if request.method == 'POST': + form = AuthMethodAllowedEmailForm(request.POST, instance=obj, cancel_url=cancel_url) + if form.is_valid(): + form.save() + messages.success(request, _('Allowed Email saved successfully.')) + return redirect(cancel_url) + else: + form = AuthMethodAllowedEmailForm(instance=obj, cancel_url=cancel_url) + + context = { + 'form': form, + 'title': title, + 'page_title': title, + } + return render(request, 'generic_form.html', context) + + +@login_required +def view_delete_auth_email(request): + if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=50).exists(): + return render(request, 'access_denied.html', {'page_title': _('Access Denied')}) + + obj_uuid = request.GET.get('uuid') + obj = get_object_or_404(AuthMethodAllowedEmail, uuid=obj_uuid) + + cancel_url = reverse('gatekeeper_list') + '?tab=allowed_identities' + + if request.method == 'POST': + obj.delete() + messages.success(request, _('Allowed Email deleted successfully.')) + return redirect(cancel_url) + + context = { + 'object': obj, + 'title': _('Delete Allowed Email'), + 'cancel_url': cancel_url, + 'text': _('Are you sure you want to delete the allowed email "%(email)s"?') % {'email': obj.email} + } + return render(request, 'generic_delete_confirmation.html', context) diff --git a/requirements.txt b/requirements.txt index ca37b1b..cd833cd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,6 +15,7 @@ packaging==26.0 pillow==12.1.1 polib==1.2.0 pycparser==3.0 +pyotp==2.9.0 pypng==0.20220715.0 pyproject_hooks==1.2.0 pytz==2025.2 diff --git a/templates/gatekeeper/gatekeeper_list.html b/templates/gatekeeper/gatekeeper_list.html new file mode 100644 index 0000000..3aedde1 --- /dev/null +++ b/templates/gatekeeper/gatekeeper_list.html @@ -0,0 +1,244 @@ +{% extends "base.html" %} +{% load i18n %} + +{% block content %} +
+
+ + + +
+ {% if active_tab == 'users' or active_tab == 'groups' %} + + + {% if active_tab == 'users' %} + + + + {% if users %} +
+ + + + + + + + + + {% for user in users %} + + + + + + {% endfor %} + +
{% trans 'Username' %}{% trans 'Email' %}{% trans 'Actions' %}
{{ user.username }}{{ user.email }} + + + + + + +
+
+ {% else %} +
+ {% trans 'No Gatekeeper Users found.' %} +
+ {% endif %} + + {% elif active_tab == 'groups' %} + + + {% if groups %} +
+ + + + + + + + + + {% for group in groups %} + + + + + + {% endfor %} + +
{% trans 'Group Name' %}{% trans 'Members' %}{% trans 'Actions' %}
{{ group.name }}{{ group.users.count }} + + + + + + +
+
+ {% else %} +
+ {% trans 'No Gatekeeper Groups found.' %} +
+ {% endif %} + + {% endif %} + + {% elif active_tab == 'auth_methods' %} + + + {% if auth_methods %} +
+ + + + + + + + + + {% for method in auth_methods %} + + + + + + {% endfor %} + +
{% trans 'Name' %}{% trans 'Type' %}{% trans 'Actions' %}
{{ method.name }}{{ method.get_auth_type_display }} + + + + + + +
+
+ {% else %} +
+ {% trans 'No Authentication Methods found.' %} +
+ {% endif %} + + {% elif active_tab == 'allowed_identities' %} + + + {% if auth_emails or auth_domains %} +
+ + + + + + + + + + + {% for email in auth_emails %} + + + + + + + {% endfor %} + {% for domain in auth_domains %} + + + + + + + {% endfor %} + +
{% trans 'Type' %}{% trans 'Identity' %}{% trans 'Auth Method' %}{% trans 'Actions' %}
{% trans 'Email' %}{{ email.email }}{{ email.auth_method.name }} + + + + + + +
{% trans 'Domain' %}{{ domain.domain }}{{ domain.auth_method.name }} + + + + + + +
+
+ {% else %} +
+ {% trans 'No Allowed Emails or Domains found.' %} +
+ {% endif %} + + {% endif %} +
+ +
+
+{% endblock %} diff --git a/wireguard_webadmin/urls.py b/wireguard_webadmin/urls.py index 38cd721..1e2647d 100644 --- a/wireguard_webadmin/urls.py +++ b/wireguard_webadmin/urls.py @@ -22,6 +22,7 @@ urlpatterns = [ path('firewall/', include('firewall.urls')), path('peer/', include('wireguard_peer.urls')), path('routing-templates/', include('routing_templates.urls')), + path('gatekeeper/', include('gatekeeper.urls')), path('scheduler/', include('scheduler.urls')), path('server/', include('wireguard.urls')), path('tools/', include('wireguard_tools.urls')),