mirror of
https://github.com/eduardogsilva/wireguard_webadmin.git
synced 2025-08-26 21:31:14 +00:00
Legacy firewall migrate routines and export fw rules.
This commit is contained in:
@@ -142,8 +142,6 @@ class FirewallSettingsForm(forms.ModelForm):
|
||||
if not interface.startswith('wg') and interface != 'lo':
|
||||
interface_choices.append((interface, interface))
|
||||
|
||||
#if interface.startswith('wg'):
|
||||
# list_network_interfaces().remove(interface)
|
||||
default_forward_policy = forms.ChoiceField(label='Default Forward Policy', choices=[('accept', 'ACCEPT'), ('reject', 'REJECT'), ('drop', 'DROP')], initial='accept')
|
||||
allow_peer_to_peer = forms.BooleanField(label='Allow Peer to Peer', required=False)
|
||||
allow_instance_to_instance = forms.BooleanField(label='Allow Instance to Instance', required=False)
|
||||
|
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 5.0.2 on 2024-03-04 11:21
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('firewall', '0010_alter_firewallrule_firewall_chain'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='firewallsettings',
|
||||
name='last_firewall_reset',
|
||||
field=models.DateTimeField(blank=True, null=True),
|
||||
),
|
||||
]
|
@@ -73,6 +73,7 @@ class FirewallSettings(models.Model):
|
||||
allow_instance_to_instance = models.BooleanField(default=True)
|
||||
wan_interface = models.CharField(max_length=12, default='eth0')
|
||||
pending_changes = models.BooleanField(default=False)
|
||||
last_firewall_reset = models.DateTimeField(blank=True, null=True)
|
||||
|
||||
created = models.DateTimeField(auto_now_add=True)
|
||||
updated = models.DateTimeField(auto_now=True)
|
||||
|
@@ -1,10 +1,10 @@
|
||||
from firewall.models import FirewallRule
|
||||
from wireguard.models import Peer, PeerAllowedIP
|
||||
from firewall.models import FirewallRule, FirewallSettings, RedirectRule
|
||||
from wireguard.models import Peer, PeerAllowedIP, WireGuardInstance
|
||||
from django.utils import timezone
|
||||
|
||||
|
||||
def get_peer_addresses(peers, include_networks):
|
||||
addresses = []
|
||||
# Indica se algum peer está completamente sem PeerAllowedIP
|
||||
missing_ip = False
|
||||
for peer in peers.all():
|
||||
peer_ips = peer.peerallowedip_set.all().order_by('priority')
|
||||
if not include_networks:
|
||||
@@ -13,17 +13,65 @@ def get_peer_addresses(peers, include_networks):
|
||||
if peer_ips.exists():
|
||||
addresses.extend([f"{peer_ip.allowed_ip}/{peer_ip.netmask}" for peer_ip in peer_ips])
|
||||
else:
|
||||
missing_ip = True
|
||||
addresses.append(f"Missing IP for selected peer: {peer}")
|
||||
|
||||
return addresses, missing_ip
|
||||
return addresses
|
||||
|
||||
def generate_iptable_rules():
|
||||
|
||||
def reset_firewall_to_default():
|
||||
for wireguard_instance in WireGuardInstance.objects.all():
|
||||
wireguard_instance.pending_changes = True
|
||||
wireguard_instance.legacy_firewall = False
|
||||
wireguard_instance.post_up = ''
|
||||
wireguard_instance.post_down = ''
|
||||
wireguard_instance.save()
|
||||
firewall_settings, firewall_settings_created = FirewallSettings.objects.get_or_create(name='global')
|
||||
firewall_settings.pending_changes = True
|
||||
firewall_settings.last_firewall_reset = timezone.now()
|
||||
firewall_settings.allow_peer_to_peer = True
|
||||
firewall_settings.allow_instance_to_instance = True
|
||||
firewall_settings.wan_interface = 'eth0'
|
||||
firewall_settings.default_forward_policy = 'drop'
|
||||
firewall_settings.save()
|
||||
|
||||
FirewallRule.objects.all().delete()
|
||||
RedirectRule.objects.all().delete()
|
||||
|
||||
FirewallRule.objects.create(
|
||||
firewall_chain='postrouting', sort_order=0, out_interface=firewall_settings.wan_interface, rule_action='masquerade',
|
||||
description='Masquerade traffic from VPN to WAN',
|
||||
)
|
||||
|
||||
FirewallRule.objects.create(
|
||||
firewall_chain='forward', sort_order=0, rule_action='accept', description='Allow established/related traffic',
|
||||
state_established=True, state_related=True
|
||||
)
|
||||
FirewallRule.objects.create(
|
||||
firewall_chain='forward', sort_order=1, rule_action='reject', description='Reject traffic to private networks exiting on WAN interface',
|
||||
in_interface='wg+', out_interface=firewall_settings.wan_interface, destination_ip='10.0.0.0', destination_netmask=8
|
||||
)
|
||||
FirewallRule.objects.create(
|
||||
firewall_chain='forward', sort_order=2, rule_action='reject', description='Reject traffic to private networks exiting on WAN interface',
|
||||
in_interface='wg+', out_interface=firewall_settings.wan_interface, destination_ip='172.16.0.0', destination_netmask=12
|
||||
)
|
||||
FirewallRule.objects.create(
|
||||
firewall_chain='forward', sort_order=3, rule_action='reject', description='Reject traffic to private networks exiting on WAN interface',
|
||||
in_interface='wg+', out_interface=firewall_settings.wan_interface, destination_ip='192.168.0.0', destination_netmask=16
|
||||
)
|
||||
FirewallRule.objects.create(
|
||||
firewall_chain='forward', sort_order=10, rule_action='accept', description='Allow traffic from VPN to WAN',
|
||||
in_interface='wg+', out_interface=firewall_settings.wan_interface
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
def export_user_firewall():
|
||||
iptables_rules = []
|
||||
rules = FirewallRule.objects.all().order_by('firewall_chain', 'sort_order')
|
||||
|
||||
for rule in rules:
|
||||
source_addresses, source_missing_ip = get_peer_addresses(rule.source_peer, rule.source_peer_include_networks)
|
||||
destination_addresses, destination_missing_ip = get_peer_addresses(rule.destination_peer, rule.destination_peer_include_networks)
|
||||
source_addresses = get_peer_addresses(rule.source_peer, rule.source_peer_include_networks)
|
||||
destination_addresses = get_peer_addresses(rule.destination_peer, rule.destination_peer_include_networks)
|
||||
|
||||
# Adiciona source_ip/destination_ip às listas, se definidos
|
||||
if rule.source_ip:
|
||||
@@ -41,19 +89,24 @@ def generate_iptable_rules():
|
||||
|
||||
for protocol in protocols:
|
||||
for source in source_addresses:
|
||||
for destination in destination_addresses:
|
||||
if source and "Missing IP for selected peer:" in source:
|
||||
description = f" - {rule.description}" if rule.description else ""
|
||||
comment = f"\n# {rule.sort_order} - {rule.uuid}{description}"
|
||||
# Pula a geração de regra se um dos peers estiver faltando IP e for relevante para essa combinação
|
||||
if (source is None and source_missing_ip) or (destination is None and destination_missing_ip):
|
||||
iptables_rule = f"{comment} - Missing ip for selected peer\n"
|
||||
iptables_rules.append(iptables_rule)
|
||||
iptables_rules.append(f"# {rule.sort_order} - {rule.uuid}{description} - {source}\n")
|
||||
continue
|
||||
|
||||
for destination in destination_addresses:
|
||||
if destination and "Missing IP for selected peer:" in destination:
|
||||
description = f" - {rule.description}" if rule.description else ""
|
||||
iptables_rules.append(f"# {rule.sort_order} - {rule.uuid}{description} - {destination}\n")
|
||||
continue
|
||||
comment += '\n'
|
||||
|
||||
description = f" - {rule.description}" if rule.description else ""
|
||||
comment = f"# {rule.sort_order} - {rule.uuid}{description}\n"
|
||||
|
||||
if rule.firewall_chain == "forward":
|
||||
rule_base = "iptables -t filter -A FORWARD "
|
||||
rule_base = "iptables -t filter -A WGWADM_FORWARD "
|
||||
elif rule.firewall_chain == "postrouting":
|
||||
rule_base = "iptables -t nat -A POSTROUTING "
|
||||
rule_base = "iptables -t nat -A WGWADM_POSTROUTING "
|
||||
else:
|
||||
rule_base = f"#iptables -A {rule.firewall_chain.upper()} "
|
||||
rule_protocol = f"-p {protocol} " if protocol else ""
|
||||
@@ -61,6 +114,8 @@ def generate_iptable_rules():
|
||||
rule_source = f"-s {source} " if source else ""
|
||||
rule_destination = f"-d {destination} " if destination else ""
|
||||
rule_action = f"-j {rule.rule_action.upper()}"
|
||||
rule_in_interface = f"-i {rule.in_interface} " if rule.in_interface else ""
|
||||
rule_out_interface = f"-o {rule.out_interface} " if rule.out_interface else ""
|
||||
|
||||
states = []
|
||||
if rule.state_new:
|
||||
@@ -80,9 +135,83 @@ def generate_iptable_rules():
|
||||
not_source = "! " if rule.not_source and source else ""
|
||||
not_destination = "! " if rule.not_destination and destination else ""
|
||||
|
||||
|
||||
iptables_rule = f"{comment}{rule_base}{not_source}{rule_source}{not_destination}{rule_destination}{rule_state}{rule_protocol}{rule_destination_port}{rule_action}\n"
|
||||
iptables_rule = f"{comment}{rule_base}{rule_in_interface}{rule_out_interface}{not_source}{rule_source}{not_destination}{rule_destination}{rule_state}{rule_protocol}{rule_destination_port}{rule_action}\n"
|
||||
iptables_rules.append(iptables_rule)
|
||||
|
||||
return "".join(iptables_rules)
|
||||
|
||||
|
||||
def generate_firewall_header():
|
||||
firewall_settings, firewall_settings_created = FirewallSettings.objects.get_or_create(name='global')
|
||||
header = f'''#!/bin/bash
|
||||
# Description: Firewall rules for WireGuard_WebAdmin
|
||||
# Do not edit this file directly. Use the web interface to manage firewall rules.
|
||||
#
|
||||
# This script was generated by WireGuard_WebAdmin on {timezone.now().strftime('%Y-%m-%d %H:%M:%S %Z')}
|
||||
#
|
||||
|
||||
iptables -t nat -N WGWADM_POSTROUTING >> /dev/null 2>&1
|
||||
iptables -t nat -N WGWADM_PREROUTING >> /dev/null 2>&1
|
||||
iptables -t filter -N WGWADM_FORWARD >> /dev/null 2>&1
|
||||
|
||||
iptables -t nat -F WGWADM_POSTROUTING
|
||||
iptables -t nat -F WGWADM_PREROUTING
|
||||
iptables -t filter -F WGWADM_FORWARD
|
||||
|
||||
iptables -t nat -D POSTROUTING -j WGWADM_POSTROUTING >> /dev/null 2>&1
|
||||
iptables -t nat -D PREROUTING -j WGWADM_PREROUTING >> /dev/null 2>&1
|
||||
iptables -t filter -D FORWARD -j WGWADM_FORWARD >> /dev/null 2>&1
|
||||
|
||||
iptables -t nat -I POSTROUTING -j WGWADM_POSTROUTING
|
||||
iptables -t nat -I PREROUTING -j WGWADM_PREROUTING
|
||||
iptables -t filter -I FORWARD -j WGWADM_FORWARD
|
||||
'''
|
||||
|
||||
|
||||
return header
|
||||
|
||||
def generate_firewall_footer():
|
||||
firewall_settings, firewall_settings_created = FirewallSettings.objects.get_or_create(name='global')
|
||||
footer = '# The following rules come from Firewall settings\n'
|
||||
footer += '# Default FORWARD policy\n'
|
||||
footer += f'iptables -t filter -P FORWARD {firewall_settings.default_forward_policy.upper()}\n'
|
||||
|
||||
footer += '# Same instance Peer to Peer traffic\n'
|
||||
for wireguard_instance in WireGuardInstance.objects.all().order_by('instance_id'):
|
||||
footer += f'iptables -t filter -A WGWADM_FORWARD -i wg{wireguard_instance.instance_id} -o wg{wireguard_instance.instance_id} -j '
|
||||
footer += 'ACCEPT\n' if firewall_settings.allow_peer_to_peer else "REJECT\n"
|
||||
footer += '# Instance to Instance traffic\n'
|
||||
footer += 'iptables -t filter -A WGWADM_FORWARD -i wg+ -o wg+ -j '
|
||||
footer += 'ACCEPT\n' if firewall_settings.allow_instance_to_instance else "REJECT\n"
|
||||
return footer
|
||||
|
||||
|
||||
def generate_port_forward_firewall():
|
||||
firewall_settings, firewall_settings_created = FirewallSettings.objects.get_or_create(name='global')
|
||||
redirect_firewall = ''
|
||||
wan_interface = firewall_settings.wan_interface
|
||||
|
||||
for redirect_rule in RedirectRule.objects.all().order_by('port'):
|
||||
description = f" - {redirect_rule.description} " if redirect_rule.description else ""
|
||||
rule_destination = redirect_rule.ip_address
|
||||
if redirect_rule.peer:
|
||||
peer_allowed_ip_address = PeerAllowedIP.objects.filter(peer=redirect_rule.peer, netmask=32, priority=0).first()
|
||||
if peer_allowed_ip_address:
|
||||
rule_destination = peer_allowed_ip_address.allowed_ip
|
||||
if rule_destination:
|
||||
rule_text = f"# {redirect_rule.port}/{redirect_rule.protocol} - {redirect_rule.uuid} - Port Forward Rule set{description}\n"
|
||||
rule_text += f"iptables -t nat -A WGWADM_PREROUTING -p {redirect_rule.protocol} -d wireguard-webadmin -i {wan_interface} --dport {redirect_rule.port} -j DNAT --to-dest {rule_destination}:{redirect_rule.port}\n"
|
||||
|
||||
if redirect_rule.masquerade_source:
|
||||
rule_text += f"iptables -t nat -A WGWADM_POSTROUTING -p {redirect_rule.protocol} -d {rule_destination} -o wg+ --dport {redirect_rule.port} -j MASQUERADE\n"
|
||||
|
||||
if redirect_rule.add_forward_rule:
|
||||
rule_text += f"iptables -t filter -A WGWADM_FORWARD -p {redirect_rule.protocol} -d {rule_destination} -i {wan_interface} -o wg+ --dport {redirect_rule.port} -j ACCEPT\n"
|
||||
|
||||
redirect_firewall += rule_text
|
||||
|
||||
else:
|
||||
rule_text = f"# {redirect_rule.port}/{redirect_rule.protocol} - {redirect_rule.uuid} - Port Forward Rule set{description} - Missing IP for selected peer: {redirect_rule.peer}\n"
|
||||
redirect_firewall += rule_text
|
||||
|
||||
return redirect_firewall
|
@@ -6,11 +6,16 @@ from firewall.forms import RedirectRuleForm, FirewallRuleForm, FirewallSettingsF
|
||||
from django.contrib import messages
|
||||
from wireguard.models import WireGuardInstance
|
||||
from user_manager.models import UserAcl
|
||||
from firewall.tools import generate_iptable_rules
|
||||
from firewall.tools import export_user_firewall, generate_firewall_header, generate_firewall_footer, generate_port_forward_firewall, reset_firewall_to_default
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.utils import timezone
|
||||
|
||||
|
||||
@login_required
|
||||
def view_redirect_rule_list(request):
|
||||
wireguard_instances = WireGuardInstance.objects.all().order_by('instance_id')
|
||||
if wireguard_instances.filter(legacy_firewall=True).exists():
|
||||
return redirect('/firewall/migration_required/')
|
||||
if wireguard_instances.filter(pending_changes=True).exists():
|
||||
pending_changes_warning = True
|
||||
else:
|
||||
@@ -24,6 +29,7 @@ def view_redirect_rule_list(request):
|
||||
return render(request, 'firewall/redirect_rule_list.html', context=context)
|
||||
|
||||
|
||||
@login_required
|
||||
def manage_redirect_rule(request):
|
||||
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=40).exists():
|
||||
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
|
||||
@@ -59,8 +65,11 @@ def manage_redirect_rule(request):
|
||||
return render(request, 'firewall/manage_redirect_rule.html', context=context)
|
||||
|
||||
|
||||
@login_required
|
||||
def view_firewall_rule_list(request):
|
||||
wireguard_instances = WireGuardInstance.objects.all().order_by('instance_id')
|
||||
if wireguard_instances.filter(legacy_firewall=True).exists():
|
||||
return redirect('/firewall/migration_required/')
|
||||
firewall_settings, firewall_settings_created = FirewallSettings.objects.get_or_create(name='global')
|
||||
current_chain = request.GET.get('chain', 'forward')
|
||||
if current_chain not in ['forward', 'portforward', 'postrouting']:
|
||||
@@ -81,6 +90,7 @@ def view_firewall_rule_list(request):
|
||||
return render(request, 'firewall/firewall_rule_list.html', context=context)
|
||||
|
||||
|
||||
@login_required
|
||||
def manage_firewall_rule(request):
|
||||
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=40).exists():
|
||||
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
|
||||
@@ -96,6 +106,12 @@ def manage_firewall_rule(request):
|
||||
firewall_settings.pending_changes = True
|
||||
firewall_settings.save()
|
||||
instance.delete()
|
||||
# Marking wireguard_instance as having pending changes, not the best way to do this, but it works for now.
|
||||
# I will improve it later.
|
||||
wireguard_instance = WireGuardInstance.objects.all().first()
|
||||
if wireguard_instance:
|
||||
wireguard_instance.pending_changes = True
|
||||
wireguard_instance.save()
|
||||
messages.success(request, 'Firewall rule deleted successfully')
|
||||
else:
|
||||
messages.warning(request, 'Error deleting Firewall rule|Confirmation did not match. Firewall rule was not deleted.')
|
||||
@@ -111,6 +127,12 @@ def manage_firewall_rule(request):
|
||||
firewall_settings.save()
|
||||
form.save()
|
||||
messages.success(request, 'Firewall rule saved successfully')
|
||||
# Marking wireguard_instance as having pending changes, not the best way to do this, but it works for now.
|
||||
# I will improve it later.
|
||||
wireguard_instance = WireGuardInstance.objects.all().first()
|
||||
if wireguard_instance:
|
||||
wireguard_instance.pending_changes = True
|
||||
wireguard_instance.save()
|
||||
return redirect('/firewall/rule_list/?chain=' + current_chain)
|
||||
else:
|
||||
form = FirewallRuleForm(instance=instance, current_chain=current_chain)
|
||||
@@ -132,6 +154,7 @@ def manage_firewall_rule(request):
|
||||
return render(request, 'firewall/manage_firewall_rule.html', context=context)
|
||||
|
||||
|
||||
@login_required
|
||||
def view_manage_firewall_settings(request):
|
||||
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=40).exists():
|
||||
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
|
||||
@@ -162,12 +185,39 @@ def view_manage_firewall_settings(request):
|
||||
return render(request, 'firewall/manage_firewall_settings.html', context=context)
|
||||
|
||||
|
||||
@login_required
|
||||
def view_generate_iptables_script(request):
|
||||
data = {'status': 'ok'}
|
||||
firewall_rule_list = FirewallRule.objects.all().order_by('firewall_chain', 'sort_order')
|
||||
for rule in firewall_rule_list:
|
||||
print(str(rule.sort_order) + ' - ' + str(rule.uuid))
|
||||
|
||||
rules_text = generate_iptable_rules()
|
||||
print(rules_text)
|
||||
#firewall_header = generate_firewall_header()
|
||||
#port_forward_firewall = generate_port_forward_firewall()
|
||||
#user_firewall = export_user_firewall()
|
||||
#firewall_footer = generate_firewall_footer()
|
||||
#print(port_forward_firewall)
|
||||
#print(firewall_header)
|
||||
#print(user_firewall)
|
||||
#print(firewall_footer)
|
||||
|
||||
return JsonResponse(data)
|
||||
|
||||
|
||||
@login_required
|
||||
def view_reset_firewall(request):
|
||||
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=40).exists():
|
||||
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
|
||||
if request.GET.get('confirmation') == 'delete all rules and reset firewall':
|
||||
reset_firewall_to_default()
|
||||
messages.success(request, 'VPN Firewall|Firewall reset to default successfully!')
|
||||
else:
|
||||
messages.warning(request, 'VPN Firewall|Firewall was not reset to default. Confirmation did not match.')
|
||||
return redirect('/firewall/rule_list/')
|
||||
|
||||
|
||||
@login_required
|
||||
def view_firewall_migration_required(request):
|
||||
if not WireGuardInstance.objects.filter(legacy_firewall=True).exists():
|
||||
messages.warning(request, 'No Firewall Migration pending|No WireGuard instances with legacy firewall settings found.')
|
||||
return redirect('/firewall/rule_list/')
|
||||
if not UserAcl.objects.filter(user=request.user).filter(user_level__gte=40).exists():
|
||||
return render(request, 'access_denied.html', {'page_title': 'Access Denied'})
|
||||
|
||||
return render(request, 'firewall/firewall_migration_required.html')
|
Reference in New Issue
Block a user