Initial commit

This commit is contained in:
Eduardo Silva
2024-02-14 16:36:01 -03:00
commit ed55724808
2505 changed files with 1136655 additions and 0 deletions

View File

3
wireguard_peer/admin.py Normal file
View File

@@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
wireguard_peer/apps.py Normal file
View File

@@ -0,0 +1,6 @@
from django.apps import AppConfig
class WireguardPeerConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'wireguard_peer'

60
wireguard_peer/forms.py Normal file
View File

@@ -0,0 +1,60 @@
from django import forms
from wireguard.models import Peer, PeerAllowedIP, NETMASK_CHOICES
import ipaddress
class PeerForm(forms.ModelForm):
name = forms.CharField(label='Name', required=False)
public_key = forms.CharField(label='Public Key', required=True)
private_key = forms.CharField(label='Private Key', required=False)
pre_shared_key = forms.CharField(label='Pre-Shared Key', required=True)
persistent_keepalive = forms.IntegerField(label='Persistent Keepalive', required=True)
class Meta:
model = Peer
fields = ['name', 'public_key', 'private_key', 'pre_shared_key', 'persistent_keepalive']
class PeerAllowedIPForm(forms.ModelForm):
def __init__(self, *args, **kwargs):
current_peer = kwargs.pop('current_peer', None)
super().__init__(*args, **kwargs)
self.current_peer = current_peer
allowed_ip = forms.GenericIPAddressField(label='Allowed IP or Network', required=True)
netmask = forms.ChoiceField(choices=NETMASK_CHOICES, label='Netmask', initial=24, required=True)
priority = forms.IntegerField(label='Priority', required=True, initial=1)
def clean(self):
cleaned_data = super().clean()
priority = cleaned_data.get('priority')
allowed_ip = cleaned_data.get('allowed_ip')
netmask = cleaned_data.get('netmask')
wireguard_network = ipaddress.ip_network(f"{self.current_peer.wireguard_instance.address}/{self.current_peer.wireguard_instance.netmask}", strict=False)
if priority == 0:
zero_priority_ips_query = PeerAllowedIP.objects.filter(peer=self.current_peer, priority=0)
if self.instance:
zero_priority_ips_query = zero_priority_ips_query.exclude(uuid=self.instance.uuid)
if zero_priority_ips_query.exists():
raise forms.ValidationError("A peer can have only one IP with priority zero.")
duplicated_ip = PeerAllowedIP.objects.filter(allowed_ip=allowed_ip)
if self.instance:
duplicated_ip = duplicated_ip.exclude(uuid=self.instance.uuid)
if duplicated_ip.exists():
raise forms.ValidationError("This IP is already in use by another peer.")
if ipaddress.ip_address(allowed_ip) not in wireguard_network:
raise forms.ValidationError("The IP address does not belong to the Peer's WireGuard instance network range. Please check the IP address or change the priority.")
if str(netmask) != str(32):
raise forms.ValidationError("The netmask for priority 0 IP must be 32.")
if self.current_peer.wireguard_instance.address == allowed_ip:
raise forms.ValidationError("The IP address is the same as the Peer's WireGuard instance address.")
else:
if ipaddress.ip_address(allowed_ip) in wireguard_network:
raise forms.ValidationError("The IP address belongs to the Peer's WireGuard instance network range. Please check the IP address or change use priority 0 instead.")
class Meta:
model = PeerAllowedIP
fields = ['allowed_ip', 'priority', 'netmask']

View File

3
wireguard_peer/models.py Normal file
View File

@@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

3
wireguard_peer/tests.py Normal file
View File

@@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

169
wireguard_peer/views.py Normal file
View File

