#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ProxMenux Security Manager
Handles Proxmox firewall status, rules, and security tool detection.
"""
import os
import json
import subprocess
import re
# =================================================================
# Proxmox Firewall Management
# =================================================================
# Proxmox firewall config paths.
# The literals must not carry surrounding whitespace, otherwise every
# os.path.isfile()/open() on them targets a file that does not exist.
CLUSTER_FW = "/etc/pve/firewall/cluster.fw"  # cluster-level firewall config
HOST_FW_DIR = "/etc/pve/local"  # directory containing host.fw (host.fw is per-node)
def _run_cmd ( cmd , timeout = 10 ) :
""" Run a shell command and return (returncode, stdout, stderr) """
try :
result = subprocess . run (
cmd , capture_output = True , text = True , timeout = timeout
)
return result . returncode , result . stdout . strip ( ) , result . stderr . strip ( )
except subprocess . TimeoutExpired :
return - 1 , " " , " Command timed out "
except FileNotFoundError :
return - 1 , " " , f " Command not found: { cmd [ 0 ] } "
except Exception as e :
return - 1 , " " , str ( e )
def _read_fw_enable_flag(path):
    """Return True when the first ``enable:`` line of firewall file *path* is ``1``.

    A missing or unreadable file counts as "not enabled".
    """
    try:
        with open(path, "r") as f:
            for raw in f:
                line = raw.strip()
                if line.lower().startswith("enable:"):
                    return line.split(":", 1)[1].strip() == "1"
    except (OSError, UnicodeDecodeError):
        pass
    return False


def _dport_covers(dport, port):
    """Return True when *port* is listed in a ``-dport`` value.

    The value may be a single port, a comma-separated list, or ``low:high``
    ranges. Non-numeric entries are ignored.
    """
    for item in str(dport).split(","):
        item = item.strip()
        if not item:
            continue
        low, sep, high = item.partition(":")
        try:
            if sep:
                if int(low) <= port <= int(high):
                    return True
            elif int(item) == port:
                return True
        except ValueError:
            continue
    return False


def get_firewall_status():
    """Collect the overall Proxmox firewall status.

    Returns:
        dict with the keys ``pve_firewall_installed``, ``pve_firewall_active``,
        ``cluster_fw_enabled``, ``host_fw_enabled``, ``rules_count``,
        ``rules`` (list of parsed rule dicts) and ``monitor_port_open``
        (True when an ACCEPT rule covers port 8008).
    """
    result = {
        "pve_firewall_installed": False,
        "pve_firewall_active": False,
        "cluster_fw_enabled": False,
        "host_fw_enabled": False,
        "rules_count": 0,
        "rules": [],
        "monitor_port_open": False,
    }

    rc, out, _ = _run_cmd(["systemctl", "is-active", "pve-firewall"])
    result["pve_firewall_active"] = rc == 0 and out == "active"

    # `is-active` also prints "inactive" for a unit that does not exist, so
    # its output cannot prove installation. When the service is not active,
    # ask systemd whether the unit file is known at all.
    if result["pve_firewall_active"]:
        result["pve_firewall_installed"] = True
    else:
        rc_unit, _, _ = _run_cmd(["systemctl", "cat", "pve-firewall"])
        result["pve_firewall_installed"] = rc_unit == 0

    # Enable flags from the cluster-level and host-level config files.
    result["cluster_fw_enabled"] = _read_fw_enable_flag(CLUSTER_FW)
    result["host_fw_enabled"] = _read_fw_enable_flag(
        os.path.join(HOST_FW_DIR, "host.fw")
    )

    rules = _parse_firewall_rules()
    result["rules"] = rules
    result["rules_count"] = len(rules)

    # Exact port matching: a substring test would treat 18008 as 8008 and
    # would miss ranges such as 8000:8010.
    result["monitor_port_open"] = any(
        str(rule.get("action", "")).upper() == "ACCEPT"
        and _dport_covers(rule.get("dport", ""), 8008)
        for rule in rules
    )
    return result
def _parse_firewall_rules():
    """Parse the firewall rules of the cluster and host config files.

    Returns:
        list of rule dicts as produced by ``_parse_rule_line``, each with an
        extra ``rule_index`` key: the zero-based position of the rule among
        all rule lines of its file. The index counts every non-comment line
        inside a rule section, which is the same way
        ``delete_firewall_rule`` counts, so the value can be passed to it.
    """
    rule_sections = ("RULES", "IN", "OUT")
    targets = [
        (CLUSTER_FW, "cluster"),
        (os.path.join(HOST_FW_DIR, "host.fw"), "host"),
    ]

    rules = []
    for fw_file, source in targets:
        if not os.path.isfile(fw_file):
            continue
        try:
            with open(fw_file, "r") as f:
                lines = f.read().splitlines()
        except (OSError, UnicodeDecodeError):
            # Best effort: an unreadable file contributes no rules.
            continue

        section = ""
        rule_index = 0
        for raw in lines:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue

            if line.startswith("["):
                # Any header starts a new section. Headers that do not match
                # the single-word form (e.g. ones carrying a name such as
                # "[group web]") must end the current rule section instead
                # of silently extending it.
                header = re.match(r"\[(\w+)\]", line)
                section = header.group(1).upper() if header else ""
                continue

            if section not in rule_sections:
                continue

            rule = _parse_rule_line(line, source, section)
            if rule:
                rule["rule_index"] = rule_index
                rules.append(rule)
            # Advance even for unparseable lines so indices stay aligned
            # with the line-based counting used when deleting.
            rule_index += 1

    return rules
def _parse_rule_line ( line , source , section ) :
""" Parse a single firewall rule line """
# Proxmox rule format: |ACTION MACRO(params) -option value ...
# or: IN/OUT ACTION -p proto -dport port -source addr
parts = line . split ( )
if len ( parts ) < 2 :
return None
rule = {
" raw " : line ,
" source_file " : source ,
" section " : section ,
}
idx = 0
# Direction
if parts [ 0 ] . upper ( ) in ( " IN " , " OUT " ) :
rule [ " direction " ] = parts [ 0 ] . upper ( )
idx = 1
elif section in ( " IN " , ) :
rule [ " direction " ] = " IN "
elif section in ( " OUT " , ) :
rule [ " direction " ] = " OUT "
if idx < len ( parts ) :
rule [ " action " ] = parts [ idx ] . upper ( )
idx + = 1
# Parse options
while idx < len ( parts ) :
opt = parts [ idx ]
if opt . startswith ( " - " ) and idx + 1 < len ( parts ) :
key = opt . lstrip ( " - " )
val = parts [ idx + 1 ]
rule [ key ] = val
idx + = 2
else :
idx + = 1
return rule
2026-02-08 16:19:48 +01:00
def add_firewall_rule(direction="IN", action="ACCEPT", protocol="tcp", dport="", sport="",
                      source="", dest="", iface="", comment="", level="host"):
    """Add a custom firewall rule to the host or cluster firewall config.

    Every value that ends up in the config file is validated first, so a
    caller-supplied string can never contain whitespace or newlines that
    would smuggle an additional rule line into the file.

    Args:
        direction: "IN" or "OUT" (case-insensitive).
        action: "ACCEPT", "DROP" or "REJECT" (case-insensitive).
        protocol: protocol name; empty to omit ``-p``.
        dport, sport: digits with ``:`` and ``,`` separators; empty to omit.
        source, dest: address expression; empty to omit.
        iface: interface name; empty to omit.
        comment: free text, sanitised and appended after ``#``.
        level: "cluster" targets the cluster config, anything else host.fw.

    Returns:
        (success, message)
    """
    # Validate inputs
    action = action.upper()
    if action not in ("ACCEPT", "DROP", "REJECT"):
        return False, f"Invalid action: {action}. Must be ACCEPT, DROP, or REJECT"

    direction = direction.upper()
    if direction not in ("IN", "OUT"):
        return False, f"Invalid direction: {direction}. Must be IN or OUT"

    # Build rule line. re.fullmatch is used throughout because "$" also
    # matches just before a trailing newline.
    parts = [direction, action]

    if protocol:
        proto = protocol.lower()
        if not re.fullmatch(r"[a-z0-9][a-z0-9\-]*", proto):
            return False, f"Invalid protocol: {protocol}"
        parts.extend(["-p", proto])

    if dport:
        if not re.fullmatch(r"[\d:,]+", dport):
            return False, f"Invalid destination port: {dport}"
        parts.extend(["-dport", dport])

    if sport:
        if not re.fullmatch(r"[\d:,]+", sport):
            return False, f"Invalid source port: {sport}"
        parts.extend(["-sport", sport])

    # Addresses: letters/digits plus the punctuation needed for IPs, CIDR
    # masks, ranges, lists and named references. No whitespace.
    address_re = r"[\w.:/,+\-]+"
    if source:
        if not re.fullmatch(address_re, source):
            return False, f"Invalid source address: {source}"
        parts.extend(["-source", source])

    if dest:
        if not re.fullmatch(address_re, dest):
            return False, f"Invalid destination address: {dest}"
        parts.extend(["-dest", dest])

    if iface:
        if not re.fullmatch(r"[\w.:\-]+", iface):
            return False, f"Invalid interface: {iface}"
        parts.extend(["-i", iface])

    parts.extend(["-log", "nolog"])

    if comment:
        # Drop disallowed characters, then collapse all whitespace
        # (including newlines) so the comment stays on the rule's line.
        safe_comment = " ".join(re.sub(r"[^\w\s\-._/():]", "", comment).split())
        parts.append(f"# {safe_comment}")

    rule_line = " ".join(parts)

    # Determine target file
    if level == "cluster":
        fw_file = CLUSTER_FW
    else:
        fw_file = os.path.join(HOST_FW_DIR, "host.fw")

    try:
        content = ""
        if os.path.isfile(fw_file):
            with open(fw_file, "r") as f:
                content = f.read()

        # Locate the real [RULES] header line. Deciding by substring could
        # report the section as present (e.g. mentioned in a comment) and
        # then never insert the rule.
        lines = content.splitlines()
        header_idx = next(
            (i for i, text in enumerate(lines) if text.strip().upper() == "[RULES]"),
            None,
        )

        if header_idx is not None:
            lines.insert(header_idx + 1, rule_line)
            content = "\n".join(lines) + "\n"
        else:
            if content and not content.endswith("\n"):
                content += "\n"
            content += "\n[RULES]\n"
            content += rule_line + "\n"

        os.makedirs(os.path.dirname(fw_file), exist_ok=True)
        with open(fw_file, "w") as f:
            f.write(content)

        _run_cmd(["pve-firewall", "reload"])

        return True, f"Firewall rule added: {direction} {action} {protocol}{':' + dport if dport else ''}"
    except PermissionError:
        return False, "Permission denied. Cannot write to firewall config."
    except Exception as e:
        return False, f"Failed to add firewall rule: {str(e)}"
def delete_firewall_rule(rule_index, level="host"):
    """Delete a firewall rule by index from the host or cluster config.

    The index is the zero-based position of the rule among all non-comment
    lines found inside rule sections ([RULES], [IN], [OUT]) of the file.

    Args:
        rule_index: int, or anything ``int()`` accepts (e.g. "3").
        level: "cluster" targets the cluster config, anything else host.fw.

    Returns:
        (success, message)
    """
    # Coerce up front: an index that arrives as a string would otherwise
    # never compare equal to the integer counter.
    try:
        target_idx = int(rule_index)
    except (TypeError, ValueError):
        return False, f"Invalid rule index: {rule_index}"
    if target_idx < 0:
        return False, f"Invalid rule index: {rule_index}"

    if level == "cluster":
        fw_file = CLUSTER_FW
    else:
        fw_file = os.path.join(HOST_FW_DIR, "host.fw")

    if not os.path.isfile(fw_file):
        return False, "Firewall config file not found"

    try:
        with open(fw_file, "r") as f:
            lines = f.read().splitlines()

        kept = []
        in_rules = False
        seen = 0
        removed_rule = None

        for line in lines:
            stripped = line.strip()

            if stripped.startswith("["):
                # Every header switches section. A header that is not the
                # plain single-word form leaves the rule section, so lines
                # of a following named section are not counted as rules.
                header = re.match(r"\[(\w+)\]", stripped)
                in_rules = bool(header) and header.group(1).upper() in ("RULES", "IN", "OUT")
            elif in_rules and stripped and not stripped.startswith("#"):
                # This is a rule line
                if seen == target_idx:
                    removed_rule = stripped
                    seen += 1
                    continue  # skip the line, i.e. delete it
                seen += 1

            kept.append(line)

        if removed_rule is None:
            return False, f"Rule index {rule_index} not found"

        with open(fw_file, "w") as f:
            f.write("\n".join(kept) + "\n")

        _run_cmd(["pve-firewall", "reload"])

        return True, f"Firewall rule deleted: {removed_rule}"
    except PermissionError:
        return False, "Permission denied. Cannot modify firewall config."
    except Exception as e:
        return False, f"Failed to delete rule: {str(e)}"
def add_monitor_port_rule():
    """Allow TCP port 8008 (ProxMenux Monitor) in the host firewall config.

    Does nothing when ``get_firewall_status`` already reports the port as
    open. Otherwise the rule is inserted directly below the ``[RULES]``
    header of host.fw, creating the section (and the file) when missing,
    and the firewall is reloaded.

    Returns:
        (success, message)
    """
    host_fw = os.path.join(HOST_FW_DIR, "host.fw")

    # Check if rule already exists
    status = get_firewall_status()
    if status.get("monitor_port_open"):
        return True, "Port 8008 is already allowed in the firewall"

    rule_line = "IN ACCEPT -p tcp -dport 8008 -log nolog # ProxMenux Monitor"

    try:
        content = ""
        if os.path.isfile(host_fw):
            with open(host_fw, "r") as f:
                content = f.read()

        # Find the actual header line. A substring test on the whole file
        # could claim the section exists (e.g. when it is only mentioned in
        # a comment) and the rule would then never be written.
        lines = content.splitlines()
        header_idx = next(
            (i for i, text in enumerate(lines) if text.strip().upper() == "[RULES]"),
            None,
        )

        if header_idx is not None:
            # Add rule right after the [RULES] section header
            lines.insert(header_idx + 1, rule_line)
            content = "\n".join(lines) + "\n"
        else:
            # Add a [RULES] section at the end
            if content and not content.endswith("\n"):
                content += "\n"
            content += "\n[RULES]\n"
            content += rule_line + "\n"

        # Same as the other config writers in this module: make sure the
        # parent directory exists before writing.
        os.makedirs(os.path.dirname(host_fw), exist_ok=True)
        with open(host_fw, "w") as f:
            f.write(content)

        # Reload firewall
        _run_cmd(["pve-firewall", "reload"])

        return True, "Firewall rule added: port 8008 (TCP) allowed for ProxMenux Monitor"
    except PermissionError:
        return False, "Permission denied. Cannot write to firewall config."
    except Exception as e:
        return False, f"Failed to add firewall rule: {str(e)}"
def remove_monitor_port_rule():
    """Drop the ProxMenux Monitor (port 8008) rule from the host firewall.

    Every line of host.fw that mentions both "8008" and "ProxMenux" is
    removed; the file is only rewritten (and the firewall reloaded) when
    at least one such line was found.

    Returns:
        (success, message)
    """
    host_fw = os.path.join(HOST_FW_DIR, "host.fw")

    if not os.path.isfile(host_fw):
        return True, "No host firewall config found"

    try:
        with open(host_fw, "r") as fh:
            existing = fh.readlines()

        remaining = [
            entry for entry in existing
            if not ("8008" in entry and "ProxMenux" in entry)
        ]

        # Same length means nothing matched, so leave the file untouched.
        if len(remaining) == len(existing):
            return True, "No ProxMenux Monitor rule found to remove"

        with open(host_fw, "w") as fh:
            fh.writelines(remaining)

        _run_cmd(["pve-firewall", "reload"])
        return True, "ProxMenux Monitor firewall rule removed"
    except Exception as e:
        return False, f"Failed to remove firewall rule: {str(e)}"
def enable_firewall(level="host"):
    """Turn the Proxmox firewall on.

    ``level="cluster"`` edits the cluster config; any other value edits the
    host's host.fw.

    Returns:
        (success, message) as produced by ``_set_firewall_enabled``.
    """
    target = CLUSTER_FW if level == "cluster" else os.path.join(HOST_FW_DIR, "host.fw")
    return _set_firewall_enabled(target, True)
def disable_firewall(level="host"):
    """Turn the Proxmox firewall off.

    ``level="cluster"`` edits the cluster config; any other value edits the
    host's host.fw.

    Returns:
        (success, message) as produced by ``_set_firewall_enabled``.
    """
    target = CLUSTER_FW if level == "cluster" else os.path.join(HOST_FW_DIR, "host.fw")
    return _set_firewall_enabled(target, False)
def _set_firewall_enabled ( fw_file , enabled ) :
""" Set enable: 1 or enable: 0 in firewall config """
try :
content = " "
if os . path . isfile ( fw_file ) :
with open ( fw_file , ' r ' ) as f :
content = f . read ( )
enable_val = " 1 " if enabled else " 0 "
has_options = " [OPTIONS] " in content
has_enable = False
lines = content . splitlines ( )
new_lines = [ ]
in_options = False
for line in lines :
stripped = line . strip ( )
if stripped . startswith ( " [ " ) :
in_options = stripped == " [OPTIONS] "
if in_options and stripped . lower ( ) . startswith ( " enable: " ) :
new_lines . append ( f " enable: { enable_val } " )
has_enable = True
else :
new_lines . append ( line )
if not has_enable :
if has_options :
# Add enable line after [OPTIONS]
final_lines = [ ]
for line in new_lines :
final_lines . append ( line )
if line . strip ( ) == " [OPTIONS] " :
final_lines . append ( f " enable: { enable_val } " )
new_lines = final_lines
else :
# Add [OPTIONS] section at the beginning
new_lines . insert ( 0 , " [OPTIONS] " )
new_lines . insert ( 1 , f " enable: { enable_val } " )
new_lines . insert ( 2 , " " )
# Ensure parent directory exists
os . makedirs ( os . path . dirname ( fw_file ) , exist_ok = True )
with open ( fw_file , ' w ' ) as f :
f . write ( " \n " . join ( new_lines ) + " \n " )
# Reload or start the firewall service
if enabled :
_run_cmd ( [ " systemctl " , " enable " , " pve-firewall " ] )
_run_cmd ( [ " systemctl " , " start " , " pve-firewall " ] )
_run_cmd ( [ " pve-firewall " , " reload " ] )
state = " enabled " if enabled else " disabled "
level = " cluster " if fw_file == CLUSTER_FW else " host "
return True , f " Firewall { state } at { level } level "
except PermissionError :
return False , " Permission denied. Cannot modify firewall config. "
except Exception as e :
return False , f " Failed to modify firewall: { str ( e ) } "
# =================================================================
# Security Tools Detection
# =================================================================
# =================================================================
# Fail2Ban Detailed Management
# =================================================================
def get_fail2ban_details():
    """Collect per-jail Fail2Ban details.

    Returns:
        dict with ``installed``, ``active``, ``version`` and ``jails``.
        ``jails`` is only filled while the fail2ban service is active; each
        entry holds the failure/ban counters, the banned IPs (with their
        ``classify_ip`` type) and the jail's findtime/bantime/maxretry.
    """
    details = {
        "installed": False,
        "active": False,
        "version": "",
        "jails": [],
    }

    rc_version, version_out, _ = _run_cmd(["fail2ban-client", "--version"])
    if rc_version != 0:
        return details

    details["installed"] = True
    details["version"] = version_out.split("\n")[0].strip() if version_out else ""

    rc_service, service_out, _ = _run_cmd(["systemctl", "is-active", "fail2ban"])
    details["active"] = (rc_service == 0 and service_out == "active")
    if not details["active"]:
        return details

    # Jail names come from the "Jail list:" line of the global status.
    jail_names = []
    rc_list, list_out, _ = _run_cmd(["fail2ban-client", "status"])
    if rc_list == 0:
        for row in list_out.splitlines():
            if "Jail list:" in row:
                listed = row.split(":", 1)[1].strip()
                jail_names = [name.strip() for name in listed.split(",") if name.strip()]

    # Status label -> integer field it fills, checked in this order.
    counters = (
        ("Currently failed:", "currently_failed"),
        ("Total failed:", "total_failed"),
        ("Currently banned:", "currently_banned"),
        ("Total banned:", "total_banned"),
    )

    for jail_name in jail_names:
        jail_info = {
            "name": jail_name,
            "currently_failed": 0,
            "total_failed": 0,
            "currently_banned": 0,
            "total_banned": 0,
            "banned_ips": [],
            "findtime": "",
            "bantime": "",
            "maxretry": "",
        }

        rc_jail, jail_out, _ = _run_cmd(["fail2ban-client", "status", jail_name])
        status_rows = jail_out.splitlines() if rc_jail == 0 else []
        for row in status_rows:
            row = row.strip()
            field = next((name for label, name in counters if label in row), None)
            if field is not None:
                try:
                    jail_info[field] = int(row.split(":", 1)[1].strip())
                except ValueError:
                    pass
            elif "Banned IP list:" in row:
                ips_str = row.split(":", 1)[1].strip()
                if ips_str:
                    raw_ips = [ip.strip() for ip in ips_str.split() if ip.strip()]
                    jail_info["banned_ips"] = [
                        {"ip": ip, "type": classify_ip(ip)} for ip in raw_ips
                    ]

        # Jail config values
        for key in ("findtime", "bantime", "maxretry"):
            rc_cfg, cfg_out, _ = _run_cmd(["fail2ban-client", "get", jail_name, key])
            if rc_cfg == 0 and cfg_out:
                jail_info[key] = cfg_out.strip()

        details["jails"].append(jail_info)

    return details
def classify_ip(ip_address):
    """Classify an IP address as 'local', 'external' or 'unknown'.

    Local ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 127.0.0.0/8,
    fd00::/8, fe80::/10 and ::1. IPv4-mapped IPv6 addresses
    (``::ffff:a.b.c.d``) are classified by their embedded IPv4 address.

    Returns:
        'unknown' for empty input or text that is not an IP address,
        otherwise 'local' or 'external'.
    """
    import ipaddress

    if not ip_address:
        return "unknown"

    # Drop an IPv6 zone suffix such as "%eth0" before parsing.
    text = ip_address.strip().split("%", 1)[0]
    try:
        addr = ipaddress.ip_address(text)
    except ValueError:
        return "unknown"

    # Judge an IPv4-mapped IPv6 address by the IPv4 address it carries.
    mapped = getattr(addr, "ipv4_mapped", None)
    if mapped is not None:
        addr = mapped

    local_networks = (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "127.0.0.0/8",
        "fd00::/8",
        "fe80::/10",
        "::1/128",
    )
    for cidr in local_networks:
        network = ipaddress.ip_network(cidr)
        if network.version == addr.version and addr in network:
            return "local"

    return "external"
def update_jail_config(jail_name, maxretry=None, bantime=None, findtime=None):
    """Update a Fail2Ban jail's maxretry, bantime and/or findtime.

    All supplied values are validated before anything is changed, so an
    out-of-range value can no longer leave an earlier setting applied live
    but not persisted. Valid values are then applied with
    ``fail2ban-client set``, and only those that were applied successfully
    are written to jail.local through ``_persist_jail_config``.

    Args:
        jail_name: jail to modify (letters, digits, ``_``, ``.``, ``-``).
        maxretry: integer >= 1, or None to leave unchanged.
        bantime: positive seconds, or -1 for a permanent ban; None to skip.
        findtime: integer >= 1, or None to leave unchanged.

    Returns:
        (success, message)
    """
    if not jail_name:
        return False, "Jail name is required"
    # The name is later written as an INI section header, so it must not
    # contain brackets, whitespace or newlines.
    if not re.fullmatch(r"[\w.\-]+", str(jail_name)):
        return False, f"Invalid jail name: {jail_name}"

    # (setting, supplied value, range check, message when out of range)
    specs = (
        ("maxretry", maxretry, lambda v: v >= 1,
         "Max retries must be at least 1"),
        ("bantime", bantime, lambda v: v == -1 or v >= 1,
         "Ban time must be positive seconds or -1 for permanent"),
        ("findtime", findtime, lambda v: v >= 1,
         "Find time must be positive"),
    )

    # Pass 1: parse and range-check everything before touching fail2ban.
    parsed = []  # (setting, int value) or (setting, None) when not numeric
    for key, raw, in_range, range_msg in specs:
        if raw is None:
            continue
        try:
            val = int(raw)
        except (TypeError, ValueError):
            parsed.append((key, None))
            continue
        if not in_range(val):
            return False, range_msg
        parsed.append((key, val))

    # Pass 2: apply live changes via fail2ban-client.
    applied = {}
    errors = []
    for key, val in parsed:
        if val is None:
            errors.append(f"{key} must be a number")
            continue
        rc, _, err = _run_cmd(["fail2ban-client", "set", jail_name, key, str(val)])
        if rc == 0:
            applied[key] = val
        else:
            errors.append(f"{key}: {err}")

    # Persist only what was actually applied so the file and the running
    # jail do not drift apart.
    if applied:
        _persist_jail_config(jail_name, **applied)

    if errors:
        return False, "Errors: " + "; ".join(errors)

    if applied:
        changes = [f"{key}={val}" for key, val in applied.items()]
        return True, f"Jail '{jail_name}' updated: {', '.join(changes)}"

    return False, "No changes specified"
def _persist_jail_config ( jail_name , maxretry = None , bantime = None , findtime = None ) :
"""
Write jail config changes to / etc / fail2ban / jail . local for persistence .
"""
jail_local = " /etc/fail2ban/jail.local "
try :
content = " "
if os . path . isfile ( jail_local ) :
with open ( jail_local , ' r ' ) as f :
content = f . read ( )
lines = content . splitlines ( ) if content else [ ]
# Find or create the jail section
jail_section = f " [ { jail_name } ] "
section_start = - 1
section_end = len ( lines )
for i , line in enumerate ( lines ) :
if line . strip ( ) == jail_section :
section_start = i
elif section_start > = 0 and line . strip ( ) . startswith ( " [ " ) and i > section_start :
section_end = i
break
# Build settings to update
settings = { }
if maxretry is not None :
settings [ " maxretry " ] = str ( int ( maxretry ) )
if bantime is not None :
settings [ " bantime " ] = str ( int ( bantime ) )
if findtime is not None :
settings [ " findtime " ] = str ( int ( findtime ) )
if section_start > = 0 :
# Update existing section
for key , val in settings . items ( ) :
found = False
for i in range ( section_start + 1 , section_end ) :
stripped = lines [ i ] . strip ( )
if stripped . startswith ( f " { key } " ) and " = " in stripped :
lines [ i ] = f " { key } = { val } "
found = True
break
if not found :
lines . insert ( section_start + 1 , f " { key } = { val } " )
section_end + = 1
else :
# Create new section
if lines and lines [ - 1 ] . strip ( ) :
lines . append ( " " )
lines . append ( jail_section )
for key , val in settings . items ( ) :
lines . append ( f " { key } = { val } " )
with open ( jail_local , ' w ' ) as f :
f . write ( " \n " . join ( lines ) + " \n " )
except Exception :
pass # Best effort persistence
2026-02-08 17:23:53 +01:00
def apply_missing_jails():
    """Create the ``proxmox`` and ``proxmenux`` Fail2Ban jails when missing.

    For each jail that ``fail2ban-client status`` does not list, a filter
    file and a jail file are written under /etc/fail2ban, then fail2ban is
    restarted. A failed restart is reported as an error instead of being
    ignored.

    Returns:
        (success, message, applied_jails) where ``applied_jails`` lists the
        jails whose files were written during this call.
    """
    import time

    applied = []
    errors = []

    # Check which jails are currently active
    rc, out, _ = _run_cmd(["fail2ban-client", "status"])
    if rc != 0:
        return False, "Cannot communicate with fail2ban-client", []

    current_jails = []
    for line in out.splitlines():
        if "Jail list:" in line:
            jails_str = line.split(":", 1)[1].strip()
            current_jails = [j.strip().lower() for j in jails_str.split(",") if j.strip()]

    # --- Proxmox jail (port 8006) ---
    # Uses backend=systemd because Proxmox does not use rsyslog,
    # so pvedaemon logs go to the systemd journal, not /var/log/daemon.log.
    if "proxmox" not in current_jails:
        try:
            # Filter with journalmatch for the systemd backend.
            # With backend=systemd, fail2ban receives only the MESSAGE field.
            # We use _SYSTEMD_UNIT instead of _COMM (Proxmox truncates _COMM).
            # Proxmox logs IPs as ::ffff:x.x.x.x (IPv4-mapped IPv6).
            filter_content = (
                "[Definition]\n"
                "failregex = ^(pvedaemon\\[\\d+\\]:\\s+)?authentication (failure|error); "
                "rhost=(::ffff:)?<HOST> user=.* msg=.*$\n"
                "ignoreregex =\n"
                "journalmatch = _SYSTEMD_UNIT=pvedaemon.service\n"
            )
            with open("/etc/fail2ban/filter.d/proxmox.conf", "w") as f:
                f.write(filter_content)

            jail_content = (
                "[proxmox]\n"
                "enabled = true\n"
                "port = 8006\n"
                "filter = proxmox\n"
                "backend = systemd\n"
                "maxretry = 3\n"
                "bantime = 3600\n"
                "findtime = 600\n"
            )
            with open("/etc/fail2ban/jail.d/proxmox.conf", "w") as f:
                f.write(jail_content)

            applied.append("proxmox")
        except Exception as e:
            errors.append(f"proxmox: {str(e)}")

    # --- ProxMenux Monitor jail (port 8008 + reverse proxy) ---
    # Uses backend=auto with logpath because the Flask app writes
    # auth failures directly to this file (not via syslog/journal).
    if "proxmenux" not in current_jails:
        try:
            # Filter with datepattern for the Python logging format
            filter_content = (
                "[Definition]\n"
                "failregex = ^.*proxmenux-auth: authentication failure; "
                "rhost=<HOST> user=.*$\n"
                "ignoreregex =\n"
                "datepattern = ^%%Y-%%m-%%d %%H:%%M:%%S\n"
            )
            with open("/etc/fail2ban/filter.d/proxmenux.conf", "w") as f:
                f.write(filter_content)

            jail_content = (
                "[proxmenux]\n"
                "enabled = true\n"
                "port = 8008,http,https\n"
                "filter = proxmenux\n"
                "backend = auto\n"
                "logpath = /var/log/proxmenux-auth.log\n"
                "maxretry = 3\n"
                "bantime = 3600\n"
                "findtime = 600\n"
            )
            with open("/etc/fail2ban/jail.d/proxmenux.conf", "w") as f:
                f.write(jail_content)

            # Ensure the log file named in logpath exists
            auth_log = "/var/log/proxmenux-auth.log"
            if not os.path.isfile(auth_log):
                with open(auth_log, "w") as f:
                    pass
                os.chmod(auth_log, 0o640)

            applied.append("proxmenux")
        except Exception as e:
            errors.append(f"proxmenux: {str(e)}")

    if not applied and not errors:
        return True, "All jails are already configured", []

    if applied:
        # Restart fail2ban to load the new jails, and surface a failed
        # restart to the caller.
        rc_restart, _, restart_err = _run_cmd(["systemctl", "restart", "fail2ban"])
        if rc_restart != 0:
            errors.append(f"fail2ban restart failed: {restart_err or 'unknown error'}")
        else:
            time.sleep(2)

    if errors:
        return False, "Errors: " + "; ".join(errors), applied

    return True, f"Applied jails: {', '.join(applied)}", applied
def unban_ip(jail_name, ip_address):
    """Lift the ban on *ip_address* in the Fail2Ban jail *jail_name*.

    The address only gets a basic character check (digits, dots, colons and
    hex letters) before being handed to ``fail2ban-client``.

    Returns:
        (success, message)
    """
    if not (jail_name and ip_address):
        return False, "Jail name and IP address are required"

    # Basic format check only
    looks_like_ip = re.match(r'^[\d.:a-fA-F]+$', ip_address) is not None
    if not looks_like_ip:
        return False, f"Invalid IP address format: {ip_address}"

    rc, out, err = _run_cmd(["fail2ban-client", "set", jail_name, "unbanip", ip_address])
    if rc != 0:
        return False, f"Failed to unban IP: {err or out}"
    return True, f"IP {ip_address} has been unbanned from jail '{jail_name}'"
def get_fail2ban_recent_activity(lines=50):
    """Return recent Fail2Ban events parsed from /var/log/fail2ban.log.

    Only the last *lines* lines of the log are inspected (read via
    ``tail``). Recognised events are "Ban", "Unban" and "Found" entries.

    Args:
        lines: number of trailing log lines to scan. Values that are not
            integers fall back to 50; values below 1 are raised to 1.

    Returns:
        list of dicts with ``timestamp``, ``jail``, ``ip`` and ``action``
        ("ban", "unban" or "found"), most recent first. Empty when the log
        is missing or cannot be read.
    """
    events = []
    log_file = "/var/log/fail2ban.log"

    if not os.path.isfile(log_file):
        return events

    try:
        line_count = max(1, int(lines))
    except (TypeError, ValueError):
        line_count = 50

    # One pattern per event keyword, compiled once instead of per line.
    # The jail group accepts "-" and "." as well as word characters, so
    # events from hyphenated jail names are not dropped.
    template = (
        r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})[,\d]*\s+.*"
        r"\[([\w.\-]+)\]\s+%s\s+([\d.:a-fA-F]+)"
    )
    # Order matters: it is the priority when classifying a line.
    matchers = [
        (keyword.lower(), re.compile(template % keyword))
        for keyword in ("Ban", "Unban", "Found")
    ]

    try:
        # Read last N lines using tail
        rc, out, _ = _run_cmd(["tail", "-n", str(line_count), log_file], timeout=5)
        if rc != 0 or not out:
            return events

        for line in out.splitlines():
            for action, pattern in matchers:
                match = pattern.search(line)
                if match:
                    events.append({
                        "timestamp": match.group(1),
                        "jail": match.group(2),
                        "ip": match.group(3),
                        "action": action,
                    })
                    break

        # Return most recent first
        events.reverse()
    except Exception:
        pass  # best effort: return whatever was collected

    return events
def detect_security_tools():
    """Probe the system for the supported security tools.

    Returns:
        dict mapping tool name ("fail2ban", "lynis") to that tool's status
        info as returned by its ``_detect_*`` helper.
    """
    return {
        "fail2ban": _detect_fail2ban(),
        "lynis": _detect_lynis(),
    }
def _detect_fail2ban():
    """Report whether Fail2Ban is installed and running.

    Returns:
        dict with ``installed``, ``active``, ``version``, ``jails`` (jail
        names) and ``banned_ips_count`` (sum of "Currently banned" over
        all jails). Jail data is only gathered while the service is active.
    """
    info = {
        "installed": False,
        "active": False,
        "version": "",
        "jails": [],
        "banned_ips_count": 0,
    }

    rc_version, version_out, _ = _run_cmd(["fail2ban-client", "--version"])
    if rc_version != 0:
        return info

    info["installed"] = True
    info["version"] = version_out.split("\n")[0].strip() if version_out else ""

    # Service status
    rc_service, service_out, _ = _run_cmd(["systemctl", "is-active", "fail2ban"])
    info["active"] = (rc_service == 0 and service_out == "active")
    if not info["active"]:
        return info

    # Jail names
    rc_list, list_out, _ = _run_cmd(["fail2ban-client", "status"])
    if rc_list == 0:
        for row in list_out.splitlines():
            if "Jail list:" in row:
                listed = row.split(":", 1)[1].strip()
                info["jails"] = [name.strip() for name in listed.split(",") if name.strip()]

    # Banned IPs summed across all jails
    banned_total = 0
    for jail in info["jails"]:
        rc_jail, jail_out, _ = _run_cmd(["fail2ban-client", "status", jail])
        if rc_jail != 0:
            continue
        for row in jail_out.splitlines():
            if "Currently banned:" not in row:
                continue
            try:
                banned_total += int(row.split(":", 1)[1].strip())
            except ValueError:
                pass
    info["banned_ips_count"] = banned_total

    return info
def _find_lynis_cmd ( ) :
""" Find the lynis binary path """
for path in [ " /usr/local/bin/lynis " , " /opt/lynis/lynis " , " /usr/bin/lynis " ] :
if os . path . isfile ( path ) and os . access ( path , os . X_OK ) :
return path
return None
2026-02-08 13:19:24 +01:00
def _detect_lynis ( ) :
"""
Detect the Lynis installation and its most recent scan.

