2025-03-12 21:13:22 -03:00

218 lines
8.3 KiB
Python

import ipaddress
import random
import re
import smtplib
import subprocess
from datetime import timedelta
from email.mime.text import MIMEText
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.db.models import Max
from django.utils import timezone
from user_manager.models import UserAcl
from vpn_invite.models import InviteSettings, PeerInvite
from wireguard.models import Peer, WireGuardInstance
from wireguard_tools.models import EmailSettings
def user_has_access_to_instance(user_acl: UserAcl, instance: WireGuardInstance):
if user_acl.peer_groups.all():
if user_acl.peer_groups.filter(server_instance=instance).exists():
return True
else:
return True
return False
def user_has_access_to_peer(user_acl: UserAcl, peer: Peer):
if user_acl.peer_groups.all():
if user_acl.peer_groups.filter(peer=peer).exists():
return True
if user_acl.peer_groups.filter(server_instance=peer.wireguard_instance).exists():
return True
else:
return True
return False
def user_allowed_instances(user_acl: UserAcl):
if not user_acl.peer_groups.exists():
return WireGuardInstance.objects.all().order_by('instance_id')
instances_from_groups = WireGuardInstance.objects.filter(peergroup__in=user_acl.peer_groups.all())
instances_from_peers = WireGuardInstance.objects.filter(peer__peergroup__in=user_acl.peer_groups.all())
return instances_from_groups.union(instances_from_peers).order_by('instance_id')
def user_allowed_peers(user_acl: UserAcl, instance: WireGuardInstance):
if not user_acl.peer_groups.exists():
return Peer.objects.filter(wireguard_instance=instance).order_by('sort_order')
peers_from_direct = Peer.objects.filter(
wireguard_instance=instance,
peergroup__in=user_acl.peer_groups.all()
)
peers_from_instance = Peer.objects.filter(
wireguard_instance=instance,
wireguard_instance__peergroup__in=user_acl.peer_groups.filter(server_instance=instance)
)
return peers_from_direct.union(peers_from_instance).order_by('sort_order')
def is_valid_ip_or_hostname(value):
"""Check if a given string is a valid IP address or hostname."""
try:
ipaddress.ip_address(value)
return True
except:
pass
# Regex to check valid hostname (RFC 1123)
hostname_regex = r'^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,18}$'
if re.match(hostname_regex, value):
return True
return False
def list_network_interfaces():
# Executa o comando 'ip link show' com grep para filtrar linhas com 'UP'
cmd = "ip link show | grep UP"
cmd_output = subprocess.check_output(cmd, shell=True, text=True)
# Processa a saída para extrair os nomes das interfaces
interfaces = []
for line in cmd_output.split('\n'):
if line: # Verifica se a linha não está vazia
parts = line.split(': ')
if len(parts) > 1:
# O nome da interface está na segunda posição após o split
interface_name = parts[1].split('@')[0] # Remove qualquer coisa após '@'
interfaces.append(interface_name)
return interfaces
def default_sort_peers(wireguard_instance: WireGuardInstance):
unsorted_peers = Peer.objects.filter(wireguard_instance=wireguard_instance, sort_order__lte=0).order_by('created')
highest_sort_order = Peer.objects.filter(wireguard_instance=wireguard_instance).aggregate(Max('sort_order'))['sort_order__max']
if not highest_sort_order:
highest_sort_order = 0
if unsorted_peers:
new_sort_order = highest_sort_order + 1
for peer in unsorted_peers:
peer.sort_order = new_sort_order
peer.save()
new_sort_order += 1
return unsorted_peers
def deduplicate_sort_order(wireguard_instance: WireGuardInstance):
peers = Peer.objects.filter(wireguard_instance=wireguard_instance)
for peer in peers:
duplicated_peers = peers.filter(sort_order=peer.sort_order).exclude(uuid=peer.uuid)
for duplicated_peer in duplicated_peers:
duplicated_peer.sort_order = 0
duplicated_peer.save()
return peers
def check_sort_order_conflict(peer: Peer):
peers = Peer.objects.filter(wireguard_instance=peer.wireguard_instance, sort_order=peer.sort_order).exclude(uuid=peer.uuid)
if peers.exists():
return True
return False
def create_random_password(length, complexity):
if complexity == 'digits':
characters = '0123456789'
elif complexity == 'letters':
characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
elif complexity == 'letters_digits':
characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
else:
characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()'
return ''.join(random.choice(characters) for _ in range(length))
def replace_message_variables(message: str, peer_invite: PeerInvite, invite_settings: InviteSettings):
# The & at the end is to prevent the token from being concatenated with any other template text.
message = message.replace('{invite_url}', f'{invite_settings.invite_url}?token={peer_invite.uuid}&')
message = message.replace('{expire_minutes}', f'{invite_settings.invite_expiration}')
return message
def get_peer_invite_data(peer_invite: PeerInvite, invite_settings: InviteSettings):
data = {
# The & at the end is to prevent the token from being concatenated with any other template text.
'url': f'{invite_settings.invite_url}?token{peer_invite.uuid}&',
'password': peer_invite.invite_password,
'expiration': peer_invite.invite_expiration.isoformat(),
'email_subject': replace_message_variables(invite_settings.invite_email_subject, peer_invite, invite_settings),
'email_body': replace_message_variables(invite_settings.invite_email_body, peer_invite, invite_settings),
'whatsapp_body': replace_message_variables(invite_settings.invite_whatsapp_body, peer_invite, invite_settings),
'text_body': replace_message_variables(invite_settings.invite_text_body, peer_invite, invite_settings),
'uuid': str(peer_invite.uuid),
}
return data
def create_peer_invite(peer: Peer, invite_settings: InviteSettings):
if invite_settings.enforce_random_password or not invite_settings.default_password:
password = create_random_password(invite_settings.random_password_length, invite_settings.random_password_complexity)
else:
password = invite_settings.default_password
peer_invite = PeerInvite.objects.create(
peer=peer, invite_password=password[:32], invite_expiration=timezone.now() + timedelta(minutes=invite_settings.invite_expiration)
)
return peer_invite
def send_email(destination, subject, body):
success = 'error'
message = ''
try:
validate_email(destination)
except ValidationError:
return 'error', 'Invalid email address.'
email_settings = EmailSettings.objects.filter(name='email_settings', enabled=True).first()
if not email_settings:
message = 'Email not configured.'
return success, message
try:
msg = MIMEText(body, 'plain')
msg['Subject'] = subject
msg['From'] = email_settings.smtp_from_address
msg['To'] = destination
if email_settings.smtp_encryption.lower() == 'ssl':
server = smtplib.SMTP_SSL(email_settings.smtp_host, email_settings.smtp_port)
elif email_settings.smtp_encryption.lower() in ['none', 'noauth']:
server = smtplib.SMTP(email_settings.smtp_host, email_settings.smtp_port)
else:
server = smtplib.SMTP(email_settings.smtp_host, email_settings.smtp_port)
server.starttls()
if email_settings.smtp_username and email_settings.smtp_password and email_settings.smtp_encryption.lower() != 'noauth':
server.login(email_settings.smtp_username, email_settings.smtp_password)
server.sendmail(email_settings.smtp_from_address, destination, msg.as_string())
server.quit()
success = 'success'
message = 'Email sent successfully.'
except Exception as e:
success = 'error'
message = f'Error sending email: {str(e)}'
return success, message