@@ -0,0 +1,169 @@
from django.shortcuts import render, get_object_or_404, redirect
from django.contrib.auth.decorators import login_required
from wireguard.models import WireGuardInstance, Peer, PeerAllowedIP
from django.contrib import messages
from django.db.models import Max
import subprocess
import ipaddress
from wireguard_peer.forms import PeerAllowedIPForm, PeerForm
def generate_peer_default(wireguard_instance):
private_key = subprocess.check_output('wg genkey', shell=True).decode('utf-8').strip()
public_key = subprocess.check_output(f'echo {private_key} | wg pubkey', shell=True).decode('utf-8').strip()
pre_shared_key = subprocess.check_output('wg genpsk', shell=True).decode('utf-8').strip()
address = wireguard_instance.address
netmask = wireguard_instance.netmask
cidr_network = f"{address}/{netmask}"
network = ipaddress.ip_network(cidr_network, strict=False)
# the code below can be an issue for larger networks, for now it's fine, but it should be optimized in the future
used_ips = set(WireGuardInstance.objects.all().values_list('address', flat=True)) | \
set(PeerAllowedIP.objects.all().values_list('allowed_ip', flat=True))
free_ip_address = None
for ip in network.hosts():
if str(ip) not in used_ips:
free_ip_address = str(ip)
break
return {
'name': '',
'public_key': public_key,
'pre_shared_key': pre_shared_key,
'persistent_keepalive': 25,
'private_key': private_key,
'wireguard_instance': wireguard_instance,
'allowed_ip': free_ip_address,
}
@login_required
def view_wireguard_peer_list(request):
page_title = 'WireGuard Peer List'
wireguard_instances = WireGuardInstance.objects.all().order_by('instance_id')
if wireguard_instances:
if request.GET.get('uuid'):
current_instance = get_object_or_404(WireGuardInstance, uuid=request.GET.get('uuid'))
else:
current_instance = wireguard_instances.first()
peer_list = current_instance.peer_set.all()
else:
current_instance = None
peer_list = None
context = {'page_title': page_title, 'wireguard_instances': wireguard_instances, 'current_instance': current_instance, 'peer_list': peer_list}
return render(request, 'wireguard/wireguard_peer_list.html', context)
@login_required
def view_wireguard_peer_manage(request):
if request.GET.get('instance'):
current_instance = get_object_or_404(WireGuardInstance, uuid=request.GET.get('instance'))
current_peer = None
page_title = 'Create a new Peer for instance wg' + str(current_instance.instance_id)
new_peer_data = generate_peer_default(current_instance)
if new_peer_data['allowed_ip']:
new_peer = Peer.objects.create(
name=new_peer_data['name'],
public_key=new_peer_data['public_key'],
pre_shared_key=new_peer_data['pre_shared_key'],
persistent_keepalive=new_peer_data['persistent_keepalive'],
private_key=new_peer_data['private_key'],
wireguard_instance=current_instance,
)
PeerAllowedIP.objects.create(
peer=new_peer,
allowed_ip=new_peer_data['allowed_ip'],
priority=0,
netmask=32,
)
messages.success(request, 'Peer created|Peer for instance wg' + str(current_instance.instance_id) + ' created successfully with IP ' + new_peer_data['allowed_ip'] + '/32.')
return redirect('/peer/manage/?peer=' + str(new_peer.uuid))
else:
messages.warning(request, 'Error creating peer|No available IP address found for peer creation.')
return redirect('/peer/list/')
elif request.GET.get('peer'):
current_peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
current_instance = current_peer.wireguard_instance
if request.GET.get('action') == 'delete':
if request.GET.get('confirmation') == 'delete':
current_peer.delete()
messages.success(request, 'Peer deleted|Peer deleted successfully.')
return redirect('/peer/list/?uuid=' + str(current_instance.uuid))
else:
messages.warning(request, 'Error deleting peer|Invalid confirmation message. Type "delete" to confirm.')
return redirect('/peer/manage/?peer=' + str(current_peer.uuid))
page_title = 'Update Peer '
peer_ip_list = current_peer.peerallowedip_set.all().order_by('priority')
if current_peer.name:
page_title += current_peer.name
else:
page_title += current_peer.public_key
if request.method == 'POST':
form = PeerForm(request.POST, instance=current_peer)
if form.is_valid():
form.save()
messages.success(request, 'Peer updated|Peer updated successfully.')
return redirect('/peer/list/?uuid=' + str(current_peer.wireguard_instance.uuid))
else:
form = PeerForm(instance=current_peer)
else:
return redirect('/peer/list/')
context = {
'page_title': page_title, 'current_instance': current_instance, 'current_peer': current_peer, 'form': form, 'peer_ip_list': peer_ip_list
}
return render(request, 'wireguard/wireguard_manage_peer.html', context)
def view_manage_ip_address(request):
if request.GET.get('peer'):
current_peer = get_object_or_404(Peer, uuid=request.GET.get('peer'))
page_title = 'Add new IP address for Peer '
current_ip = None
if current_peer.name:
page_title += current_peer.name
else:
page_title += current_peer.public_key
elif request.GET.get('ip'):
current_ip = get_object_or_404(PeerAllowedIP, uuid=request.GET.get('ip'))
current_peer = current_ip.peer
page_title = 'Update IP address for Peer '
if current_peer.name:
page_title += current_peer.name
else:
page_title += current_peer.public_key
if request.GET.get('action') == 'delete':
if request.GET.get('confirmation') == 'delete':
current_ip.delete()
messages.success(request, 'IP address deleted|IP address deleted successfully.')
return redirect('/peer/manage/?peer=' + str(current_peer.uuid))
else:
messages.warning(request, 'Error deleting IP address|Invalid confirmation message. Type "delete" to confirm.')
return redirect('/peer/ip/?ip=' + str(current_ip.uuid))
if request.method == 'POST':
form = PeerAllowedIPForm(request.POST or None, instance=current_ip, current_peer=current_peer)
if form.is_valid():
this_form = form.save(commit=False)
if not current_ip:
this_form.peer = current_peer
this_form.save()
form.save()
if current_ip:
messages.success(request, 'IP address updated|IP address updated successfully.')
else:
messages.success(request, 'IP address added|IP address added successfully.')
return redirect('/peer/manage/?peer=' + str(current_peer.uuid))
else:
form = PeerAllowedIPForm(instance=current_ip, current_peer=current_peer)
return render(request, 'wireguard/wireguard_manage_ip.html', {
'page_title': page_title, 'form': form, 'current_peer': current_peer, 'current_ip': current_ip
})