Files
wireguard_webadmin/wgwadmlibrary/tools.py

214 lines
8.0 KiB
Python

import ipaddress
import random
import re
import smtplib
import subprocess
from datetime import timedelta
from email.mime.text import MIMEText
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.db.models import Max, Q
from django.utils import timezone
from user_manager.models import UserAcl
from vpn_invite.models import InviteSettings, PeerInvite
from wireguard.models import Peer, WireGuardInstance
from wireguard_tools.models import EmailSettings
def user_has_access_to_instance(user_acl: UserAcl, instance: WireGuardInstance):
if user_acl.peer_groups.all():
if user_acl.peer_groups.filter(server_instance=instance).exists():
return True
else:
return True
return False
def user_has_access_to_peer(user_acl: UserAcl, peer: Peer):
if user_acl.peer_groups.all():
if user_acl.peer_groups.filter(peer=peer).exists():
return True
if user_acl.peer_groups.filter(server_instance=peer.wireguard_instance).exists():
return True
else:
return True
return False
def user_allowed_instances(user_acl: UserAcl):
if not user_acl.peer_groups.exists():
return WireGuardInstance.objects.all().order_by('instance_id')
instances_from_groups = WireGuardInstance.objects.filter(peergroup__in=user_acl.peer_groups.all())
instances_from_peers = WireGuardInstance.objects.filter(peer__peergroup__in=user_acl.peer_groups.all())
return instances_from_groups.union(instances_from_peers).order_by('instance_id')
def user_allowed_peers(user_acl: UserAcl, instance: WireGuardInstance):
base_qs = Peer.objects.filter(wireguard_instance=instance)
if not user_acl.peer_groups.exists():
return base_qs.order_by('sort_order')
return (
base_qs.filter(
Q(peergroup__in=user_acl.peer_groups.all()) |
Q(wireguard_instance__peergroup__in=user_acl.peer_groups.filter(server_instance=instance))
).distinct().order_by('sort_order')
)
def is_valid_ip_or_hostname(value):
"""Check if a given string is a valid IP address or hostname."""
try:
ipaddress.ip_address(value)
return True
except:
pass
# Regex to check valid hostname (RFC 1123)
hostname_regex = r'^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,18}$'
if re.match(hostname_regex, value):
return True
return False
def list_network_interfaces():
cmd = "ip link show | grep UP"
try:
cmd_output = subprocess.check_output(cmd, shell=True, text=True)
except:
cmd_output = ''
interfaces = []
for line in cmd_output.split('\n'):
if line:
parts = line.split(': ')
if len(parts) > 1:
interface_name = parts[1].split('@')[0]
interfaces.append(interface_name)
return interfaces
def default_sort_peers(wireguard_instance: WireGuardInstance):
unsorted_peers = Peer.objects.filter(wireguard_instance=wireguard_instance, sort_order__lte=0).order_by('created')
highest_sort_order = Peer.objects.filter(wireguard_instance=wireguard_instance).aggregate(Max('sort_order'))['sort_order__max']
if not highest_sort_order:
highest_sort_order = 0
if unsorted_peers:
new_sort_order = highest_sort_order + 1
for peer in unsorted_peers:
peer.sort_order = new_sort_order
peer.save()
new_sort_order += 1
return unsorted_peers
def deduplicate_sort_order(wireguard_instance: WireGuardInstance):
peers = Peer.objects.filter(wireguard_instance=wireguard_instance)
for peer in peers:
duplicated_peers = peers.filter(sort_order=peer.sort_order).exclude(uuid=peer.uuid)
for duplicated_peer in duplicated_peers:
duplicated_peer.sort_order = 0
duplicated_peer.save()
return peers
def check_sort_order_conflict(peer: Peer):
peers = Peer.objects.filter(wireguard_instance=peer.wireguard_instance, sort_order=peer.sort_order).exclude(uuid=peer.uuid)
if peers.exists():
return True
return False
def create_random_password(length, complexity):
if complexity == 'digits':
characters = '0123456789'
elif complexity == 'letters':
characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
elif complexity == 'letters_digits':
characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
else:
characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()'
return ''.join(random.choice(characters) for _ in range(length))
def replace_message_variables(message: str, peer_invite: PeerInvite, invite_settings: InviteSettings):
# The & at the end is to prevent the token from being concatenated with any other template text.
message = message.replace('{invite_url}', f'{invite_settings.invite_url}?token={peer_invite.uuid}&')
message = message.replace('{expire_minutes}', f'{invite_settings.invite_expiration}')
return message
def get_peer_invite_data(peer_invite: PeerInvite, invite_settings: InviteSettings):
data = {
# The & at the end is to prevent the token from being concatenated with any other template text.
'url': f'{invite_settings.invite_url}?token{peer_invite.uuid}&',
'password': peer_invite.invite_password,
'expiration': peer_invite.invite_expiration.isoformat(),
'email_subject': replace_message_variables(invite_settings.invite_email_subject, peer_invite, invite_settings),
'email_body': replace_message_variables(invite_settings.invite_email_body, peer_invite, invite_settings),
'whatsapp_body': replace_message_variables(invite_settings.invite_whatsapp_body, peer_invite, invite_settings),
'text_body': replace_message_variables(invite_settings.invite_text_body, peer_invite, invite_settings),
'uuid': str(peer_invite.uuid),
}
return data
def create_peer_invite(peer: Peer, invite_settings: InviteSettings):
if invite_settings.enforce_random_password or not invite_settings.default_password:
password = create_random_password(invite_settings.random_password_length, invite_settings.random_password_complexity)
else:
password = invite_settings.default_password
peer_invite = PeerInvite.objects.create(
peer=peer, invite_password=password[:32], invite_expiration=timezone.now() + timedelta(minutes=invite_settings.invite_expiration)
)
return peer_invite
def send_email(destination, subject, body):
success = 'error'
message = ''
try:
validate_email(destination)
except ValidationError:
return 'error', 'Invalid email address.'
email_settings = EmailSettings.objects.filter(name='email_settings', enabled=True).first()
if not email_settings:
message = 'Email not configured.'
return success, message
try:
msg = MIMEText(body, 'plain')
msg['Subject'] = subject
msg['From'] = email_settings.smtp_from_address
msg['To'] = destination
if email_settings.smtp_encryption.lower() == 'ssl':
server = smtplib.SMTP_SSL(email_settings.smtp_host, email_settings.smtp_port)
elif email_settings.smtp_encryption.lower() in ['none', 'noauth']:
server = smtplib.SMTP(email_settings.smtp_host, email_settings.smtp_port)
else:
server = smtplib.SMTP(email_settings.smtp_host, email_settings.smtp_port)
server.starttls()
if email_settings.smtp_username and email_settings.smtp_password and email_settings.smtp_encryption.lower() != 'noauth':
server.login(email_settings.smtp_username, email_settings.smtp_password)
server.sendmail(email_settings.smtp_from_address, destination, msg.as_string())
server.quit()
success = 'success'
message = 'Email sent successfully.'
except Exception as e:
success = 'error'
message = f'Error sending email: {str(e)}'
return success, message