Returns a dict with:
installed - True when _find_lynis_cmd() located a lynis binary
version - stripped output of `lynis show version` when that command succeeds
last_scan - start time of the last scan, or None
hardening_index - hardening index of the last scan, or None
"""
# NOTE(review): indentation is lost in this view, so it cannot be told from
# here whether the report lookup below is nested under `if lynis_cmd`.
# Confirm against the original file before restructuring this function.
info = {
" installed " : False ,
" version " : " " ,
" last_scan " : None ,
" hardening_index " : None ,
}
2026-02-08 17:51:20 +01:00
lynis_cmd = _find_lynis_cmd ( )
2026-02-08 13:19:24 +01:00
if lynis_cmd :
info [ " installed " ] = True
rc , out , _ = _run_cmd ( [ lynis_cmd , " show " , " version " ] )
if rc == 0 :
info [ " version " ] = out . strip ( )
2026-02-08 18:30:18 +01:00
# Check for last scan report - use full parser for accurate data
# parse_lynis_report is defined outside this view; presumably it returns a
# dict of report fields, or a falsy value when no report is available (TODO confirm).
report = parse_lynis_report ( )
if report :
info [ " last_scan " ] = report . get ( " datetime_start " , None )
info [ " hardening_index " ] = report . get ( " hardening_index " , None )
else :
# Fallback: quick read of report.dat, scanning its key=value lines
report_file = " /var/log/lynis-report.dat "
if os . path . isfile ( report_file ) :
try :
with open ( report_file , ' r ' ) as f :
for line in f :
if line . startswith ( " report_datetime_start= " ) :
info [ " last_scan " ] = line . split ( " = " , 1 ) [ 1 ] . strip ( )
elif line . startswith ( " hardening_index= " ) :
# A non-numeric value leaves hardening_index unchanged
try :
info [ " hardening_index " ] = int ( line . split ( " = " , 1 ) [ 1 ] . strip ( ) )
except ValueError :
pass
# Best effort: an unreadable report leaves the fields at their defaults
except Exception :
pass
2026-02-08 13:19:24 +01:00
return info
2026-02-08 17:51:20 +01:00
# Module-level state of the background Lynis audit.
# _lynis_audit_running: run_lynis_audit refuses to start while this is true
#   and sets it to True when it starts an audit.
# _lynis_audit_progress: progress label; run_lynis_audit sets it to
#   "starting" and then "running".
_lynis_audit_running = False
_lynis_audit_progress = " "
def _clean_lynis_output(raw):
    """Strip terminal escape sequences, control chars and `script` banners from captured output."""
    # CSI sequences: \x1b[ ... (final byte) e.g. \x1b[0m, \x1b[?25h
    cleaned = re.sub(r'\x1b\[[\x20-\x3f]*[\x40-\x7e]', '', raw)
    # OSC sequences: \x1b] ... terminated by BEL or ESC-backslash
    cleaned = re.sub(r'\x1b\].*?(?:\x07|\x1b\\)', '', cleaned)
    # Other escape sequences: \x1b followed by one printable char
    cleaned = re.sub(r'\x1b[\x20-\x7e]', '', cleaned)
    # Remaining control characters except \n, \t (and \r, handled below)
    cleaned = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', cleaned)
    cleaned = cleaned.replace('\r', '')

    # Drop the 'Script started/done' lines added by the script command.
    lines = [
        line for line in cleaned.splitlines()
        if not line.strip().startswith(('Script started', 'Script done'))
    ]
    # Trim blank lines at both ends.
    first = 0
    last = len(lines)
    while first < last and not lines[first].strip():
        first += 1
    while last > first and not lines[last - 1].strip():
        last -= 1
    return '\n'.join(lines[first:last])


def run_lynis_audit():
    """
    Run `lynis audit system` in a background daemon thread.

    The thread writes the cleaned console output to /var/log/lynis-output.log
    and publishes its state through the module globals
    `_lynis_audit_running` and `_lynis_audit_progress` ("starting",
    "running", "completed" or "error: ...").

    Returns (success, message). success is False when an audit is already
    running or no Lynis binary is found; otherwise the thread has been
    started and (True, "Audit started") is returned immediately.
    """
    global _lynis_audit_running, _lynis_audit_progress

    if _lynis_audit_running:
        return False, "An audit is already running"

    lynis_cmd = _find_lynis_cmd()
    if not lynis_cmd:
        return False, "Lynis is not installed"

    _lynis_audit_running = True
    _lynis_audit_progress = "starting"

    import threading

    def _run_audit():
        global _lynis_audit_running, _lynis_audit_progress
        try:
            _lynis_audit_progress = "running"

            report_file = "/var/log/lynis-report.dat"
            output_log = "/var/log/lynis-output.log"

            # Remove the artifacts of any previous run. The old report is
            # removed so Lynis creates a fresh one; the old output log is
            # removed so the size check below only ever sees output produced
            # by THIS run (a stale log would otherwise mask a failed run and
            # suppress the direct-execution fallback).
            for stale in (report_file, output_log):
                if os.path.isfile(stale):
                    os.remove(stale)

            rc = -1
            err = ""

            # Method 1: Lynis suppresses its formatted output ([+] sections)
            # when stdout is not a tty, so run it under 'script', which
            # simulates a terminal and records everything to output_log.
            try:
                script_cmd = [
                    "script", "-qec",
                    f"{lynis_cmd} audit system --no-colors --quick",
                    output_log,
                ]
                result = subprocess.run(
                    script_cmd,
                    capture_output=True, text=True, errors="replace",
                    timeout=600,
                    env={**os.environ, "TERM": "dumb"},
                )
                rc = result.returncode
                err = result.stderr.strip()
            except Exception as exc:
                # Best-effort: 'script' may be missing or time out. Remember
                # why, then let the fallback below try.
                err = str(exc)

            # If script failed or the output file is too small, try running
            # Lynis directly and store whatever it printed.
            output_ok = (
                os.path.isfile(output_log)
                and os.path.getsize(output_log) > 500
            )
            if not output_ok:
                try:
                    rc, out, err = _run_cmd(
                        [lynis_cmd, "audit", "system", "--no-colors", "--quick"],
                        timeout=600,
                    )
                    if out:
                        with open(output_log, "w") as fout:
                            fout.write(out)
                except Exception:
                    # Best-effort: failing to store the output must not
                    # change the audit result itself.
                    pass

            # Clean ALL ANSI/terminal escape codes and control chars from output
            if os.path.isfile(output_log):
                try:
                    with open(output_log, 'r', errors='replace') as fout:
                        raw = fout.read()
                    cleaned = _clean_lynis_output(raw)
                    with open(output_log, 'w') as fout:
                        fout.write(cleaned)
                except Exception:
                    # Cleaning is cosmetic; keep the raw log if it fails.
                    pass

            # rc stays negative only when Lynis could not be run at all; any
            # non-negative exit code is treated as a completed audit.
            if rc >= 0:
                _lynis_audit_progress = "completed"
            else:
                _lynis_audit_progress = f"error: {err[:200] if err else 'unknown error'}"
        except Exception as e:
            _lynis_audit_progress = f"error: {str(e)}"
        finally:
            _lynis_audit_running = False

    t = threading.Thread(target=_run_audit, daemon=True)
    t.start()
    return True, "Audit started"
def get_lynis_audit_status():
    """Return the background audit state as a dict with "running" and "progress"."""
    return dict(
        running=_lynis_audit_running,
        progress=_lynis_audit_progress,
    )
# ── Proxmox Context: classify warnings/suggestions ────────────────
# Proxmox VE is a hypervisor with specific requirements that cause
# Lynis to flag items that are normal/expected in this environment.
# Each entry maps a Lynis test ID to an explanation ("reason"), whether the
# finding is expected on Proxmox, and an optional "severity_override".
_PVE_WARNING_CONTEXT = {
    "FIRE-4512": {
        "reason": "Proxmox uses pve-firewall which manages iptables/nftables rules dynamically. Direct iptables rules are not used.",
        "expected": True,
    },
    "NETW-3015": {
        "reason": "Network bridges (vmbr*) operate in promiscuous mode by design to forward traffic between VMs/containers.",
        "expected": True,
    },
    "MAIL-8818": {
        "reason": "Postfix is used by Proxmox for system notifications. The SMTP banner can be customized but is low risk on an internal server.",
        "expected": False,
        "severity_override": "low",
    },
    "NETW-2705": {
        "reason": "Single DNS server is common in home/lab environments. Add a secondary DNS in /etc/resolv.conf if possible.",
        "expected": False,
        "severity_override": "low",
    },
    "PKGS-7392": {
        "reason": "Package updates should be applied regularly. Check if the 'vulnerable' packages are Proxmox-specific packages pending a PVE update.",
        "expected": False,
    },
}

_PVE_SUGGESTION_CONTEXT = {
    "BOOT-5122": {
        "reason": "GRUB password is recommended for physical servers but less critical for headless/remote Proxmox nodes.",
        "expected": False,
        "severity_override": "low",
    },
    "BOOT-5264": {
        "reason": "Many Proxmox core services (pve-*, corosync, spiceproxy, etc.) run without systemd hardening. This is by design as they need broad system access.",
        "expected": True,
    },
    "KRNL-5788": {
        "reason": "Proxmox uses its own kernel (pve-kernel) which may not place vmlinuz in the standard location.",
        "expected": True,
    },
    "AUTH-9282": {
        "reason": "Proxmox system accounts (www-data, backup, etc.) don't use password expiry. Only applies to interactive user accounts.",
        "expected": True,
    },
    "AUTH-9284": {
        "reason": "Locked system accounts are normal in Proxmox (daemon, nobody, etc.).",
        "expected": True,
    },
    "USB-1000": {
        "reason": "USB passthrough to VMs may require USB drivers. Disable only if USB passthrough is not needed.",
        "expected": False,
        "severity_override": "low",
    },
    "STRG-1846": {
        "reason": "FireWire is typically not used in modern servers. Safe to disable.",
        "expected": False,
        "severity_override": "low",
    },
    "NETW-3200": {
        "reason": "Protocols dccp, sctp, rds, tipc are typically not needed. Can be disabled via modprobe blacklist.",
        "expected": False,
    },
    "SSH-7408": {
        "reason": "SSH hardening is recommended but PermitRootLogin is required for Proxmox API/CLI management. Other SSH settings can be tuned.",
        "expected": False,
    },
    "FILE-6310": {
        "reason": "Separate partitions for /home and /var are best practice but Proxmox typically uses a simple partition layout with LVM-thin for VM storage.",
        "expected": True,
    },
    "KRNL-6000": {
        "reason": "Some sysctl values differ because Proxmox needs IP forwarding, bridge-nf-call, and relaxed kernel settings for VM/container networking.",
        "expected": True,
    },
    "HRDN-7230": {
        "reason": "A malware scanner (rkhunter, ClamAV) is recommended but optional for a dedicated hypervisor.",
        "expected": False,
        "severity_override": "low",
    },
    "HRDN-7222": {
        "reason": "Restricting compiler access is good practice on production servers. Less critical on a hypervisor where only root has access.",
        "expected": False,
        "severity_override": "low",
    },
    "FINT-4350": {
        "reason": "File integrity monitoring (AIDE, Tripwire) is recommended for production but optional for home/lab environments.",
        "expected": False,
        "severity_override": "low",
    },
    "ACCT-9622": {
        "reason": "Process accounting is useful for forensics but not required for a hypervisor.",
        "expected": False,
        "severity_override": "low",
    },
    "ACCT-9626": {
        "reason": "Sysstat is useful for performance monitoring but not a security requirement.",
        "expected": False,
        "severity_override": "low",
    },
    "ACCT-9628": {
        "reason": "Auditd provides detailed audit logging. Recommended for production, optional for home/lab.",
        "expected": False,
        "severity_override": "low",
    },
    "TOOL-5002": {
        "reason": "Automation tools (Ansible, Puppet) are useful for multi-node clusters but not required for single-node setups.",
        "expected": True,
    },
    "LOGG-2154": {
        "reason": "External logging is recommended for production but optional for single-node environments.",
        "expected": False,
        "severity_override": "low",
    },
    "BANN-7126": {
        "reason": "Legal banners in /etc/issue are recommended for compliance but not a security risk.",
        "expected": False,
        "severity_override": "low",
    },
    "BANN-7130": {
        "reason": "Legal banners in /etc/issue.net are recommended for compliance but not a security risk.",
        "expected": False,
        "severity_override": "low",
    },
}

# Values that report.dat uses for "enabled" flags.
_LYNIS_TRUE_VALUES = ("1", "true", "yes")


def _lynis_read_report_dat(report_file):
    """Read report.dat into (raw_data, warnings_raw, suggestions_raw); best-effort."""
    raw_data = {}
    warnings_raw = []
    suggestions_raw = []
    if not os.path.isfile(report_file):
        return raw_data, warnings_raw, suggestions_raw
    try:
        # errors='replace': one undecodable byte must not discard the report.
        with open(report_file, 'r', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith(("#", "[")):
                    continue
                key, sep, value = line.partition("=")
                if not sep:
                    continue
                key = key.strip()
                value = value.strip()
                if key == "warning[]":
                    warnings_raw.append(value)
                elif key == "suggestion[]":
                    suggestions_raw.append(value)
                else:
                    # Last value wins (some keys appear multiple times)
                    raw_data[key] = value
    except OSError:
        # Keep whatever was read; the output log may still supply data.
        pass
    return raw_data, warnings_raw, suggestions_raw


def _lynis_map_report_fields(report, raw_data, warnings_raw, suggestions_raw):
    """Copy known report.dat keys (names vary across Lynis versions) into `report`."""

    def first(*keys):
        # First non-empty value among alternative key names, else "".
        for key in keys:
            value = raw_data.get(key, "")
            if value:
                return value
        return ""

    report["datetime_start"] = raw_data.get("report_datetime_start", "")
    report["datetime_end"] = raw_data.get("report_datetime_end", "")
    report["lynis_version"] = raw_data.get("lynis_version", "")
    report["hostname"] = raw_data.get("hostname", "")
    report["os_name"] = first("os_name", "os", "os_fullname")
    report["os_version"] = first("os_version", "os_version_id")
    report["os_fullname"] = raw_data.get("os_fullname", "")
    report["kernel_version"] = first(
        "os_kernel_version_full", "os_kernel_version",
        "linux_kernel_version", "linux_version",
        "os_kernelversion_full", "os_kernelversion",
    )

    # Hardening index: first key that holds a valid integer.
    for k in ("hardening_index", "hpindex", "hp_index"):
        if k in raw_data:
            try:
                report["hardening_index"] = int(raw_data[k])
                break
            except ValueError:
                pass

    # Tests performed: largest valid value among the alternative keys.
    for k in ("tests_performed", "ctests_performed", "total_tests"):
        if k in raw_data:
            try:
                report["tests_performed"] = max(report["tests_performed"], int(raw_data[k]))
            except ValueError:
                pass

    # Installed packages: an explicit integer is authoritative, so stop there
    # instead of letting the array heuristic below overwrite it.
    for k in ("installed_packages", "installed_packages_array"):
        if k not in raw_data:
            continue
        value = raw_data[k]
        try:
            report["installed_packages"] = int(value)
            break
        except ValueError:
            # Not a number: count list entries. Entries separated by "|" may
            # each contain a comma (name,version), so only fall back to
            # comma-splitting when there is no "|" at all.
            delimiter = "|" if "|" in value else ","
            entries = [entry for entry in value.split(delimiter) if entry.strip()]
            if entries:
                report["installed_packages"] = len(entries)

    report["firewall_active"] = any(
        raw_data.get(k) in _LYNIS_TRUE_VALUES
        for k in ("firewall_active", "firewall_installed")
    )
    report["malware_scanner"] = any(
        raw_data.get(k) in _LYNIS_TRUE_VALUES
        for k in ("malware_scanner_installed", "malware_scanner")
    )

    # warning[] / suggestion[] values are "|"-separated fields; entries with
    # fewer than two fields are ignored.
    # NOTE(review): the field order assumed here may not match every Lynis
    # version - confirm against the report.dat produced by the deployed one.
    for w in warnings_raw:
        parts = [p.strip() for p in w.split("|")]
        if len(parts) >= 2:
            report["warnings"].append({
                "test_id": parts[0],
                "severity": parts[1],
                "description": parts[2] if len(parts) > 2 else parts[1],
                "solution": parts[3] if len(parts) > 3 else "",
            })
    for s in suggestions_raw:
        parts = [p.strip() for p in s.split("|")]
        if len(parts) >= 2:
            report["suggestions"].append({
                "test_id": parts[0],
                "description": parts[1],
                "solution": parts[2] if len(parts) > 2 else "",
                "details": parts[3] if len(parts) > 3 else "",
            })


def _lynis_extract_info_field(report, stripped):
    """Fill one still-empty report field from a "Key: value" console line."""
    if not report["hardening_index"] and "Hardening index" in stripped:
        m = re.search(r'Hardening index\s*:\s*(\d+)', stripped)
        if m:
            report["hardening_index"] = int(m.group(1))
    elif report["tests_performed"] == 0 and "Tests performed" in stripped:
        m = re.search(r'Tests performed\s*:\s*(\d+)', stripped)
        if m:
            report["tests_performed"] = int(m.group(1))
    elif not report["kernel_version"] and "Kernel version" in stripped:
        m = re.search(r'Kernel version\s*:\s*(.+)', stripped)
        if m:
            report["kernel_version"] = m.group(1).strip()
    elif not report["hostname"] and stripped.startswith("Hostname"):
        m = re.search(r'Hostname\s*:\s*(.+)', stripped)
        if m:
            val = m.group(1).strip()
            if val and val != "N/A":
                report["hostname"] = val
    elif not report["os_name"] and "Operating system name" in stripped:
        m = re.search(r'Operating system name\s*:\s*(.+)', stripped)
        if m:
            report["os_name"] = m.group(1).strip()
    elif not report["os_version"] and "Operating system version" in stripped:
        m = re.search(r'Operating system version\s*:\s*(.+)', stripped)
        if m:
            report["os_version"] = m.group(1).strip()
    elif not report["os_fullname"] and "Operating system:" in stripped:
        m = re.search(r'Operating system\s*:\s*(.+)', stripped)
        if m:
            report["os_fullname"] = m.group(1).strip()
    elif not report["lynis_version"] and "Program version" in stripped:
        m = re.search(r'Program version\s*:\s*(.+)', stripped)
        if m:
            report["lynis_version"] = m.group(1).strip()


def _lynis_parse_sections(report, log_lines):
    """Return the "[+] Section" / "[ STATUS ]" checks found in the console log.

    Also fills still-empty scalar fields of `report` from "Key: value" lines.
    """
    sections = []
    current_section = None
    current_checks = []

    def flush():
        # Store the section collected so far, if it has any checks.
        if current_section and current_checks:
            sections.append({"name": current_section, "checks": current_checks})

    for line in log_lines:
        stripped = line.strip()

        # Section headers: "[+] Boot and services"
        # Use 'in' check first for speed, then regex for extraction.
        if '[+]' in stripped:
            section_match = re.search(r'\[\+\]\s+(.+)', stripped)
            if section_match:
                flush()
                # Remove trailing dashes from names like "Boot and services ------"
                current_section = re.sub(r'\s*-+\s*$', '', section_match.group(1).strip())
                current_checks = []
                continue

        # Skip separator lines and empty lines.
        if not stripped or stripped.startswith('---'):
            continue
        if stripped.startswith(('===', '#')):
            # Banner / results-summary separator: stop attributing lines to
            # the current section, but keep the checks already collected.
            flush()
            current_section = None
            current_checks = []
            continue

        # Any line ending in "[ STATUS ]" (covers -, File:, Directory:, ...).
        # After ANSI cleaning there may be only one space before the bracket,
        # so anchor on the LAST bracket of the line.
        bracket_match = re.search(r'\[\s*([A-Za-z0-9_ /.:!-]+?)\s*\]\s*$', stripped)
        if bracket_match and current_section:
            check_name = stripped[:bracket_match.start()].strip()
            # Remove leading - or * from name
            check_name = re.sub(r'^[-*]+\s*', '', check_name).strip()
            # Skip noise lines (too short, dots-only, plugin progress)
            if len(check_name) > 2 and not check_name.startswith('..'):
                current_checks.append({
                    "name": check_name,
                    "status": bracket_match.group(1).strip(),
                })
            continue

        # Sub-results: "Result: found 35 running services" belong to the
        # most recent check.
        if stripped.startswith("Result:") and current_section and current_checks:
            detail = stripped[7:].strip()
            if detail:
                current_checks[-1]["detail"] = detail
            continue

        # Info block and summary lines: "Key: value" or "Key : value"
        if ":" in stripped:
            _lynis_extract_info_field(report, stripped)

    flush()
    return sections


def _lynis_parse_summary(report, log_lines):
    """Parse the Warnings / Suggestions / Software components summary blocks."""
    mode = None  # one of None, "warnings", "suggestions", "software"
    stdout_warnings = []
    stdout_suggestions = []
    last_suggestion = None

    for raw_line in log_lines:
        stripped = raw_line.strip()

        # Block headers and terminators switch the parsing mode.
        new_mode = mode
        if re.match(r'^Warnings\s*\(\d+\)\s*:', stripped):
            new_mode = "warnings"
        elif re.match(r'^Suggestions\s*\(\d+\)\s*:', stripped):
            new_mode = "suggestions"
        elif "Software components:" in stripped:
            new_mode = "software"
        elif stripped.startswith('==='):
            new_mode = None
        else:
            new_mode = False  # sentinel: ordinary content line
        if new_mode is not False:
            mode = new_mode
            last_suggestion = None
            continue

        if mode == "software":
            # Format: "- Firewall [V]" or "[X]"
            sw_match = re.match(r'^-\s+(.+?)\s+\[([VX])\]', stripped)
            if sw_match and sw_match.group(2) == "V":
                sw_name = sw_match.group(1).strip().lower()
                if "firewall" in sw_name:
                    report["firewall_active"] = True
                if "malware" in sw_name:
                    report["malware_scanner"] = True
        elif mode == "warnings":
            # Format: "! Warning text [TEST-ID]"
            wm = re.match(r'^!\s+(.+?)\s+\[([A-Z0-9_-]+)\]', stripped)
            if wm:
                stdout_warnings.append({
                    "test_id": wm.group(2),
                    "severity": "Warning",
                    "description": wm.group(1).strip(),
                    "solution": "",
                })
        elif mode == "suggestions":
            if stripped.startswith('*'):
                # Format: "* Suggestion text [TEST-ID]"
                sm = re.match(r'^\*\s+(.+?)\s+\[([A-Z0-9_-]+)\]', stripped)
                if sm:
                    last_suggestion = {
                        "test_id": sm.group(2),
                        "description": sm.group(1).strip(),
                        "solution": "",
                        "details": "",
                    }
                    stdout_suggestions.append(last_suggestion)
            elif last_suggestion and stripped.startswith('- Details'):
                dm = re.match(r'^-\s*Details\s*:\s*(.+)', stripped)
                if dm:
                    last_suggestion["details"] = dm.group(1).strip()
            elif last_suggestion and stripped.startswith('- Solution'):
                sm2 = re.match(r'^-\s*Solution\s*:\s*(.+)', stripped)
                if sm2:
                    last_suggestion["solution"] = sm2.group(1).strip()

    # Use stdout data only if report.dat had none.
    if not report["warnings"] and stdout_warnings:
        report["warnings"] = stdout_warnings
    if not report["suggestions"] and stdout_suggestions:
        report["suggestions"] = stdout_suggestions


def _lynis_sections_show_firewall(sections):
    """Return True when a firewall section contains a check reporting an active firewall."""
    for section in sections:
        if "firewall" not in section["name"].lower():
            continue
        for check in section["checks"]:
            name_lower = check["name"].lower()
            status_upper = check["status"].upper()
            if ("host based firewall" in name_lower or "checking host" in name_lower) \
                    and status_upper in ("ACTIVE", "ENABLED", "FOUND", "OK"):
                return True
            if "iptables" in name_lower and status_upper in ("ACTIVE", "FOUND", "OK"):
                return True
    return False


def _lynis_sections_show_malware_scanner(sections):
    """Return True when any check names a malware scanner with a positive status."""
    keywords = ("malware", "clamav", "rkhunter", "chkrootkit")
    for section in sections:
        for check in section["checks"]:
            name_lower = check["name"].lower()
            if any(word in name_lower for word in keywords) \
                    and check["status"].upper() in ("FOUND", "INSTALLED", "OK"):
                return True
    return False


def _lynis_apply_context(findings, context):
    """Annotate findings with Proxmox context; return how many are expected on Proxmox."""
    expected_count = 0
    for finding in findings:
        ctx = context.get(finding.get("test_id", ""))
        if not ctx:
            finding["proxmox_context"] = ""
            finding["proxmox_expected"] = False
            continue
        finding["proxmox_context"] = ctx["reason"]
        finding["proxmox_expected"] = ctx.get("expected", False)
        if ctx.get("severity_override"):
            finding["proxmox_severity"] = ctx["severity_override"]
        if ctx.get("expected", False):
            expected_count += 1
    return expected_count


def parse_lynis_report(report_file="/var/log/lynis-report.dat",
                       output_file="/var/log/lynis-output.log",
                       fallback_log="/var/log/lynis.log"):
    """
    Parse the Lynis report into structured report data.

    Data comes from report.dat (key=value pairs) and is enriched with the
    captured console output when report.dat is sparse. Fields still empty
    afterwards fall back to live system queries (uname, hostname, dpkg,
    pve-firewall state). Findings are then annotated with Proxmox-specific
    context and an adjusted score is computed.

    Args:
        report_file: Path of the Lynis report.dat file.
        output_file: Path of the captured console output.
        fallback_log: Log parsed instead of output_file when that is missing.

    Returns:
        A dict with all audit findings, or None when neither report_file nor
        output_file exists.
    """
    # Need at least one data source
    if not os.path.isfile(report_file) and not os.path.isfile(output_file):
        return None

    report = {
        "datetime_start": "",
        "datetime_end": "",
        "lynis_version": "",
        "os_name": "",
        "os_version": "",
        "os_fullname": "",
        "hostname": "",
        "hardening_index": None,
        "tests_performed": 0,
        "warnings": [],
        "suggestions": [],
        "categories": {},
        "installed_packages": 0,
        "kernel_version": "",
        "firewall_active": False,
        "malware_scanner": False,
    }

    raw_data, warnings_raw, suggestions_raw = _lynis_read_report_dat(report_file)
    _lynis_map_report_fields(report, raw_data, warnings_raw, suggestions_raw)

    # Console output (preferred: clean formatted sections) or lynis.log.
    report["sections"] = []
    log_file = output_file if os.path.isfile(output_file) else fallback_log
    log_lines = None
    if os.path.isfile(log_file):
        try:
            with open(log_file, 'r', errors='replace') as f:
                log_lines = f.readlines()
            report["sections"] = _lynis_parse_sections(report, log_lines)
            # Store debug info about parsing
            report["_parse_debug"] = {
                "source": log_file,
                "total_lines": len(log_lines),
                "sections_found": len(report["sections"]),
                "section_names": [s["name"] for s in report["sections"]],
            }
        except Exception as e:
            report["sections"] = []
            report["_parse_debug"] = {"error": str(e)}

    # The summary blocks of the same log supply warnings, suggestions and
    # software components; report.dat is often sparse/empty on many systems.
    if log_lines is not None:
        _lynis_parse_summary(report, log_lines)

    # Firewall/malware status from parsed sections. This catches cases where
    # report.dat lacks firewall_active but the console output shows
    # "Checking host based firewall [ ACTIVE ]".
    if not report["firewall_active"] and _lynis_sections_show_firewall(report["sections"]):
        report["firewall_active"] = True

    # Also check pve-firewall directly (Proxmox uses its own firewall service)
    if not report["firewall_active"]:
        try:
            rc, out, _ = _run_cmd(["systemctl", "is-active", "pve-firewall"])
            if rc == 0 and out.strip() == "active":
                report["firewall_active"] = True
        except Exception:
            pass

    if not report["malware_scanner"] and _lynis_sections_show_malware_scanner(report["sections"]):
        report["malware_scanner"] = True

    # Fallback: datetime from file modification time
    if not report["datetime_start"]:
        import time
        for fpath in (output_file, report_file):
            if os.path.isfile(fpath):
                try:
                    report["datetime_start"] = time.strftime(
                        "%Y-%m-%d %H:%M", time.localtime(os.path.getmtime(fpath))
                    )
                    break
                except Exception:
                    pass

    # Fallback: get kernel from uname if still empty
    if not report["kernel_version"]:
        try:
            rc, out, _ = _run_cmd(["uname", "-r"])
            if rc == 0 and out.strip():
                report["kernel_version"] = out.strip()
        except Exception:
            pass

    # Fallback: get hostname from system
    if not report["hostname"]:
        try:
            import socket
            report["hostname"] = socket.gethostname()
        except Exception:
            pass

    # Fallback: get installed packages count
    if report["installed_packages"] == 0:
        try:
            rc, out, _ = _run_cmd(["dpkg", "-l"])
            if rc == 0 and out:
                # Count lines that start with "ii " (installed packages)
                count = sum(1 for l in out.splitlines() if l.startswith("ii "))
                if count > 0:
                    report["installed_packages"] = count
        except Exception:
            pass

    # Apply Proxmox context to warnings and suggestions
    pve_expected_warnings = _lynis_apply_context(report["warnings"], _PVE_WARNING_CONTEXT)
    pve_expected_suggestions = _lynis_apply_context(report["suggestions"], _PVE_SUGGESTION_CONTEXT)

    # Calculate Proxmox-adjusted score.
    # Each finding is assumed to account for an equal share of the missing
    # points; 80% of the share of expected findings is given back.
    raw_score = report["hardening_index"] or 0
    total_findings = len(report["warnings"]) + len(report["suggestions"])
    expected_findings = pve_expected_warnings + pve_expected_suggestions
    if total_findings > 0 and raw_score > 0:
        penalty_per_finding = (100 - raw_score) / max(total_findings, 1)
        boost = int(penalty_per_finding * expected_findings * 0.8)
        adjusted_score = min(100, raw_score + boost)
    else:
        adjusted_score = raw_score

    report["proxmox_adjusted_score"] = adjusted_score
    report["proxmox_expected_warnings"] = pve_expected_warnings
    report["proxmox_expected_suggestions"] = pve_expected_suggestions
    report["proxmox_context_applied"] = True

    return report