2026-02-08 13:19:24 +01:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
"""
|
|
|
|
|
ProxMenux Security Routes
|
|
|
|
|
Flask blueprint for firewall management and security tool detection.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from flask import Blueprint, jsonify, request
|
|
|
|
|
|
|
|
|
|
security_bp = Blueprint('security', __name__)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
import security_manager
|
|
|
|
|
except ImportError:
|
|
|
|
|
security_manager = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# -------------------------------------------------------------------
|
|
|
|
|
# Proxmox Firewall
|
|
|
|
|
# -------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/firewall/status', methods=['GET'])
|
|
|
|
|
def firewall_status():
|
|
|
|
|
"""Get Proxmox firewall status, rules, and port 8008 status"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
status = security_manager.get_firewall_status()
|
|
|
|
|
return jsonify({"success": True, **status})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/firewall/enable', methods=['POST'])
|
|
|
|
|
def firewall_enable():
|
|
|
|
|
"""Enable Proxmox firewall at host or cluster level"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
data = request.json or {}
|
|
|
|
|
level = data.get("level", "host")
|
|
|
|
|
success, message = security_manager.enable_firewall(level)
|
|
|
|
|
return jsonify({"success": success, "message": message})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/firewall/disable', methods=['POST'])
|
|
|
|
|
def firewall_disable():
|
|
|
|
|
"""Disable Proxmox firewall at host or cluster level"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
data = request.json or {}
|
|
|
|
|
level = data.get("level", "host")
|
|
|
|
|
success, message = security_manager.disable_firewall(level)
|
|
|
|
|
return jsonify({"success": success, "message": message})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
2026-02-08 16:19:48 +01:00
|
|
|
@security_bp.route('/api/security/firewall/rules', methods=['POST'])
|
|
|
|
|
def firewall_add_rule():
|
|
|
|
|
"""Add a custom firewall rule"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
data = request.json or {}
|
|
|
|
|
success, message = security_manager.add_firewall_rule(
|
|
|
|
|
direction=data.get("direction", "IN"),
|
|
|
|
|
action=data.get("action", "ACCEPT"),
|
|
|
|
|
protocol=data.get("protocol", "tcp"),
|
|
|
|
|
dport=data.get("dport", ""),
|
|
|
|
|
sport=data.get("sport", ""),
|
|
|
|
|
source=data.get("source", ""),
|
|
|
|
|
dest=data.get("dest", ""),
|
|
|
|
|
iface=data.get("iface", ""),
|
|
|
|
|
comment=data.get("comment", ""),
|
|
|
|
|
level=data.get("level", "host"),
|
|
|
|
|
)
|
|
|
|
|
if success:
|
|
|
|
|
return jsonify({"success": True, "message": message})
|
|
|
|
|
else:
|
|
|
|
|
return jsonify({"success": False, "message": message}), 400
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/firewall/rules', methods=['DELETE'])
|
|
|
|
|
def firewall_delete_rule():
|
|
|
|
|
"""Delete a firewall rule by index"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
data = request.json or {}
|
|
|
|
|
rule_index = data.get("rule_index")
|
|
|
|
|
level = data.get("level", "host")
|
|
|
|
|
if rule_index is None:
|
|
|
|
|
return jsonify({"success": False, "message": "rule_index is required"}), 400
|
|
|
|
|
success, message = security_manager.delete_firewall_rule(int(rule_index), level)
|
|
|
|
|
if success:
|
|
|
|
|
return jsonify({"success": True, "message": message})
|
|
|
|
|
else:
|
|
|
|
|
return jsonify({"success": False, "message": message}), 400
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
2026-02-08 13:19:24 +01:00
|
|
|
@security_bp.route('/api/security/firewall/monitor-port', methods=['POST'])
|
|
|
|
|
def firewall_add_monitor_port():
|
|
|
|
|
"""Add firewall rule to allow port 8008 for ProxMenux Monitor"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
success, message = security_manager.add_monitor_port_rule()
|
|
|
|
|
return jsonify({"success": success, "message": message})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/firewall/monitor-port', methods=['DELETE'])
|
|
|
|
|
def firewall_remove_monitor_port():
|
|
|
|
|
"""Remove the ProxMenux Monitor port 8008 rule"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
success, message = security_manager.remove_monitor_port_rule()
|
|
|
|
|
return jsonify({"success": success, "message": message})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# -------------------------------------------------------------------
|
|
|
|
|
# Fail2Ban Detailed Management
|
|
|
|
|
# -------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/fail2ban/details', methods=['GET'])
|
|
|
|
|
def fail2ban_details():
|
|
|
|
|
"""Get detailed Fail2Ban info: per-jail banned IPs, stats, config"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
details = security_manager.get_fail2ban_details()
|
|
|
|
|
return jsonify({"success": True, **details})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/fail2ban/unban', methods=['POST'])
|
|
|
|
|
def fail2ban_unban():
|
|
|
|
|
"""Unban a specific IP from a Fail2Ban jail"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
data = request.json or {}
|
|
|
|
|
jail = data.get("jail", "")
|
|
|
|
|
ip = data.get("ip", "")
|
|
|
|
|
success, message = security_manager.unban_ip(jail, ip)
|
|
|
|
|
if success:
|
|
|
|
|
return jsonify({"success": True, "message": message})
|
|
|
|
|
else:
|
|
|
|
|
return jsonify({"success": False, "message": message}), 400
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
2026-02-08 17:01:49 +01:00
|
|
|
@security_bp.route('/api/security/fail2ban/jail/config', methods=['PUT'])
|
|
|
|
|
def fail2ban_jail_config():
|
|
|
|
|
"""Update jail configuration (maxretry, bantime, findtime)"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
data = request.json or {}
|
|
|
|
|
jail = data.get("jail", "")
|
|
|
|
|
if not jail:
|
|
|
|
|
return jsonify({"success": False, "message": "Jail name is required"}), 400
|
|
|
|
|
success, message = security_manager.update_jail_config(
|
|
|
|
|
jail,
|
|
|
|
|
maxretry=data.get("maxretry"),
|
|
|
|
|
bantime=data.get("bantime"),
|
|
|
|
|
findtime=data.get("findtime"),
|
|
|
|
|
)
|
|
|
|
|
if success:
|
|
|
|
|
return jsonify({"success": True, "message": message})
|
|
|
|
|
else:
|
|
|
|
|
return jsonify({"success": False, "message": message}), 400
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
2026-02-08 17:23:53 +01:00
|
|
|
@security_bp.route('/api/security/fail2ban/apply-jails', methods=['POST'])
|
|
|
|
|
def fail2ban_apply_jails():
|
|
|
|
|
"""Apply missing Fail2Ban jails (proxmox, proxmenux)"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
success, message, applied = security_manager.apply_missing_jails()
|
|
|
|
|
return jsonify({"success": success, "message": message, "applied": applied})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
2026-02-08 13:19:24 +01:00
|
|
|
@security_bp.route('/api/security/fail2ban/activity', methods=['GET'])
|
|
|
|
|
def fail2ban_activity():
|
|
|
|
|
"""Get recent Fail2Ban log activity"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
events = security_manager.get_fail2ban_recent_activity()
|
|
|
|
|
return jsonify({"success": True, "events": events})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
2026-02-08 17:51:20 +01:00
|
|
|
# -------------------------------------------------------------------
|
|
|
|
|
# Lynis Audit
|
|
|
|
|
# -------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/lynis/run', methods=['POST'])
|
|
|
|
|
def lynis_run_audit():
|
|
|
|
|
"""Start a Lynis audit (runs in background)"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
success, message = security_manager.run_lynis_audit()
|
|
|
|
|
return jsonify({"success": success, "message": message})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/lynis/status', methods=['GET'])
|
|
|
|
|
def lynis_audit_status():
|
|
|
|
|
"""Get Lynis audit running status"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
status = security_manager.get_lynis_audit_status()
|
|
|
|
|
return jsonify({"success": True, **status})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/lynis/report', methods=['GET'])
|
|
|
|
|
def lynis_report():
|
|
|
|
|
"""Get parsed Lynis audit report"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
report = security_manager.parse_lynis_report()
|
|
|
|
|
if report:
|
|
|
|
|
return jsonify({"success": True, "report": report})
|
|
|
|
|
else:
|
|
|
|
|
return jsonify({"success": False, "message": "No report available. Run an audit first."})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
2026-02-08 18:30:18 +01:00
|
|
|
@security_bp.route('/api/security/lynis/report', methods=['DELETE'])
|
|
|
|
|
def lynis_report_delete():
|
|
|
|
|
"""Delete Lynis audit report files"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
import os
|
|
|
|
|
deleted = []
|
|
|
|
|
for f in ["/var/log/lynis-report.dat", "/var/log/lynis.log", "/var/log/lynis-output.log"]:
|
|
|
|
|
if os.path.isfile(f):
|
|
|
|
|
os.remove(f)
|
|
|
|
|
deleted.append(f)
|
|
|
|
|
if deleted:
|
|
|
|
|
return jsonify({"success": True, "message": f"Deleted: {', '.join(deleted)}"})
|
|
|
|
|
else:
|
|
|
|
|
return jsonify({"success": False, "message": "No report files found to delete"})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
2026-02-08 13:19:24 +01:00
|
|
|
# -------------------------------------------------------------------
|
|
|
|
|
# Security Tools Detection
|
|
|
|
|
# -------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
@security_bp.route('/api/security/tools', methods=['GET'])
|
|
|
|
|
def security_tools():
|
|
|
|
|
"""Detect installed security tools (Fail2Ban, Lynis, etc.)"""
|
|
|
|
|
if not security_manager:
|
|
|
|
|
return jsonify({"success": False, "message": "Security manager not available"}), 500
|
|
|
|
|
try:
|
|
|
|
|
tools = security_manager.detect_security_tools()
|
|
|
|
|
return jsonify({"success": True, "tools": tools})
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return jsonify({"success": False, "message": str(e)}), 500
|