Update firewall

This commit is contained in:
MacRimi
2026-02-14 16:49:08 +01:00
parent ace4d83789
commit 1ee5863da7
3 changed files with 312 additions and 56 deletions

View File

@@ -274,6 +274,96 @@ def add_firewall_rule(direction="IN", action="ACCEPT", protocol="tcp", dport="",
return False, f"Failed to add firewall rule: {str(e)}"
def edit_firewall_rule(rule_index, level="host", direction="IN", action="ACCEPT",
protocol="tcp", dport="", sport="", source="", iface="", comment=""):
"""
Edit an existing firewall rule by replacing it in-place.
Deletes the old rule at rule_index and inserts the new one at the same position.
Returns (success, message)
"""
# Validate inputs
action = action.upper()
if action not in ("ACCEPT", "DROP", "REJECT"):
return False, f"Invalid action: {action}. Must be ACCEPT, DROP, or REJECT"
direction = direction.upper()
if direction not in ("IN", "OUT"):
return False, f"Invalid direction: {direction}. Must be IN or OUT"
# Build new rule line
parts = [direction, action]
if protocol:
parts.extend(["-p", protocol.lower()])
if dport:
if not re.match(r'^[\d:,]+$', dport):
return False, f"Invalid destination port: {dport}"
parts.extend(["-dport", dport])
if sport:
if not re.match(r'^[\d:,]+$', sport):
return False, f"Invalid source port: {sport}"
parts.extend(["-sport", sport])
if source:
parts.extend(["-source", source])
if iface:
parts.extend(["-i", iface])
parts.extend(["-log", "nolog"])
if comment:
safe_comment = re.sub(r'[^\w\s\-._/():]', '', comment)
parts.append(f"# {safe_comment}")
new_rule_line = " ".join(parts)
# Determine target file
if level == "cluster":
fw_file = CLUSTER_FW
else:
fw_file = os.path.join(HOST_FW_DIR, "host.fw")
if not os.path.isfile(fw_file):
return False, "Firewall config file not found"
try:
with open(fw_file, 'r') as f:
content = f.read()
lines = content.splitlines()
new_lines = []
in_rules = False
current_rule_idx = 0
replaced = False
for line in lines:
stripped = line.strip()
if stripped.startswith('['):
section_match = re.match(r'\[(\w+)\]', stripped)
if section_match:
section = section_match.group(1).upper()
in_rules = section in ("RULES", "IN", "OUT")
if in_rules and stripped and not stripped.startswith('#') and not stripped.startswith('['):
if current_rule_idx == rule_index:
# Replace the old rule with the new one
new_lines.append(new_rule_line)
replaced = True
current_rule_idx += 1
continue
current_rule_idx += 1
new_lines.append(line)
if not replaced:
return False, f"Rule index {rule_index} not found"
with open(fw_file, 'w') as f:
f.write("\n".join(new_lines) + "\n")
_run_cmd(["pve-firewall", "reload"])
return True, f"Firewall rule updated: {direction} {action} {protocol}{':' + dport if dport else ''}"
except PermissionError:
return False, "Permission denied. Cannot modify firewall config."
except Exception as e:
return False, f"Failed to edit rule: {str(e)}"
def delete_firewall_rule(rule_index, level="host"):
"""
Delete a firewall rule by index from host or cluster config.