update firewall

This commit is contained in:
MacRimi
2026-02-08 16:19:48 +01:00
parent f07e8cfe14
commit 7f9da757aa
3 changed files with 532 additions and 78 deletions

View File

@@ -106,10 +106,12 @@ def get_firewall_status():
def _parse_firewall_rules():
"""Parse all firewall rules from cluster and host configs"""
rules = []
rule_idx_by_file = {} # Track rule index per file for deletion
for fw_file, source in [(CLUSTER_FW, "cluster"), (os.path.join(HOST_FW_DIR, "host.fw"), "host")]:
if not os.path.isfile(fw_file):
continue
rule_idx_by_file[source] = 0
try:
with open(fw_file, 'r') as f:
content = f.read()
@@ -132,7 +134,9 @@ def _parse_firewall_rules():
if in_rules or section in ("RULES", "IN", "OUT"):
rule = _parse_rule_line(line, source, section)
if rule:
rule["rule_index"] = rule_idx_by_file[source]
rules.append(rule)
rule_idx_by_file[source] += 1
except Exception:
pass
@@ -181,6 +185,152 @@ def _parse_rule_line(line, source, section):
return rule
def add_firewall_rule(direction="IN", action="ACCEPT", protocol="tcp", dport="", sport="",
source="", dest="", iface="", comment="", level="host"):
"""
Add a custom firewall rule to host or cluster firewall config.
Returns (success, message)
"""
# Validate inputs
action = action.upper()
if action not in ("ACCEPT", "DROP", "REJECT"):
return False, f"Invalid action: {action}. Must be ACCEPT, DROP, or REJECT"
direction = direction.upper()
if direction not in ("IN", "OUT"):
return False, f"Invalid direction: {direction}. Must be IN or OUT"
# Build rule line
parts = [direction, action]
if protocol:
parts.extend(["-p", protocol.lower()])
if dport:
# Validate port
if not re.match(r'^[\d:,]+$', dport):
return False, f"Invalid destination port: {dport}"
parts.extend(["-dport", dport])
if sport:
if not re.match(r'^[\d:,]+$', sport):
return False, f"Invalid source port: {sport}"
parts.extend(["-sport", sport])
if source:
parts.extend(["-source", source])
if dest:
parts.extend(["-dest", dest])
if iface:
parts.extend(["-i", iface])
parts.extend(["-log", "nolog"])
if comment:
# Sanitize comment
safe_comment = re.sub(r'[^\w\s\-._/():]', '', comment)
parts.append(f"# {safe_comment}")
rule_line = " ".join(parts)
# Determine target file
if level == "cluster":
fw_file = CLUSTER_FW
else:
fw_file = os.path.join(HOST_FW_DIR, "host.fw")
try:
content = ""
has_rules_section = False
if os.path.isfile(fw_file):
with open(fw_file, 'r') as f:
content = f.read()
has_rules_section = "[RULES]" in content
if has_rules_section:
lines = content.splitlines()
new_lines = []
inserted = False
for line in lines:
new_lines.append(line)
if not inserted and line.strip() == "[RULES]":
new_lines.append(rule_line)
inserted = True
content = "\n".join(new_lines) + "\n"
else:
if content and not content.endswith("\n"):
content += "\n"
content += "\n[RULES]\n"
content += rule_line + "\n"
os.makedirs(os.path.dirname(fw_file), exist_ok=True)
with open(fw_file, 'w') as f:
f.write(content)
_run_cmd(["pve-firewall", "reload"])
return True, f"Firewall rule added: {direction} {action} {protocol}{':' + dport if dport else ''}"
except PermissionError:
return False, "Permission denied. Cannot write to firewall config."
except Exception as e:
return False, f"Failed to add firewall rule: {str(e)}"
def delete_firewall_rule(rule_index, level="host"):
"""
Delete a firewall rule by index from host or cluster config.
The index corresponds to the order of rules in [RULES] section.
Returns (success, message)
"""
if level == "cluster":
fw_file = CLUSTER_FW
else:
fw_file = os.path.join(HOST_FW_DIR, "host.fw")
if not os.path.isfile(fw_file):
return False, "Firewall config file not found"
try:
with open(fw_file, 'r') as f:
content = f.read()
lines = content.splitlines()
new_lines = []
in_rules = False
current_rule_idx = 0
removed_rule = None
for line in lines:
stripped = line.strip()
if stripped.startswith('['):
section_match = re.match(r'\[(\w+)\]', stripped)
if section_match:
section = section_match.group(1).upper()
in_rules = section in ("RULES", "IN", "OUT")
if in_rules and stripped and not stripped.startswith('#') and not stripped.startswith('['):
# This is a rule line
if current_rule_idx == rule_index:
removed_rule = stripped
current_rule_idx += 1
continue # Skip this line (delete it)
current_rule_idx += 1
new_lines.append(line)
if removed_rule is None:
return False, f"Rule index {rule_index} not found"
with open(fw_file, 'w') as f:
f.write("\n".join(new_lines) + "\n")
_run_cmd(["pve-firewall", "reload"])
return True, f"Firewall rule deleted: {removed_rule}"
except PermissionError:
return False, "Permission denied. Cannot modify firewall config."
except Exception as e:
return False, f"Failed to delete rule: {str(e)}"
def add_monitor_port_rule():
"""
Add a firewall rule to allow port 8008 (ProxMenux Monitor) on the host.