Files
ProxMenux/AppImage/scripts/health_monitor.py
2026-02-17 20:01:45 +01:00

2806 lines
128 KiB
Python

"""
ProxMenux Health Monitor Module
Provides comprehensive, lightweight health checks for Proxmox systems.
Optimized for minimal system impact with intelligent thresholds and hysteresis.
Author: MacRimi
Version: 1.2 (Always returns all 10 categories)
"""
import psutil
import subprocess
import json
import time
import os
import hashlib # Added for MD5 hashing
from typing import Dict, List, Any, Tuple, Optional
from datetime import datetime, timedelta
from collections import defaultdict
import re
from health_persistence import health_persistence
try:
from proxmox_storage_monitor import proxmox_storage_monitor
PROXMOX_STORAGE_AVAILABLE = True
except ImportError:
PROXMOX_STORAGE_AVAILABLE = False
class HealthMonitor:
"""
Monitors system health across multiple components with minimal impact.
Implements hysteresis, intelligent caching, progressive escalation, and persistent error tracking.
Always returns all 10 health categories.
"""
# CPU Thresholds
CPU_WARNING = 85
CPU_CRITICAL = 95
CPU_RECOVERY = 75
CPU_WARNING_DURATION = 300 # 5 minutes sustained
CPU_CRITICAL_DURATION = 300 # 5 minutes sustained
CPU_RECOVERY_DURATION = 120
# Memory Thresholds
MEMORY_WARNING = 85
MEMORY_CRITICAL = 95
MEMORY_DURATION = 60
SWAP_WARNING_DURATION = 300
SWAP_CRITICAL_PERCENT = 5
SWAP_CRITICAL_DURATION = 120
# Storage Thresholds
STORAGE_WARNING = 85
STORAGE_CRITICAL = 95
# Temperature Thresholds
TEMP_WARNING = 80
TEMP_CRITICAL = 90
# Network Thresholds
NETWORK_LATENCY_WARNING = 100
NETWORK_LATENCY_CRITICAL = 300
NETWORK_TIMEOUT = 2
NETWORK_INACTIVE_DURATION = 600
# Log Thresholds
LOG_ERRORS_WARNING = 5
LOG_ERRORS_CRITICAL = 10
LOG_WARNINGS_WARNING = 15
LOG_WARNINGS_CRITICAL = 30
LOG_CHECK_INTERVAL = 300
# Updates Thresholds
UPDATES_WARNING = 365 # Only warn after 1 year without updates
UPDATES_CRITICAL = 730 # Critical after 2 years
BENIGN_ERROR_PATTERNS = [
# Proxmox specific benign patterns
r'got inotify poll request in wrong process',
r'auth key pair too old, rotating',
r'proxy detected vanished client connection',
r'worker \d+ finished',
r'connection timed out',
r'disconnect peer',
r'task OK',
r'backup finished',
# Systemd informational messages
r'(started|starting|stopped|stopping) session',
r'session \d+ logged (in|out)',
r'new session \d+ of user',
r'removed session \d+',
r'user@\d+\.service:',
r'user runtime directory',
# Network transient errors (common and usually self-recovering)
r'dhcp.*timeout',
r'temporary failure in name resolution',
r'network is unreachable',
r'no route to host',
# Backup and sync normal warnings
r'rsync.*vanished',
r'backup job .* finished',
r'vzdump backup .* finished',
# ZFS informational
r'zfs.*scrub (started|finished|in progress)',
r'zpool.*resilver',
# LXC/Container normal operations
r'lxc.*monitor',
r'systemd\[1\]: (started|stopped) .*\.scope',
]
CRITICAL_LOG_KEYWORDS = [
'out of memory', 'oom_kill', 'kernel panic',
'filesystem read-only', 'cannot mount',
'raid.*failed', 'md.*device failed',
'ext4-fs error', 'xfs.*corruption',
'lvm activation failed',
'hardware error', 'mce:',
'segfault', 'general protection fault'
]
WARNING_LOG_KEYWORDS = [
'i/o error', 'ata error', 'scsi error',
'task hung', 'blocked for more than',
'failed to start', 'service.*failed',
'disk.*offline', 'disk.*removed'
]
# PVE Critical Services
PVE_SERVICES = ['pveproxy', 'pvedaemon', 'pvestatd', 'pve-cluster']
def __init__(self):
"""Initialize health monitor with state tracking"""
self.state_history = defaultdict(list)
self.last_check_times = {}
self.cached_results = {}
self.network_baseline = {}
self.io_error_history = defaultdict(list)
self.failed_vm_history = set() # Track VMs that failed to start
self.persistent_log_patterns = defaultdict(lambda: {'count': 0, 'first_seen': 0, 'last_seen': 0})
self._unknown_counts = {} # Track consecutive UNKNOWN cycles per category
# System capabilities - derived from Proxmox storage types at runtime (Priority 1.5)
# SMART detection still uses filesystem check on init (lightweight)
has_smart = os.path.exists('/usr/sbin/smartctl') or os.path.exists('/usr/bin/smartctl')
self.capabilities = {'has_zfs': False, 'has_lvm': False, 'has_smart': has_smart}
try:
health_persistence.cleanup_old_errors()
except Exception as e:
print(f"[HealthMonitor] Cleanup warning: {e}")
# ─── Lightweight sampling methods for the dedicated vital-signs thread ───
# These ONLY append data to state_history without triggering evaluation,
# persistence, or subprocess-heavy operations.
def _sample_cpu_usage(self):
"""Lightweight CPU sample: read usage % and append to history. ~30ms cost."""
try:
cpu_percent = psutil.cpu_percent(interval=0)
current_time = time.time()
state_key = 'cpu_usage'
self.state_history[state_key].append({
'value': cpu_percent,
'time': current_time
})
# Prune entries older than 6 minutes
self.state_history[state_key] = [
e for e in self.state_history[state_key]
if current_time - e['time'] < 360
]
except Exception:
pass # Sampling must never crash the thread
def _sample_cpu_temperature(self):
"""Lightweight temperature sample: read sensor and append to history. ~50ms cost."""
try:
result = subprocess.run(
['sensors', '-A', '-u'],
capture_output=True, text=True, timeout=2
)
if result.returncode != 0:
return
temps = []
for line in result.stdout.split('\n'):
if 'temp' in line.lower() and '_input' in line:
try:
temp = float(line.split(':')[1].strip())
temps.append(temp)
except Exception:
continue
if temps:
max_temp = max(temps)
current_time = time.time()
state_key = 'cpu_temp_history'
self.state_history[state_key].append({
'value': max_temp,
'time': current_time
})
# Prune entries older than 4 minutes
self.state_history[state_key] = [
e for e in self.state_history[state_key]
if current_time - e['time'] < 240
]
except Exception:
pass # Sampling must never crash the thread
def get_system_info(self) -> Dict[str, Any]:
"""
Get lightweight system info for header display.
Returns: hostname, uptime, and cached health status.
This is extremely lightweight and uses cached health status.
"""
try:
# Get hostname
hostname = os.uname().nodename
# Get uptime (very cheap operation)
uptime_seconds = time.time() - psutil.boot_time()
# Get cached health status (no expensive checks)
health_status = self.get_cached_health_status()
return {
'hostname': hostname,
'uptime_seconds': int(uptime_seconds),
'uptime': self._format_uptime(uptime_seconds),
'health': health_status,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
return {
'hostname': 'unknown',
'uptime_seconds': 0,
'uptime': 'Unknown',
'health': {'status': 'UNKNOWN', 'summary': f'Error: {str(e)}'},
'timestamp': datetime.now().isoformat()
}
def _format_uptime(self, seconds: float) -> str:
"""Format uptime in human-readable format"""
days = int(seconds // 86400)
hours = int((seconds % 86400) // 3600)
minutes = int((seconds % 3600) // 60)
if days > 0:
return f"{days}d {hours}h {minutes}m"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m"
def get_cached_health_status(self) -> Dict[str, str]:
"""
Get cached health status without running expensive checks.
The background health collector keeps '_bg_overall' always fresh (every 5 min).
Falls back to calculating on demand if background data is stale or unavailable.
"""
current_time = time.time()
# 1. Check background collector cache (updated every 5 min by _health_collector_loop)
bg_key = '_bg_overall'
if bg_key in self.last_check_times:
age = current_time - self.last_check_times[bg_key]
if age < 360: # 6 min (5 min interval + 1 min tolerance)
return self.cached_results.get(bg_key, {'status': 'OK', 'summary': 'System operational'})
# 2. Check regular cache (updated by modal fetches or on-demand)
cache_key = 'overall_health'
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key, {'status': 'OK', 'summary': 'System operational'})
# 3. No fresh cache - calculate on demand (happens only on first load before bg thread runs)
status = self.get_overall_status()
self.cached_results[cache_key] = {
'status': status['status'],
'summary': status['summary']
}
self.last_check_times[cache_key] = current_time
return self.cached_results[cache_key]
def get_overall_status(self) -> Dict[str, Any]:
"""Get overall health status summary with minimal overhead"""
details = self.get_detailed_status()
overall_status = details.get('overall', 'OK')
summary = details.get('summary', '')
# Count statuses
critical_count = 0
warning_count = 0
ok_count = 0
for category, data in details.get('details', {}).items():
if isinstance(data, dict):
status = data.get('status', 'OK')
if status == 'CRITICAL':
critical_count += 1
elif status == 'WARNING':
warning_count += 1
elif status == 'OK':
ok_count += 1
return {
'status': overall_status,
'summary': summary,
'critical_count': critical_count,
'warning_count': warning_count,
'ok_count': ok_count,
'timestamp': datetime.now().isoformat()
}
def get_detailed_status(self) -> Dict[str, Any]:
"""
Get comprehensive health status with all checks.
Returns JSON structure with ALL 10 categories always present.
Now includes persistent error tracking.
"""
active_errors = health_persistence.get_active_errors()
# No need to create persistent_issues dict here, it's implicitly handled by the checks
details = {
'cpu': {'status': 'OK'},
'memory': {'status': 'OK'},
'storage': {'status': 'OK'}, # This will be overwritten by specific storage checks
'disks': {'status': 'OK'}, # This will be overwritten by disk/filesystem checks
'network': {'status': 'OK'},
'vms': {'status': 'OK'},
'services': {'status': 'OK'},
'logs': {'status': 'OK'},
'updates': {'status': 'OK'},
'security': {'status': 'OK'}
}
critical_issues = []
warning_issues = []
info_issues = [] # Added info_issues to track INFO separately
# --- Priority Order of Checks ---
# Priority 1: Critical PVE Services
services_status = self._check_pve_services()
details['services'] = services_status
if services_status['status'] == 'CRITICAL':
critical_issues.append(f"PVE Services: {services_status.get('reason', 'Service failure')}")
elif services_status['status'] == 'WARNING':
warning_issues.append(f"PVE Services: {services_status.get('reason', 'Service issue')}")
# Priority 1.5: Proxmox Storage Check (External Module)
proxmox_storage_result = self._check_proxmox_storage()
if proxmox_storage_result: # Only process if the check ran (module available)
details['storage'] = proxmox_storage_result
if proxmox_storage_result.get('status') == 'CRITICAL':
critical_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage unavailable'))
elif proxmox_storage_result.get('status') == 'WARNING':
warning_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage issue'))
# Derive capabilities from Proxmox storage types (immediate, no extra checks)
storage_checks = proxmox_storage_result.get('checks', {})
storage_types = {v.get('detail', '').split(' ')[0].lower() for v in storage_checks.values() if isinstance(v, dict)}
self.capabilities['has_zfs'] = any(t in ('zfspool', 'zfs') for t in storage_types)
self.capabilities['has_lvm'] = any(t in ('lvm', 'lvmthin') for t in storage_types)
# Priority 2: Disk/Filesystem Health (Internal checks: usage, ZFS, SMART, IO errors)
storage_status = self._check_storage_optimized()
details['disks'] = storage_status # Use 'disks' for filesystem/disk specific issues
if storage_status.get('status') == 'CRITICAL':
critical_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage failure')}")
elif storage_status.get('status') == 'WARNING':
warning_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage issue')}")
# Priority 3: VMs/CTs Status (with persistence)
vms_status = self._check_vms_cts_with_persistence()
details['vms'] = vms_status
if vms_status.get('status') == 'CRITICAL':
critical_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT failure')}")
elif vms_status.get('status') == 'WARNING':
warning_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT issue')}")
# Priority 4: Network Connectivity
network_status = self._check_network_optimized()
details['network'] = network_status
if network_status.get('status') == 'CRITICAL':
critical_issues.append(f"Network: {network_status.get('reason', 'Network failure')}")
elif network_status.get('status') == 'WARNING':
warning_issues.append(f"Network: {network_status.get('reason', 'Network issue')}")
# Priority 5: CPU Usage (with hysteresis)
cpu_status = self._check_cpu_with_hysteresis()
details['cpu'] = cpu_status
if cpu_status.get('status') == 'CRITICAL':
critical_issues.append(f"CPU: {cpu_status.get('reason', 'CPU critical')}")
elif cpu_status.get('status') == 'WARNING':
warning_issues.append(f"CPU: {cpu_status.get('reason', 'CPU high')}")
# Priority 6: Memory Usage (RAM and Swap)
memory_status = self._check_memory_comprehensive()
details['memory'] = memory_status
if memory_status.get('status') == 'CRITICAL':
critical_issues.append(f"Memory: {memory_status.get('reason', 'Memory critical')}")
elif memory_status.get('status') == 'WARNING':
warning_issues.append(f"Memory: {memory_status.get('reason', 'Memory high')}")
# Priority 7: Log Analysis (with persistence)
logs_status = self._check_logs_with_persistence()
details['logs'] = logs_status
if logs_status.get('status') == 'CRITICAL':
critical_issues.append(f"Logs: {logs_status.get('reason', 'Critical log errors')}")
elif logs_status.get('status') == 'WARNING':
warning_issues.append(f"Logs: {logs_status.get('reason', 'Log warnings')}")
# Priority 8: System Updates
updates_status = self._check_updates()
details['updates'] = updates_status
if updates_status.get('status') == 'CRITICAL':
critical_issues.append(f"Updates: {updates_status.get('reason', 'System not updated')}")
elif updates_status.get('status') == 'WARNING':
warning_issues.append(f"Updates: {updates_status.get('reason', 'Updates pending')}")
elif updates_status.get('status') == 'INFO':
info_issues.append(f"Updates: {updates_status.get('reason', 'Informational update notice')}")
# Priority 9: Security Checks
security_status = self._check_security()
details['security'] = security_status
if security_status.get('status') == 'WARNING':
warning_issues.append(f"Security: {security_status.get('reason', 'Security issue')}")
elif security_status.get('status') == 'INFO':
info_issues.append(f"Security: {security_status.get('reason', 'Security information')}")
# --- Track UNKNOWN counts and persist if >= 3 consecutive cycles ---
unknown_issues = []
for cat_key, cat_data in details.items():
cat_status = cat_data.get('status', 'OK')
if cat_status == 'UNKNOWN':
count = self._unknown_counts.get(cat_key, 0) + 1
self._unknown_counts[cat_key] = min(count, 10) # Cap to avoid unbounded growth
unknown_issues.append(f"{cat_key}: {cat_data.get('reason', 'Check unavailable')}")
if count == 3: # Only persist on the exact 3rd cycle, not every cycle after
try:
health_persistence.record_unknown_persistent(
cat_key, cat_data.get('reason', 'Check unavailable'))
except Exception:
pass
else:
self._unknown_counts[cat_key] = 0
# --- Determine Overall Status ---
# Severity: CRITICAL > WARNING > UNKNOWN (capped at WARNING) > INFO > OK
if critical_issues:
overall = 'CRITICAL'
summary = '; '.join(critical_issues[:3])
elif warning_issues:
overall = 'WARNING'
summary = '; '.join(warning_issues[:3])
elif unknown_issues:
overall = 'WARNING' # UNKNOWN caps at WARNING, never escalates to CRITICAL
summary = '; '.join(unknown_issues[:3])
elif info_issues:
overall = 'OK' # INFO statuses don't degrade overall health
summary = '; '.join(info_issues[:3])
else:
overall = 'OK'
summary = 'All systems operational'
# --- Emit events for state changes (Bloque A: Notification prep) ---
try:
previous_overall = getattr(self, '_last_overall_status', None)
if previous_overall and previous_overall != overall:
# Overall status changed - emit event
health_persistence.emit_event(
event_type='state_change',
category='overall',
severity=overall,
data={
'previous': previous_overall,
'current': overall,
'summary': summary
}
)
# Track per-category state changes
previous_details = getattr(self, '_last_category_statuses', {})
for cat_key, cat_data in details.items():
cat_status = cat_data.get('status', 'OK')
prev_status = previous_details.get(cat_key, 'OK')
if prev_status != cat_status and cat_status in ('WARNING', 'CRITICAL'):
health_persistence.emit_event(
event_type='state_change',
category=cat_key,
severity=cat_status,
data={
'previous': prev_status,
'current': cat_status,
'reason': cat_data.get('reason', '')
}
)
self._last_overall_status = overall
self._last_category_statuses = {k: v.get('status', 'OK') for k, v in details.items()}
except Exception:
pass # Event emission should never break health checks
return {
'overall': overall,
'summary': summary,
'details': details,
'timestamp': datetime.now().isoformat()
}
def _check_cpu_with_hysteresis(self) -> Dict[str, Any]:
"""Check CPU with hysteresis to avoid flapping alerts - requires 5min sustained high usage"""
try:
cpu_percent = psutil.cpu_percent(interval=1)
current_time = time.time()
state_key = 'cpu_usage'
# Add this reading as well (supplements the sampler thread)
self.state_history[state_key].append({
'value': cpu_percent,
'time': current_time
})
# Snapshot the list for thread-safe reading (sampler may append concurrently)
cpu_snapshot = list(self.state_history[state_key])
# Prune old entries via snapshot replacement (atomic assignment)
self.state_history[state_key] = [
entry for entry in cpu_snapshot
if current_time - entry['time'] < 360
]
critical_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] >= self.CPU_CRITICAL and
current_time - entry['time'] <= self.CPU_CRITICAL_DURATION
]
warning_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] >= self.CPU_WARNING and
current_time - entry['time'] <= self.CPU_WARNING_DURATION
]
recovery_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] < self.CPU_RECOVERY and
current_time - entry['time'] <= self.CPU_RECOVERY_DURATION
]
if len(critical_samples) >= 3:
status = 'CRITICAL'
reason = f'CPU >{self.CPU_CRITICAL}% sustained for {self.CPU_CRITICAL_DURATION}s'
elif len(warning_samples) >= 3 and len(recovery_samples) < 2:
status = 'WARNING'
reason = f'CPU >{self.CPU_WARNING}% sustained for {self.CPU_WARNING_DURATION}s'
else:
status = 'OK'
reason = None
temp_status = self._check_cpu_temperature()
result = {
'status': status,
'usage': round(cpu_percent, 1),
'cores': psutil.cpu_count()
}
if reason:
result['reason'] = reason
if temp_status and temp_status.get('status') != 'UNKNOWN':
result['temperature'] = temp_status
if temp_status.get('status') == 'CRITICAL':
result['status'] = 'CRITICAL'
result['reason'] = temp_status.get('reason')
elif temp_status.get('status') == 'WARNING' and status == 'OK':
result['status'] = 'WARNING'
result['reason'] = temp_status.get('reason')
# Build checks dict for frontend expandable section
checks = {
'cpu_usage': {
'status': status,
'detail': 'Sustained high CPU usage' if status != 'OK' else 'Normal'
}
}
if temp_status and temp_status.get('status') != 'UNKNOWN':
t_status = temp_status.get('status', 'OK')
checks['cpu_temperature'] = {
'status': t_status,
'detail': 'Temperature elevated' if t_status != 'OK' else 'Normal'
}
else:
checks['cpu_temperature'] = {
'status': 'INFO',
'detail': 'No temperature sensor detected - install lm-sensors if hardware supports it',
}
result['checks'] = checks
return result
except Exception as e:
return {'status': 'UNKNOWN', 'reason': f'CPU check failed: {str(e)}'}
def _check_cpu_temperature(self) -> Optional[Dict[str, Any]]:
"""
Check CPU temperature with temporal logic:
- WARNING if temp >80°C sustained for >3 minutes
- Auto-clears if temp ≤80°C for 30 seconds
- No dismiss button (non-dismissable)
"""
cache_key = 'cpu_temp'
current_time = time.time()
# Check every 10 seconds instead of 60
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 10:
return self.cached_results.get(cache_key)
try:
result = subprocess.run(
['sensors', '-A', '-u'],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
temps = []
for line in result.stdout.split('\n'):
if 'temp' in line.lower() and '_input' in line:
try:
temp = float(line.split(':')[1].strip())
temps.append(temp)
except:
continue
if temps:
max_temp = max(temps)
state_key = 'cpu_temp_history'
# Add this reading (supplements the sampler thread)
self.state_history[state_key].append({
'value': max_temp,
'time': current_time
})
# Snapshot for thread-safe reading, then atomic prune
temp_snapshot = list(self.state_history[state_key])
self.state_history[state_key] = [
entry for entry in temp_snapshot
if current_time - entry['time'] < 240
]
# Check if temperature >80°C for more than 3 minutes (180 seconds)
high_temp_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] > 80 and current_time - entry['time'] <= 180
]
# Check if temperature ≤80°C for last 30 seconds (recovery)
recovery_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] <= 80 and current_time - entry['time'] <= 30
]
# Require at least 18 samples over 3 minutes (one every 10 seconds) to trigger alert
if len(high_temp_samples) >= 18:
# Temperature has been >80°C for >3 minutes
status = 'WARNING'
reason = f'CPU temperature {max_temp}°C >80°C sustained >3min'
# Record non-dismissable error
health_persistence.record_error(
error_key='cpu_temperature',
category='temperature',
severity='WARNING',
reason=reason,
details={'temperature': max_temp, 'dismissable': False}
)
elif len(recovery_samples) >= 3:
# Temperature has been ≤80°C for 30 seconds - clear the error
status = 'OK'
reason = None
health_persistence.resolve_error('cpu_temperature', 'Temperature recovered')
else:
# Temperature is elevated but not long enough, or recovering but not yet cleared
# Check if we already have an active error
if health_persistence.is_error_active('cpu_temperature', category='temperature'):
# Keep the warning active
status = 'WARNING'
reason = f'CPU temperature {max_temp}°C still elevated'
else:
# No active warning yet
status = 'OK'
reason = None
temp_result = {
'status': status,
'value': round(max_temp, 1),
'unit': '°C'
}
if reason:
temp_result['reason'] = reason
self.cached_results[cache_key] = temp_result
self.last_check_times[cache_key] = current_time
return temp_result
return None
except Exception:
return None
def _check_memory_comprehensive(self) -> Dict[str, Any]:
"""
Check memory including RAM and swap with realistic thresholds.
Only alerts on truly problematic memory situations.
"""
try:
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
current_time = time.time()
mem_percent = memory.percent
swap_percent = swap.percent if swap.total > 0 else 0
swap_vs_ram = (swap.used / memory.total * 100) if memory.total > 0 else 0
state_key = 'memory_usage'
self.state_history[state_key].append({
'mem_percent': mem_percent,
'swap_percent': swap_percent,
'swap_vs_ram': swap_vs_ram,
'time': current_time
})
self.state_history[state_key] = [
entry for entry in self.state_history[state_key]
if current_time - entry['time'] < 600
]
mem_critical = sum(
1 for entry in self.state_history[state_key]
if entry['mem_percent'] >= 90 and
current_time - entry['time'] <= self.MEMORY_DURATION
)
mem_warning = sum(
1 for entry in self.state_history[state_key]
if entry['mem_percent'] >= self.MEMORY_WARNING and
current_time - entry['time'] <= self.MEMORY_DURATION
)
swap_critical = sum(
1 for entry in self.state_history[state_key]
if entry['swap_vs_ram'] > 20 and
current_time - entry['time'] <= self.SWAP_CRITICAL_DURATION
)
if mem_critical >= 2:
status = 'CRITICAL'
reason = f'RAM >90% for {self.MEMORY_DURATION}s'
elif swap_critical >= 2:
status = 'CRITICAL'
reason = f'Swap >20% of RAM ({swap_vs_ram:.1f}%)'
elif mem_warning >= 2:
status = 'WARNING'
reason = f'RAM >{self.MEMORY_WARNING}% for {self.MEMORY_DURATION}s'
else:
status = 'OK'
reason = None
ram_avail_gb = round(memory.available / (1024**3), 2)
ram_total_gb = round(memory.total / (1024**3), 2)
swap_used_gb = round(swap.used / (1024**3), 2)
swap_total_gb = round(swap.total / (1024**3), 2)
# Determine per-sub-check status
ram_status = 'CRITICAL' if mem_percent >= 90 and mem_critical >= 2 else ('WARNING' if mem_percent >= self.MEMORY_WARNING and mem_warning >= 2 else 'OK')
swap_status = 'CRITICAL' if swap_critical >= 2 else 'OK'
result = {
'status': status,
'ram_percent': round(mem_percent, 1),
'ram_available_gb': ram_avail_gb,
'swap_percent': round(swap_percent, 1),
'swap_used_gb': swap_used_gb,
'checks': {
'ram_usage': {
'status': ram_status,
'detail': 'High RAM usage sustained' if ram_status != 'OK' else 'Normal'
},
'swap_usage': {
'status': swap_status,
'detail': 'Excessive swap usage' if swap_status != 'OK' else ('Normal' if swap.total > 0 else 'No swap configured')
}
}
}
if reason:
result['reason'] = reason
return result
except Exception as e:
return {'status': 'UNKNOWN', 'reason': f'Memory check failed: {str(e)}'}
def _check_storage_optimized(self) -> Dict[str, Any]:
"""
Optimized storage check - monitors Proxmox storages from pvesm status.
Checks for inactive storages, disk health from SMART/events, and ZFS pool health.
"""
issues = []
storage_details = {}
# Check disk usage and mount status first for critical mounts
critical_mounts = ['/']
for mount_point in critical_mounts:
try:
result = subprocess.run(
['mountpoint', '-q', mount_point],
capture_output=True,
timeout=2
)
if result.returncode != 0:
issues.append(f'{mount_point}: Not mounted')
storage_details[mount_point] = {
'status': 'CRITICAL',
'reason': 'Not mounted'
}
continue
# Check if read-only
with open('/proc/mounts', 'r') as f:
for line in f:
parts = line.split()
if len(parts) >= 4 and parts[1] == mount_point:
options = parts[3].split(',')
if 'ro' in options:
issues.append(f'{mount_point}: Mounted read-only')
storage_details[mount_point] = {
'status': 'CRITICAL',
'reason': 'Mounted read-only'
}
break # Found it, no need to check further for this mountpoint
# Check filesystem usage only if not already flagged as critical
if mount_point not in storage_details or storage_details[mount_point].get('status') == 'OK':
fs_status = self._check_filesystem(mount_point)
if fs_status['status'] != 'OK':
issues.append(f"{mount_point}: {fs_status['reason']}")
storage_details[mount_point] = fs_status
except Exception:
pass # Silently skip if mountpoint check fails
# Check ZFS pool health status
zfs_pool_issues = self._check_zfs_pool_health()
if zfs_pool_issues:
for pool_name, pool_info in zfs_pool_issues.items():
issues.append(f'{pool_name}: {pool_info["reason"]}')
storage_details[pool_name] = pool_info
# Check disk health from Proxmox task log or system logs (SMART, etc.)
disk_health_issues = self._check_disk_health_from_events()
if disk_health_issues:
for disk, issue in disk_health_issues.items():
# Only add if not already covered by critical mountpoint issues
if disk not in storage_details or storage_details[disk].get('status') == 'OK':
issues.append(f'{disk}: {issue["reason"]}')
storage_details[disk] = issue
# Check LVM status
lvm_status = self._check_lvm()
if lvm_status.get('status') == 'WARNING':
# LVM volumes might be okay but indicate potential issues
issues.append(f"LVM check: {lvm_status.get('reason')}")
storage_details['lvm_check'] = lvm_status
# Check dmesg for real-time I/O errors (dmesg-based, complements journalctl SMART checks)
dmesg_io_result = self._check_disks_optimized()
if dmesg_io_result.get('status') != 'OK':
dmesg_details = dmesg_io_result.get('details', {})
for disk_path, disk_info in dmesg_details.items():
if disk_path not in storage_details or storage_details[disk_path].get('status') == 'OK':
issues.append(f'{disk_path}: {disk_info.get("reason", "I/O errors")}')
storage_details[disk_path] = disk_info
# Build checks dict from storage_details, adding OK entries for items with no issues
checks = {}
for key, val in storage_details.items():
checks[key] = {
'status': val.get('status', 'OK'),
'detail': val.get('reason', 'OK'),
**{k: v for k, v in val.items() if k not in ('status', 'reason')}
}
if not issues:
# Add descriptive OK entries only for capabilities this server actually has
checks['root_filesystem'] = checks.get('/', {'status': 'OK', 'detail': 'Mounted read-write, space OK'})
checks['io_errors'] = {'status': 'OK', 'detail': 'No I/O errors in dmesg'}
if self.capabilities.get('has_smart'):
checks['smart_health'] = {'status': 'OK', 'detail': 'No SMART warnings in journal'}
if self.capabilities.get('has_zfs'):
checks['zfs_pools'] = {'status': 'OK', 'detail': 'ZFS pools healthy'}
if self.capabilities.get('has_lvm'):
checks['lvm_volumes'] = {'status': 'OK', 'detail': 'LVM volumes OK'}
return {'status': 'OK', 'checks': checks}
# Determine overall status
has_critical = any(d.get('status') == 'CRITICAL' for d in storage_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': storage_details,
'checks': checks
}
def _check_filesystem(self, mount_point: str) -> Dict[str, Any]:
"""Check individual filesystem for space and mount status"""
try:
usage = psutil.disk_usage(mount_point)
percent = usage.percent
if percent >= self.STORAGE_CRITICAL:
status = 'CRITICAL'
reason = f'{percent:.1f}% full (≥{self.STORAGE_CRITICAL}%)'
elif percent >= self.STORAGE_WARNING:
status = 'WARNING'
reason = f'{percent:.1f}% full (≥{self.STORAGE_WARNING}%)'
else:
status = 'OK'
reason = None
result = {
'status': status,
'usage_percent': round(percent, 1)
}
if reason:
result['reason'] = reason
return result
except Exception as e:
return {
'status': 'WARNING',
'reason': f'Check failed: {str(e)}'
}
def _check_lvm(self) -> Dict[str, Any]:
"""Check LVM volumes - improved detection"""
try:
# Check if lvs command is available
result_which = subprocess.run(
['which', 'lvs'],
capture_output=True,
text=True,
timeout=1
)
if result_which.returncode != 0:
return {'status': 'OK'} # LVM not installed
result = subprocess.run(
['lvs', '--noheadings', '--options', 'lv_name,vg_name,lv_attr'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode != 0:
return {'status': 'WARNING', 'reason': 'lvs command failed'}
volumes = []
for line in result.stdout.strip().split('\n'):
if line.strip():
parts = line.split()
if len(parts) >= 2:
lv_name = parts[0].strip()
vg_name = parts[1].strip()
# Check for 'a' attribute indicating active/available
if 'a' in parts[2]:
volumes.append(f'{vg_name}/{lv_name}')
# If LVM is configured but no active volumes are found, it might be an issue or just not used
if not volumes:
# Check if any VGs exist to determine if LVM is truly unconfigured or just inactive
vg_result = subprocess.run(
['vgs', '--noheadings', '--options', 'vg_name'],
capture_output=True,
text=True,
timeout=3
)
if vg_result.returncode == 0 and vg_result.stdout.strip():
return {'status': 'WARNING', 'reason': 'No active LVM volumes detected'}
else:
return {'status': 'OK'} # No VGs found, LVM not in use
return {'status': 'OK', 'volumes': len(volumes)}
except Exception:
return {'status': 'OK'}
# This function is no longer used in get_detailed_status, but kept for reference if needed.
# The new _check_proxmox_storage function handles this logic better.
def _check_proxmox_storages(self) -> Dict[str, Any]:
"""Check Proxmox-specific storages (only report problems)"""
storages = {}
try:
if os.path.exists('/etc/pve/storage.cfg'):
with open('/etc/pve/storage.cfg', 'r') as f:
current_storage = None
storage_type = None
for line in f:
line = line.strip()
if line.startswith('dir:') or line.startswith('nfs:') or \
line.startswith('cifs:') or line.startswith('pbs:') or \
line.startswith('rbd:') or line.startswith('cephfs:') or \
line.startswith('zfs:') or line.startswith('zfs-send:'):
parts = line.split(':', 1)
storage_type = parts[0]
current_storage = parts[1].strip()
elif line.startswith('path ') and current_storage:
path = line.split(None, 1)[1]
if storage_type == 'dir':
if not os.path.exists(path):
storages[f'storage_{current_storage}'] = {
'status': 'CRITICAL',
'reason': 'Directory does not exist',
'type': 'dir',
'path': path
}
current_storage = None
storage_type = None
except Exception:
pass
return storages
def _check_disks_optimized(self) -> Dict[str, Any]:
"""
Optimized disk check - always returns status.
Checks dmesg for I/O errors and SMART status.
NOTE: This function is now largely covered by _check_storage_optimized,
but kept for potential specific disk-level reporting if needed.
Currently, its primary function is to detect recent I/O errors.
"""
current_time = time.time()
disk_issues = {}
try:
# Check dmesg for I/O errors in the last 5 minutes
result = subprocess.run(
['dmesg', '-T', '--level=err,warn', '--since', '5 minutes ago'],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
if any(keyword in line_lower for keyword in ['i/o error', 'ata error', 'scsi error', 'medium error']):
# Try to extract disk name
disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+)', line)
if disk_match:
disk_name = disk_match.group(1)
self.io_error_history[disk_name].append(current_time)
# Clean old history (keep errors from the last 5 minutes)
for disk in list(self.io_error_history.keys()):
self.io_error_history[disk] = [
t for t in self.io_error_history[disk]
if current_time - t < 300
]
error_count = len(self.io_error_history[disk])
# Report based on recent error count
if error_count >= 3:
error_key = f'disk_{disk}'
severity = 'CRITICAL'
reason = f'{error_count} I/O errors in 5 minutes'
health_persistence.record_error(
error_key=error_key,
category='disks',
severity=severity,
reason=reason,
details={'disk': disk, 'error_count': error_count, 'dismissable': False}
)
disk_details[disk] = {
'status': severity,
'reason': reason,
'dismissable': False
}
elif error_count >= 1:
error_key = f'disk_{disk}'
severity = 'WARNING'
reason = f'{error_count} I/O error(s) in 5 minutes'
health_persistence.record_error(
error_key=error_key,
category='disks',
severity=severity,
reason=reason,
details={'disk': disk, 'error_count': error_count, 'dismissable': True}
)
disk_issues[f'/dev/{disk}'] = {
'status': severity,
'reason': reason,
'dismissable': True
}
else:
error_key = f'disk_{disk}'
health_persistence.resolve_error(error_key, 'Disk errors cleared')
if not disk_issues:
return {'status': 'OK'}
has_critical = any(d.get('status') == 'CRITICAL' for d in disk_issues.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': f"{len(disk_issues)} disk(s) with recent errors",
'details': disk_issues
}
except Exception as e:
print(f"[HealthMonitor] Disk/IO check failed: {e}")
return {'status': 'UNKNOWN', 'reason': f'Disk check unavailable: {str(e)}', 'checks': {}}
def _check_network_optimized(self) -> Dict[str, Any]:
"""
Optimized network check - only alerts for interfaces that are actually in use.
Avoids false positives for unused physical interfaces.
"""
try:
issues = []
interface_details = {}
net_if_stats = psutil.net_if_stats()
try:
net_io_per_nic = psutil.net_io_counters(pernic=True)
except Exception:
net_io_per_nic = {}
try:
net_if_addrs = psutil.net_if_addrs()
except Exception:
net_if_addrs = {}
active_interfaces = set()
for interface, stats in net_if_stats.items():
if interface == 'lo':
continue
# Check if important interface is down
if not stats.isup:
should_alert = False
alert_reason = None
# Check if it's a bridge interface (always important for VMs/LXCs)
if interface.startswith('vmbr'):
should_alert = True
alert_reason = 'Bridge interface DOWN (VMs/LXCs may be affected)'
# Check if physical interface has configuration or traffic
elif interface.startswith(('eth', 'ens', 'enp', 'eno')):
# Check if interface has IP address (configured)
has_ip = False
if interface in net_if_addrs:
for addr in net_if_addrs[interface]:
if addr.family == 2: # IPv4
has_ip = True
break
# Check if interface has traffic (has been used)
has_traffic = False
if interface in net_io_per_nic:
io_stats = net_io_per_nic[interface]
# If interface has sent or received any data, it's being used
if io_stats.bytes_sent > 0 or io_stats.bytes_recv > 0:
has_traffic = True
# Only alert if interface is configured or has been used
if has_ip:
should_alert = True
alert_reason = 'Configured interface DOWN (has IP address)'
elif has_traffic:
should_alert = True
alert_reason = 'Active interface DOWN (was handling traffic)'
if should_alert:
issues.append(f'{interface} is DOWN')
error_key = interface
health_persistence.record_error(
error_key=error_key,
category='network',
severity='CRITICAL',
reason=alert_reason or 'Interface DOWN',
details={'interface': interface, 'dismissable': False}
)
interface_details[interface] = {
'status': 'CRITICAL',
'reason': alert_reason or 'Interface DOWN',
'dismissable': False
}
else:
active_interfaces.add(interface)
if interface.startswith('vmbr') or interface.startswith(('eth', 'ens', 'enp', 'eno')):
health_persistence.resolve_error(interface, 'Interface recovered')
# Check connectivity (latency)
latency_status = self._check_network_latency()
if latency_status:
latency_ms = latency_status.get('latency_ms', 'N/A')
latency_sev = latency_status.get('status', 'OK')
interface_details['connectivity'] = latency_status
connectivity_check = {
'status': latency_sev if latency_sev not in ['UNKNOWN'] else 'OK',
'detail': f'Latency {latency_ms}ms to 1.1.1.1' if isinstance(latency_ms, (int, float)) else latency_status.get('reason', 'Unknown'),
}
if latency_sev not in ['OK', 'INFO', 'UNKNOWN']:
issues.append(latency_status.get('reason', 'Network latency issue'))
else:
connectivity_check = {'status': 'OK', 'detail': 'Not tested'}
# Build checks dict
checks = {}
for iface in active_interfaces:
checks[iface] = {'status': 'OK', 'detail': 'UP'}
for iface, detail in interface_details.items():
if iface != 'connectivity':
checks[iface] = {
'status': detail.get('status', 'OK'),
'detail': detail.get('reason', 'DOWN'),
'dismissable': detail.get('dismissable', False)
}
checks['connectivity'] = connectivity_check
if not issues:
return {'status': 'OK', 'checks': checks}
has_critical = any(d.get('status') == 'CRITICAL' for d in interface_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:2]),
'details': interface_details,
'checks': checks
}
except Exception as e:
print(f"[HealthMonitor] Network check failed: {e}")
return {'status': 'UNKNOWN', 'reason': f'Network check unavailable: {str(e)}', 'checks': {}}
def _check_network_latency(self) -> Optional[Dict[str, Any]]:
"""Check network latency to 1.1.1.1 (cached)"""
cache_key = 'network_latency'
current_time = time.time()
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key)
try:
result = subprocess.run(
['ping', '-c', '1', '-W', '1', '1.1.1.1'],
capture_output=True,
text=True,
timeout=self.NETWORK_TIMEOUT
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if 'time=' in line:
try:
latency_str = line.split('time=')[1].split()[0]
latency = float(latency_str)
if latency > self.NETWORK_LATENCY_CRITICAL:
status = 'CRITICAL'
reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_CRITICAL}ms'
elif latency > self.NETWORK_LATENCY_WARNING:
status = 'WARNING'
reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_WARNING}ms'
else:
status = 'OK'
reason = None
latency_result = {
'status': status,
'latency_ms': round(latency, 1)
}
if reason:
latency_result['reason'] = reason
self.cached_results[cache_key] = latency_result
self.last_check_times[cache_key] = current_time
return latency_result
except:
pass
# If ping failed (timeout, unreachable) - distinguish the reason
stderr_lower = (result.stderr or '').lower() if hasattr(result, 'stderr') else ''
if 'unreachable' in stderr_lower or 'network is unreachable' in stderr_lower:
fail_reason = 'Network unreachable - no route to 1.1.1.1'
elif result.returncode == 1:
fail_reason = 'Packet loss to 1.1.1.1 (100% loss)'
else:
fail_reason = f'Ping failed (exit code {result.returncode})'
packet_loss_result = {
'status': 'CRITICAL',
'reason': fail_reason
}
self.cached_results[cache_key] = packet_loss_result
self.last_check_times[cache_key] = current_time
return packet_loss_result
except subprocess.TimeoutExpired:
timeout_result = {
'status': 'WARNING',
'reason': f'Ping timeout (>{self.NETWORK_TIMEOUT}s) - possible high latency'
}
self.cached_results[cache_key] = timeout_result
self.last_check_times[cache_key] = current_time
return timeout_result
except Exception:
return {'status': 'UNKNOWN', 'reason': 'Ping command failed'}
def _check_vms_cts_optimized(self) -> Dict[str, Any]:
"""
Optimized VM/CT check - detects qmp failures and startup errors from logs.
Improved detection of container and VM errors from journalctl.
"""
try:
issues = []
vm_details = {}
result = subprocess.run(
['journalctl', '--since', '10 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command.*(?:failed|unable|timeout)', line_lower)
if vm_qmp_match:
vmid = vm_qmp_match.group(1)
key = f'vm_{vmid}'
if key not in vm_details:
issues.append(f'VM {vmid}: Communication issue')
vm_details[key] = {
'status': 'WARNING',
'reason': 'QMP command timeout',
'id': vmid,
'type': 'VM'
}
continue
ct_error_match = re.search(r'(?:ct|container|lxc)\s+(\d+)', line_lower)
if ct_error_match and ('error' in line_lower or 'fail' in line_lower or 'device' in line_lower):
ctid = ct_error_match.group(1)
key = f'ct_{ctid}'
if key not in vm_details:
if 'device' in line_lower and 'does not exist' in line_lower:
device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
if device_match:
reason = f'Device {device_match.group(1)} missing'
else:
reason = 'Device error'
elif 'failed to start' in line_lower:
reason = 'Failed to start'
else:
reason = 'Container error'
issues.append(f'CT {ctid}: {reason}')
vm_details[key] = {
'status': 'WARNING' if 'device' in reason.lower() else 'CRITICAL',
'reason': reason,
'id': ctid,
'type': 'CT'
}
continue
vzstart_match = re.search(r'vzstart:(\d+):', line)
if vzstart_match and ('error' in line_lower or 'fail' in line_lower or 'does not exist' in line_lower):
ctid = vzstart_match.group(1)
key = f'ct_{ctid}'
if key not in vm_details:
# Extraer mensaje de error
if 'device' in line_lower and 'does not exist' in line_lower:
device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
if device_match:
reason = f'Device {device_match.group(1)} missing'
else:
reason = 'Device error'
else:
reason = 'Startup error'
issues.append(f'CT {ctid}: {reason}')
vm_details[key] = {
'status': 'WARNING',
'reason': reason,
'id': ctid,
'type': 'CT'
}
continue
if any(keyword in line_lower for keyword in ['failed to start', 'cannot start', 'activation failed', 'start error']):
id_match = re.search(r'\b(\d{3,4})\b', line)
if id_match:
vmid = id_match.group(1)
key = f'vmct_{vmid}'
if key not in vm_details:
issues.append(f'VM/CT {vmid}: Failed to start')
vm_details[key] = {
'status': 'CRITICAL',
'reason': 'Failed to start',
'id': vmid,
'type': 'VM/CT'
}
if not issues:
return {'status': 'OK'}
has_critical = any(d.get('status') == 'CRITICAL' for d in vm_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': vm_details
}
except Exception as e:
print(f"[HealthMonitor] VMs/CTs check failed: {e}")
return {'status': 'UNKNOWN', 'reason': f'VM/CT check unavailable: {str(e)}', 'checks': {}}
# Modified to use persistence
def _check_vms_cts_with_persistence(self) -> Dict[str, Any]:
"""
Check VMs/CTs with persistent error tracking.
Errors persist until VM starts or 48h elapsed.
"""
try:
issues = []
vm_details = {}
# Get persistent errors first
persistent_errors = health_persistence.get_active_errors('vms')
# Check if any persistent VMs/CTs have started
for error in persistent_errors:
error_key = error['error_key']
if error_key.startswith('vm_') or error_key.startswith('ct_'):
vm_id = error_key.split('_')[1]
# Check if VM is running using persistence helper
if health_persistence.check_vm_running(vm_id):
continue # Error auto-resolved if VM is now running
# Still active, add to details
vm_details[error_key] = {
'status': error['severity'],
'reason': error['reason'],
'id': error.get('details', {}).get('id', 'unknown'),
'type': error.get('details', {}).get('type', 'VM/CT'),
'first_seen': error['first_seen']
}
issues.append(f"{error.get('details', {}).get('type', 'VM')} {error.get('details', {}).get('id', '')}: {error['reason']}")
# Check for new errors in logs
# Using 'warning' priority to catch potential startup issues
result = subprocess.run(
['journalctl', '--since', '10 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
# VM QMP errors
vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command.*(?:failed|unable|timeout)', line_lower)
if vm_qmp_match:
vmid = vm_qmp_match.group(1)
error_key = f'vm_{vmid}'
if error_key not in vm_details:
# Record persistent error
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='WARNING',
reason='QMP command timeout',
details={'id': vmid, 'type': 'VM'}
)
issues.append(f'VM {vmid}: Communication issue')
vm_details[error_key] = {
'status': 'WARNING',
'reason': 'QMP command timeout',
'id': vmid,
'type': 'VM'
}
continue
# Container errors (including startup issues via vzstart)
vzstart_match = re.search(r'vzstart:(\d+):', line)
if vzstart_match and ('error' in line_lower or 'fail' in line_lower or 'does not exist' in line_lower):
ctid = vzstart_match.group(1)
error_key = f'ct_{ctid}'
if error_key not in vm_details:
if 'device' in line_lower and 'does not exist' in line_lower:
device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
if device_match:
reason = f'Device {device_match.group(1)} missing'
else:
reason = 'Device error'
else:
reason = 'Startup error'
# Record persistent error
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='WARNING',
reason=reason,
details={'id': ctid, 'type': 'CT'}
)
issues.append(f'CT {ctid}: {reason}')
vm_details[error_key] = {
'status': 'WARNING',
'reason': reason,
'id': ctid,
'type': 'CT'
}
# Generic failed to start for VMs and CTs
if any(keyword in line_lower for keyword in ['failed to start', 'cannot start', 'activation failed', 'start error']):
# Try contextual VMID patterns first (more precise), then fallback to generic
id_match = (
re.search(r'(?:VMID|vmid|VM|CT|qemu|lxc|pct|qm)[:\s=/]+(\d{3,5})\b', line) or
re.search(r'\b(\d{3,5})\.conf\b', line) or
re.search(r'\b(\d{3,5})\b', line)
)
if id_match:
vmid_ctid = id_match.group(1)
# Determine if it's a VM or CT based on context, if possible
if 'vm' in line_lower or 'qemu' in line_lower:
error_key = f'vm_{vmid_ctid}'
vm_type = 'VM'
elif 'ct' in line_lower or 'lxc' in line_lower:
error_key = f'ct_{vmid_ctid}'
vm_type = 'CT'
else:
# Fallback if type is unclear
error_key = f'vmct_{vmid_ctid}'
vm_type = 'VM/CT'
if error_key not in vm_details:
reason = 'Failed to start'
# Record persistent error
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='CRITICAL',
reason=reason,
details={'id': vmid_ctid, 'type': vm_type}
)
issues.append(f'{vm_type} {vmid_ctid}: {reason}')
vm_details[error_key] = {
'status': 'CRITICAL',
'reason': reason,
'id': vmid_ctid,
'type': vm_type
}
# Build checks dict from vm_details
checks = {}
for key, val in vm_details.items():
vm_label = f"{val.get('type', 'VM')} {val.get('id', key)}"
checks[vm_label] = {
'status': val.get('status', 'WARNING'),
'detail': val.get('reason', 'Error'),
'dismissable': True
}
if not issues:
checks['qmp_communication'] = {'status': 'OK', 'detail': 'No QMP timeouts detected'}
checks['container_startup'] = {'status': 'OK', 'detail': 'No container startup errors'}
checks['vm_startup'] = {'status': 'OK', 'detail': 'No VM startup failures'}
checks['oom_killer'] = {'status': 'OK', 'detail': 'No OOM events detected'}
return {'status': 'OK', 'checks': checks}
has_critical = any(d.get('status') == 'CRITICAL' for d in vm_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': vm_details,
'checks': checks
}
except Exception as e:
print(f"[HealthMonitor] VMs/CTs persistence check failed: {e}")
return {'status': 'UNKNOWN', 'reason': f'VM/CT check unavailable: {str(e)}', 'checks': {}}
def _check_pve_services(self) -> Dict[str, Any]:
"""
Check critical Proxmox services with persistence tracking.
- Checks the base PVE_SERVICES list
- Dynamically adds corosync if a cluster config exists
- Records failed services in persistence for tracking/dismiss
- Auto-clears when services recover
"""
try:
# Build service list: base PVE services + corosync if clustered
services_to_check = list(self.PVE_SERVICES)
is_cluster = os.path.exists('/etc/corosync/corosync.conf')
if is_cluster and 'corosync' not in services_to_check:
services_to_check.append('corosync')
failed_services = []
service_details = {}
for service in services_to_check:
try:
result = subprocess.run(
['systemctl', 'is-active', service],
capture_output=True,
text=True,
timeout=2
)
status = result.stdout.strip()
if result.returncode != 0 or status != 'active':
failed_services.append(service)
service_details[service] = status or 'inactive'
except Exception:
failed_services.append(service)
service_details[service] = 'error'
# Build checks dict with status per service
checks = {}
for svc in services_to_check:
if svc in failed_services:
state = service_details.get(svc, 'inactive')
checks[svc] = {
'status': 'CRITICAL',
'detail': f'Service is {state}',
}
else:
checks[svc] = {
'status': 'OK',
'detail': 'Active',
}
if is_cluster:
checks['cluster_mode'] = {
'status': 'OK',
'detail': 'Cluster detected (corosync.conf present)',
}
if failed_services:
reason = f'Services inactive: {", ".join(failed_services)}'
# Record each failed service in persistence
for svc in failed_services:
error_key = f'pve_service_{svc}'
health_persistence.record_error(
error_key=error_key,
category='pve_services',
severity='CRITICAL',
reason=f'PVE service {svc} is {service_details.get(svc, "inactive")}',
details={'service': svc, 'state': service_details.get(svc, 'inactive')}
)
# Auto-clear services that recovered
for svc in services_to_check:
if svc not in failed_services:
error_key = f'pve_service_{svc}'
if health_persistence.is_error_active(error_key):
health_persistence.clear_error(error_key)
return {
'status': 'CRITICAL',
'reason': reason,
'failed': failed_services,
'is_cluster': is_cluster,
'services_checked': len(services_to_check),
'checks': checks
}
# All OK - clear any previously tracked service errors
for svc in services_to_check:
error_key = f'pve_service_{svc}'
if health_persistence.is_error_active(error_key):
health_persistence.clear_error(error_key)
return {
'status': 'OK',
'is_cluster': is_cluster,
'services_checked': len(services_to_check),
'checks': checks
}
except Exception as e:
return {
'status': 'WARNING',
'reason': f'Service check command failed: {str(e)}'
}
def _is_benign_error(self, line: str) -> bool:
"""Check if log line matches benign error patterns"""
line_lower = line.lower()
for pattern in self.BENIGN_ERROR_PATTERNS:
if re.search(pattern, line_lower):
return True
return False
def _classify_log_severity(self, line: str) -> Optional[str]:
"""
Classify log line severity intelligently.
Returns: 'CRITICAL', 'WARNING', or None (benign/info)
"""
line_lower = line.lower()
# Check if benign first
if self._is_benign_error(line):
return None
# Check critical keywords
for keyword in self.CRITICAL_LOG_KEYWORDS:
if re.search(keyword, line_lower):
return 'CRITICAL'
# Check warning keywords
for keyword in self.WARNING_LOG_KEYWORDS:
if re.search(keyword, line_lower):
return 'WARNING'
# Generic error/warning classification based on common terms
if 'critical' in line_lower or 'fatal' in line_lower or 'panic' in line_lower:
return 'CRITICAL'
elif 'error' in line_lower or 'fail' in line_lower:
return 'WARNING'
elif 'warning' in line_lower or 'warn' in line_lower:
return None # Generic warnings are often informational and not critical
return None
def _check_logs_with_persistence(self) -> Dict[str, Any]:
"""
Intelligent log checking with cascade detection and persistence.
Focuses on detecting significant error patterns rather than transient warnings.
New thresholds:
- CASCADE: ≥15 errors (increased from 10)
- SPIKE: ≥5 errors AND 4x increase (more restrictive)
- PERSISTENT: Same error in 3 consecutive checks
"""
cache_key = 'logs_analysis'
current_time = time.time()
# Cache the result for 5 minutes to avoid excessive journalctl calls
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < self.LOG_CHECK_INTERVAL:
# Return the full cached result (which includes 'checks' dict)
cached = self.cached_results.get(cache_key)
if cached:
return cached
return {'status': 'OK', 'checks': {
'log_error_cascade': {'status': 'OK', 'detail': 'No cascading errors'},
'log_error_spike': {'status': 'OK', 'detail': 'No error spikes'},
'log_persistent_errors': {'status': 'OK', 'detail': 'No persistent patterns'},
'log_critical_errors': {'status': 'OK', 'detail': 'No critical errors'}
}}
try:
# Fetch logs from the last 3 minutes for immediate issue detection
result_recent = subprocess.run(
['journalctl', '--since', '3 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
# Fetch logs from the previous 3-minute interval to detect spikes/cascades
result_previous = subprocess.run(
['journalctl', '--since', '6 minutes ago', '--until', '3 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result_recent.returncode == 0:
recent_lines = result_recent.stdout.strip().split('\n')
previous_lines = result_previous.stdout.strip().split('\n') if result_previous.returncode == 0 else []
recent_patterns = defaultdict(int)
previous_patterns = defaultdict(int)
critical_errors_found = {} # To store unique critical error lines for persistence
for line in recent_lines:
if not line.strip():
continue
# Skip benign errors
if self._is_benign_error(line):
continue
# Classify severity
severity = self._classify_log_severity(line)
if severity is None: # Skip informational or classified benign lines
continue
# Normalize to a pattern for grouping
pattern = self._normalize_log_pattern(line)
if severity == 'CRITICAL':
pattern_hash = hashlib.md5(pattern.encode()).hexdigest()[:8]
error_key = f'log_critical_{pattern_hash}'
if pattern not in critical_errors_found:
critical_errors_found[pattern] = line
# Record persistent error if it's not already active
if not health_persistence.is_error_active(error_key, category='logs'):
health_persistence.record_error(
error_key=error_key,
category='logs',
severity='CRITICAL',
reason=line[:100], # Truncate reason for brevity
details={'pattern': pattern, 'dismissable': True}
)
recent_patterns[pattern] += 1
if pattern in self.persistent_log_patterns:
self.persistent_log_patterns[pattern]['count'] += 1
self.persistent_log_patterns[pattern]['last_seen'] = current_time
else:
self.persistent_log_patterns[pattern] = {
'count': 1,
'first_seen': current_time,
'last_seen': current_time
}
for line in previous_lines:
if not line.strip():
continue
# Skip benign errors
if self._is_benign_error(line):
continue
# Classify severity
severity = self._classify_log_severity(line)
if severity is None: # Skip informational or classified benign lines
continue
# Normalize to a pattern for grouping
pattern = self._normalize_log_pattern(line)
previous_patterns[pattern] += 1
cascading_errors = {
pattern: count for pattern, count in recent_patterns.items()
if count >= 15 and self._classify_log_severity(pattern) in ['WARNING', 'CRITICAL']
}
spike_errors = {}
for pattern, recent_count in recent_patterns.items():
prev_count = previous_patterns.get(pattern, 0)
if recent_count >= 5 and recent_count >= prev_count * 4:
spike_errors[pattern] = recent_count
persistent_errors = {}
for pattern, data in self.persistent_log_patterns.items():
time_span = current_time - data['first_seen']
if data['count'] >= 3 and time_span >= 900: # 15 minutes
persistent_errors[pattern] = data['count']
# Record as warning if not already recorded
pattern_hash = hashlib.md5(pattern.encode()).hexdigest()[:8]
error_key = f'log_persistent_{pattern_hash}'
if not health_persistence.is_error_active(error_key, category='logs'):
health_persistence.record_error(
error_key=error_key,
category='logs',
severity='WARNING',
reason=f'Persistent error pattern detected: {pattern[:80]}',
details={'pattern': pattern, 'dismissable': True, 'occurrences': data['count']}
)
patterns_to_remove = [
p for p, data in self.persistent_log_patterns.items()
if current_time - data['last_seen'] > 1800
]
for pattern in patterns_to_remove:
del self.persistent_log_patterns[pattern]
unique_critical_count = len(critical_errors_found)
cascade_count = len(cascading_errors)
spike_count = len(spike_errors)
persistent_count = len(persistent_errors)
if unique_critical_count > 0:
status = 'CRITICAL'
# Get a representative critical error reason
representative_error = next(iter(critical_errors_found.values()))
reason = f'Critical error detected: {representative_error[:100]}'
elif cascade_count > 0:
status = 'WARNING'
reason = f'Error cascade detected: {cascade_count} pattern(s) repeating ≥15 times in 3min'
elif spike_count > 0:
status = 'WARNING'
reason = f'Error spike detected: {spike_count} pattern(s) increased 4x'
elif persistent_count > 0:
status = 'WARNING'
reason = f'Persistent errors: {persistent_count} pattern(s) recurring over 15+ minutes'
else:
# No significant issues found
status = 'OK'
reason = None
# Record/clear persistent errors for each log sub-check so Dismiss works
log_sub_checks = {
'log_error_cascade': {'active': cascade_count > 0, 'severity': 'WARNING',
'reason': f'{cascade_count} pattern(s) repeating >=15 times'},
'log_error_spike': {'active': spike_count > 0, 'severity': 'WARNING',
'reason': f'{spike_count} pattern(s) with 4x increase'},
'log_persistent_errors': {'active': persistent_count > 0, 'severity': 'WARNING',
'reason': f'{persistent_count} recurring pattern(s) over 15+ min'},
'log_critical_errors': {'active': unique_critical_count > 0, 'severity': 'CRITICAL',
'reason': f'{unique_critical_count} critical error(s) found', 'dismissable': False},
}
# Track which sub-checks were dismissed
dismissed_keys = set()
for err_key, info in log_sub_checks.items():
if info['active']:
is_dismissable = info.get('dismissable', True)
result = health_persistence.record_error(
error_key=err_key,
category='logs',
severity=info['severity'],
reason=info['reason'],
details={'dismissable': is_dismissable}
)
if result and result.get('type') == 'skipped_acknowledged':
dismissed_keys.add(err_key)
elif health_persistence.is_error_active(err_key):
health_persistence.clear_error(err_key)
# Build checks dict - downgrade dismissed items to INFO
def _log_check_status(key, active, severity):
if not active:
return 'OK'
if key in dismissed_keys:
return 'INFO'
return severity
log_checks = {
'log_error_cascade': {
'status': _log_check_status('log_error_cascade', cascade_count > 0, 'WARNING'),
'detail': f'{cascade_count} pattern(s) repeating >=15 times' if cascade_count > 0 else 'No cascading errors',
'dismissable': True,
'dismissed': 'log_error_cascade' in dismissed_keys
},
'log_error_spike': {
'status': _log_check_status('log_error_spike', spike_count > 0, 'WARNING'),
'detail': f'{spike_count} pattern(s) with 4x increase' if spike_count > 0 else 'No error spikes',
'dismissable': True,
'dismissed': 'log_error_spike' in dismissed_keys
},
'log_persistent_errors': {
'status': _log_check_status('log_persistent_errors', persistent_count > 0, 'WARNING'),
'detail': f'{persistent_count} recurring pattern(s) over 15+ min' if persistent_count > 0 else 'No persistent patterns',
'dismissable': True,
'dismissed': 'log_persistent_errors' in dismissed_keys
},
'log_critical_errors': {
'status': _log_check_status('log_critical_errors', unique_critical_count > 0, 'CRITICAL'),
'detail': f'{unique_critical_count} critical error(s) found' if unique_critical_count > 0 else 'No critical errors',
'dismissable': False
}
}
# Recalculate overall status considering dismissed items
active_issues = [k for k, v in log_checks.items() if v['status'] in ('WARNING', 'CRITICAL')]
if not active_issues:
status = 'OK'
reason = None
log_result = {'status': status, 'checks': log_checks}
if reason:
log_result['reason'] = reason
self.cached_results[cache_key] = log_result
self.last_check_times[cache_key] = current_time
return log_result
# If journalctl command failed or returned no data
ok_result = {'status': 'OK', 'checks': {
'log_error_cascade': {'status': 'OK', 'detail': 'No cascading errors'},
'log_error_spike': {'status': 'OK', 'detail': 'No error spikes'},
'log_persistent_errors': {'status': 'OK', 'detail': 'No persistent patterns'},
'log_critical_errors': {'status': 'OK', 'detail': 'No critical errors'}
}}
self.cached_results[cache_key] = ok_result
self.last_check_times[cache_key] = current_time
return ok_result
except Exception as e:
print(f"[HealthMonitor] Log check failed: {e}")
return {'status': 'UNKNOWN', 'reason': f'Log check unavailable: {str(e)}', 'checks': {}}
def _normalize_log_pattern(self, line: str) -> str:
"""
Normalize log line to a pattern for grouping similar errors.
Removes timestamps, PIDs, IDs, paths, and other variables.
"""
# Remove standard syslog timestamp and process info if present
pattern = re.sub(r'^\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s+\S+(\s+\[\d+\])?:\s+', '', line)
pattern = re.sub(r'\d{4}-\d{2}-\d{2}', '', pattern) # Remove dates
pattern = re.sub(r'\d{2}:\d{2}:\d{2}', '', pattern) # Remove times
pattern = re.sub(r'pid[:\s]+\d+', 'pid:XXX', pattern.lower()) # Normalize PIDs
pattern = re.sub(r'\b\d{3,6}\b', 'ID', pattern) # Normalize IDs (common for container/VM IDs)
pattern = re.sub(r'/dev/\S+', '/dev/XXX', pattern) # Normalize device paths
pattern = re.sub(r'/\S+/\S+', '/PATH/', pattern) # Normalize general paths
pattern = re.sub(r'0x[0-9a-f]+', '0xXXX', pattern) # Normalize hex values
pattern = re.sub(r'\b(uuid|guid|hash)[:=]\s*[\w-]+\b', r'\1=XXX', pattern.lower()) # Normalize UUIDs/GUIDs
pattern = re.sub(r'\s+', ' ', pattern).strip() # Normalize whitespace
return pattern[:150] # Keep first 150 characters to avoid overly long patterns
def _check_updates(self) -> Optional[Dict[str, Any]]:
"""
Check for pending system updates.
- WARNING: Security updates available, or system not updated >1 year (365 days).
- CRITICAL: System not updated >18 months (548 days).
- INFO: Kernel/PVE updates available, or >50 non-security updates pending.
"""
cache_key = 'updates_check'
current_time = time.time()
# Cache for 10 minutes
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 600:
return self.cached_results.get(cache_key)
try:
apt_history_path = '/var/log/apt/history.log'
last_update_days = None
if os.path.exists(apt_history_path):
try:
mtime = os.path.getmtime(apt_history_path)
days_since_update = (current_time - mtime) / 86400
last_update_days = int(days_since_update)
except Exception:
pass # Ignore if mtime fails
# Perform a dry run of apt-get upgrade to see pending packages
try:
result = subprocess.run(
['apt-get', 'upgrade', '--dry-run'],
capture_output=True,
text=True,
timeout=10
)
except subprocess.TimeoutExpired:
print("[HealthMonitor] apt-get upgrade --dry-run timed out")
return {
'status': 'UNKNOWN',
'reason': 'apt-get timed out - repository may be unreachable',
'count': 0,
'checks': {}
}
status = 'OK'
reason = None
update_count = 0
security_updates_packages = []
kernel_pve_updates_packages = []
sec_result = None
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
for line in lines:
# 'Inst ' indicates a package will be installed/upgraded
if line.startswith('Inst '):
update_count += 1
line_lower = line.lower()
package_name = line.split()[1].split(':')[0] # Get package name, strip arch if present
# Check for security updates (common pattern in repo names)
if 'security' in line_lower or 'debian-security' in line_lower:
security_updates_packages.append(package_name)
# Check for kernel or critical PVE updates
if any(pkg in line_lower for pkg in ['linux-image', 'pve-kernel', 'pve-manager', 'proxmox-ve', 'qemu-server', 'pve-api-core']):
kernel_pve_updates_packages.append(package_name)
# Determine overall status based on findings
if security_updates_packages:
status = 'WARNING'
reason = f'{len(security_updates_packages)} security update(s) available'
# Record persistent error for security updates to ensure it's visible
sec_result = health_persistence.record_error(
error_key='security_updates',
category='updates',
severity='WARNING',
reason=reason,
details={'count': len(security_updates_packages), 'packages': security_updates_packages[:5], 'dismissable': True}
)
# If previously dismissed, downgrade to INFO
if sec_result and sec_result.get('type') == 'skipped_acknowledged':
status = 'INFO'
reason = None
elif last_update_days and last_update_days >= 548:
# 18+ months without updates - CRITICAL
status = 'CRITICAL'
reason = f'System not updated in {last_update_days} days (>18 months)'
health_persistence.record_error(
error_key='system_age',
category='updates',
severity='CRITICAL',
reason=reason,
details={'days': last_update_days, 'update_count': update_count, 'dismissable': False}
)
elif last_update_days and last_update_days >= 365:
# 1+ year without updates - WARNING
status = 'WARNING'
reason = f'System not updated in {last_update_days} days (>1 year)'
health_persistence.record_error(
error_key='system_age',
category='updates',
severity='WARNING',
reason=reason,
details={'days': last_update_days, 'update_count': update_count}
)
elif kernel_pve_updates_packages:
# Informational: Kernel or critical PVE components need update
status = 'INFO'
reason = f'{len(kernel_pve_updates_packages)} kernel/PVE update(s) available'
elif update_count > 50:
# Informational: Large number of pending updates
status = 'INFO'
reason = f'{update_count} updates pending (consider maintenance window)'
# If apt-get upgrade --dry-run failed
elif result.returncode != 0:
status = 'WARNING'
reason = 'Failed to check for updates (apt-get error)'
# Build checks dict for updates sub-items
update_age_status = 'CRITICAL' if (last_update_days and last_update_days >= 548) else ('WARNING' if (last_update_days and last_update_days >= 365) else 'OK')
sec_dismissed = security_updates_packages and sec_result and sec_result.get('type') == 'skipped_acknowledged'
sec_status = 'INFO' if sec_dismissed else ('WARNING' if security_updates_packages else 'OK')
kernel_status = 'INFO' if kernel_pve_updates_packages else 'OK'
checks = {
'security_updates': {
'status': sec_status,
'detail': f'{len(security_updates_packages)} security update(s) pending' if security_updates_packages else 'No security updates pending',
'dismissable': True if security_updates_packages and not sec_dismissed else False,
'dismissed': bool(sec_dismissed)
},
'system_age': {
'status': update_age_status,
'detail': f'Last updated {last_update_days} day(s) ago' if last_update_days is not None else 'Unknown',
'dismissable': False if update_age_status == 'CRITICAL' else True if update_age_status == 'WARNING' else False
},
'pending_updates': {
'status': 'INFO' if update_count > 50 else 'OK',
'detail': f'{update_count} package(s) pending',
},
'kernel_pve': {
'status': kernel_status,
'detail': f'{len(kernel_pve_updates_packages)} kernel/PVE update(s)' if kernel_pve_updates_packages else 'Kernel/PVE up to date',
}
}
# Construct result dictionary
update_result = {
'status': status,
'count': update_count,
'checks': checks
}
if reason:
update_result['reason'] = reason
if last_update_days is not None:
update_result['days_since_update'] = last_update_days
self.cached_results[cache_key] = update_result
self.last_check_times[cache_key] = current_time
return update_result
except Exception as e:
print(f"[HealthMonitor] Updates check failed: {e}")
return {'status': 'UNKNOWN', 'reason': f'Updates check unavailable: {str(e)}', 'count': 0, 'checks': {}}
def _check_fail2ban_bans(self) -> Dict[str, Any]:
"""
Check if fail2ban is installed and if there are currently banned IPs.
Cached for 60 seconds to avoid hammering fail2ban-client.
Returns:
{'installed': bool, 'active': bool, 'status': str, 'detail': str,
'banned_count': int, 'jails': [...], 'banned_ips': [...]}
"""
cache_key = 'fail2ban_bans'
current_time = time.time()
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key, {'installed': False, 'status': 'OK', 'detail': 'Not installed'})
result = {'installed': False, 'active': False, 'status': 'OK', 'detail': 'Not installed', 'banned_count': 0, 'jails': [], 'banned_ips': []}
try:
# Check if fail2ban-client exists
which_result = subprocess.run(
['which', 'fail2ban-client'],
capture_output=True, text=True, timeout=2
)
if which_result.returncode != 0:
self.cached_results[cache_key] = result
self.last_check_times[cache_key] = current_time
return result
result['installed'] = True
# Check if fail2ban service is active
active_check = subprocess.run(
['systemctl', 'is-active', 'fail2ban'],
capture_output=True, text=True, timeout=2
)
if active_check.stdout.strip() != 'active':
result['detail'] = 'Fail2Ban installed but service not active'
self.cached_results[cache_key] = result
self.last_check_times[cache_key] = current_time
return result
result['active'] = True
# Get list of active jails
jails_result = subprocess.run(
['fail2ban-client', 'status'],
capture_output=True, text=True, timeout=3
)
jails = []
if jails_result.returncode == 0:
for line in jails_result.stdout.split('\n'):
if 'Jail list:' in line:
jail_str = line.split('Jail list:')[1].strip()
jails = [j.strip() for j in jail_str.split(',') if j.strip()]
break
if not jails:
result['detail'] = 'Fail2Ban active, no jails configured'
self.cached_results[cache_key] = result
self.last_check_times[cache_key] = current_time
return result
result['jails'] = jails
# Check each jail for banned IPs
total_banned = 0
all_banned_ips = []
jails_with_bans = []
for jail in jails:
try:
jail_result = subprocess.run(
['fail2ban-client', 'status', jail],
capture_output=True, text=True, timeout=2
)
if jail_result.returncode == 0:
for line in jail_result.stdout.split('\n'):
if 'Currently banned:' in line:
try:
count = int(line.split('Currently banned:')[1].strip())
if count > 0:
total_banned += count
jails_with_bans.append(jail)
except (ValueError, IndexError):
pass
elif 'Banned IP list:' in line:
ips_str = line.split('Banned IP list:')[1].strip()
if ips_str:
ips = [ip.strip() for ip in ips_str.split() if ip.strip()]
all_banned_ips.extend(ips[:10]) # Limit to 10 IPs per jail
except Exception:
pass
result['banned_count'] = total_banned
result['banned_ips'] = all_banned_ips[:20] # Max 20 total
if total_banned > 0:
jails_str = ', '.join(jails_with_bans)
msg = f'{total_banned} IP(s) currently banned by Fail2Ban (jails: {jails_str})'
result['status'] = 'WARNING'
result['detail'] = msg
# Record in persistence (dismissable)
health_persistence.record_error(
error_key='fail2ban',
category='security',
severity='WARNING',
reason=msg,
details={
'banned_count': total_banned,
'jails': jails_with_bans,
'banned_ips': all_banned_ips[:5],
'dismissable': True
}
)
else:
result['detail'] = f'Fail2Ban active ({len(jails)} jail(s), no current bans)'
# Auto-resolve if previously banned IPs are now gone
if health_persistence.is_error_active('fail2ban'):
health_persistence.clear_error('fail2ban')
except Exception as e:
result['detail'] = f'Unable to check Fail2Ban: {str(e)[:50]}'
self.cached_results[cache_key] = result
self.last_check_times[cache_key] = current_time
return result
def _check_security(self) -> Dict[str, Any]:
"""
Check security-related items with detailed sub-item breakdown:
- Uptime check: >1 year without kernel update indicates vulnerability
- SSL certificates: PVE certificate expiration
- Login attempts: Excessive failed logins (brute force detection)
- Fail2Ban: Currently banned IPs (if fail2ban is installed)
Returns a result with 'checks' dict containing per-item status.
"""
try:
issues = []
checks = {
'uptime': {'status': 'OK', 'detail': ''},
'certificates': {'status': 'OK', 'detail': ''},
'login_attempts': {'status': 'OK', 'detail': ''},
}
# Sub-check 1: Uptime for potential kernel vulnerabilities
try:
uptime_seconds = time.time() - psutil.boot_time()
uptime_days = uptime_seconds / 86400
if uptime_days > 365:
updates_data = self.cached_results.get('updates_check')
if updates_data and updates_data.get('days_since_update', 9999) > 365:
msg = f'Uptime {int(uptime_days)} days (>1 year, consider updating kernel/system)'
issues.append(msg)
checks['uptime'] = {'status': 'WARNING', 'detail': msg, 'days': int(uptime_days), 'dismissable': True}
else:
checks['uptime'] = {'status': 'OK', 'detail': f'Uptime {int(uptime_days)} days, system recently updated'}
else:
checks['uptime'] = {'status': 'OK', 'detail': f'Uptime {int(uptime_days)} days'}
except Exception:
checks['uptime'] = {'status': 'OK', 'detail': 'Unable to determine uptime'}
# Sub-check 2: SSL certificates
cert_status = self._check_certificates()
if cert_status:
cert_sev = cert_status.get('status', 'OK')
cert_reason = cert_status.get('reason', '')
checks['certificates'] = {
'status': cert_sev,
'detail': cert_reason if cert_reason else 'Certificate valid',
'dismissable': True if cert_sev not in ['OK', 'INFO'] else False
}
if cert_sev not in ['OK', 'INFO']:
issues.append(cert_reason or 'Certificate issue')
# Sub-check 3: Failed login attempts (brute force detection)
try:
result = subprocess.run(
['journalctl', '--since', '24 hours ago', '--no-pager'],
capture_output=True,
text=True,
timeout=3
)
failed_logins = 0
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
if 'authentication failure' in line_lower or 'failed password' in line_lower or 'invalid user' in line_lower:
failed_logins += 1
if failed_logins > 50:
msg = f'{failed_logins} failed login attempts in 24h'
issues.append(msg)
checks['login_attempts'] = {'status': 'WARNING', 'detail': msg, 'count': failed_logins, 'dismissable': True}
elif failed_logins > 0:
checks['login_attempts'] = {'status': 'OK', 'detail': f'{failed_logins} failed attempts in 24h (within threshold)', 'count': failed_logins}
else:
checks['login_attempts'] = {'status': 'OK', 'detail': 'No failed login attempts in 24h', 'count': 0}
except Exception:
checks['login_attempts'] = {'status': 'OK', 'detail': 'Unable to check login attempts'}
# Sub-check 4: Fail2Ban ban detection (only show if installed)
try:
f2b = self._check_fail2ban_bans()
if f2b.get('installed', False):
f2b_status = f2b.get('status', 'OK')
checks['fail2ban'] = {
'status': f2b_status,
'dismissable': True if f2b_status not in ['OK'] else False,
'detail': f2b.get('detail', ''),
'installed': True,
'banned_count': f2b.get('banned_count', 0)
}
if f2b.get('status') == 'WARNING':
issues.append(f2b.get('detail', 'Fail2Ban bans detected'))
# If not installed, simply don't add it to checks
except Exception:
pass
# Determine overall security status
if issues:
# Check if any sub-check is CRITICAL
has_critical = any(c.get('status') == 'CRITICAL' for c in checks.values())
overall_status = 'CRITICAL' if has_critical else 'WARNING'
return {
'status': overall_status,
'reason': '; '.join(issues[:2]),
'checks': checks
}
return {
'status': 'OK',
'checks': checks
}
except Exception as e:
print(f"[HealthMonitor] Security check failed: {e}")
return {'status': 'UNKNOWN', 'reason': f'Security check unavailable: {str(e)}', 'checks': {}}
def _check_certificates(self) -> Optional[Dict[str, Any]]:
"""
Check SSL certificate expiration for PVE's default certificate.
INFO: Self-signed or no cert configured (normal for internal servers)
WARNING: Expires <30 days
CRITICAL: Expired
"""
cache_key = 'certificates'
current_time = time.time()
# Cache for 1 day (86400 seconds)
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 86400:
return self.cached_results.get(cache_key)
try:
cert_path = '/etc/pve/local/pve-ssl.pem'
if not os.path.exists(cert_path):
cert_result = {
'status': 'INFO',
'reason': 'Self-signed or default PVE certificate'
}
self.cached_results[cache_key] = cert_result
self.last_check_times[cache_key] = current_time
return cert_result
# Use openssl to get the expiry date
result = subprocess.run(
['openssl', 'x509', '-enddate', '-noout', '-in', cert_path],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
date_str = result.stdout.strip().replace('notAfter=', '')
try:
# Parse the date string (format can vary, e.g., 'Jun 15 10:00:00 2024 GMT')
# Attempt common formats
exp_date = None
try:
# Try more detailed format first
exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
except ValueError:
# Fallback to simpler format if needed
try:
exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y')
except ValueError:
# Fallback for "notAfter=..." string itself being the issue
if 'notAfter=' in date_str: # If it's the raw string itself
pass # Will result in 'INFO' status
if exp_date:
days_until_expiry = (exp_date - datetime.now()).days
if days_until_expiry < 0:
status = 'CRITICAL'
reason = 'Certificate expired'
elif days_until_expiry < 30:
status = 'WARNING'
reason = f'Certificate expires in {days_until_expiry} days'
else:
status = 'OK'
reason = None
cert_result = {'status': status}
if reason:
cert_result['reason'] = reason
self.cached_results[cache_key] = cert_result
self.last_check_times[cache_key] = current_time
return cert_result
except Exception as e:
print(f"[HealthMonitor] Error parsing certificate expiry date '{date_str}': {e}")
# Fall through to return INFO if parsing fails
# If openssl command failed or date parsing failed
return {'status': 'INFO', 'reason': 'Certificate check inconclusive'}
except Exception as e:
print(f"[HealthMonitor] Error checking certificates: {e}")
return {'status': 'OK'} # Return OK on exception
def _check_disk_health_from_events(self) -> Dict[str, Any]:
"""
Check for disk health warnings/errors from system logs (journalctl).
Looks for SMART warnings and specific disk errors.
Returns dict of disk issues found.
"""
disk_issues = {}
try:
# Check journalctl for warnings/errors related to disks in the last hour
result = subprocess.run(
['journalctl', '--since', '1 hour ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
# Check for SMART warnings/errors
if 'smart' in line_lower and ('warning' in line_lower or 'error' in line_lower or 'fail' in line_lower):
# Extract disk name using regex for common disk identifiers
disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+|hd\d+)', line)
if disk_match:
disk_name = disk_match.group(1)
# Prioritize CRITICAL if already warned, otherwise set to WARNING
if disk_name not in disk_issues or disk_issues[f'/dev/{disk_name}']['status'] != 'CRITICAL':
disk_issues[f'/dev/{disk_name}'] = {
'status': 'WARNING',
'reason': 'SMART warning detected'
}
# Check for specific disk I/O or medium errors
if any(keyword in line_lower for keyword in ['disk error', 'ata error', 'medium error', 'io error']):
disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+|hd\d+)', line)
if disk_match:
disk_name = disk_match.group(1)
disk_issues[f'/dev/{disk_name}'] = {
'status': 'CRITICAL',
'reason': 'Disk error detected'
}
except Exception as e:
print(f"[HealthMonitor] Error checking disk health from events: {e}")
# Return empty dict on error, as this check isn't system-critical itself
pass
return disk_issues
def _check_zfs_pool_health(self) -> Dict[str, Any]:
"""
Check ZFS pool health status using 'zpool status' command.
Returns dict of pools with non-ONLINE status (DEGRADED, FAULTED, UNAVAIL, etc.).
"""
zfs_issues = {}
try:
# First check if 'zpool' command exists to avoid errors on non-ZFS systems
result_which = subprocess.run(
['which', 'zpool'],
capture_output=True,
text=True,
timeout=1
)
if result_which.returncode != 0:
# ZFS is not installed or 'zpool' command not in PATH, so no ZFS issues to report.
return zfs_issues
# Get list of all pools and their health status
result = subprocess.run(
['zpool', 'list', '-H', '-o', 'name,health'], # -H for no header
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
for line in lines:
if not line.strip():
continue
parts = line.split()
if len(parts) >= 2:
pool_name = parts[0]
pool_health = parts[1].upper() # Ensure uppercase for consistent comparison
# 'ONLINE' is the healthy state. Any other status indicates a problem.
if pool_health != 'ONLINE':
if pool_health in ['DEGRADED', 'FAULTED', 'UNAVAIL', 'REMOVED']:
# These are critical states
status = 'CRITICAL'
reason = f'ZFS pool {pool_health.lower()}'
else:
# Any other non-ONLINE state is at least a warning
status = 'WARNING'
reason = f'ZFS pool status: {pool_health.lower()}'
# Use a unique key for each pool issue
zfs_issues[f'zpool_{pool_name}'] = {
'status': status,
'reason': reason,
'pool_name': pool_name,
'health': pool_health
}
except Exception as e:
print(f"[HealthMonitor] Error checking ZFS pool health: {e}")
# If 'zpool status' command itself fails, we can't report ZFS issues.
# Return empty dict as no specific ZFS issues were detected by this check.
pass
return zfs_issues
def _check_proxmox_storage(self) -> Optional[Dict[str, Any]]:
"""
Check Proxmox storage status using the proxmox_storage_monitor module.
Detects unavailable storages configured in PVE.
Returns CRITICAL if any configured storage is unavailable.
Returns None if the module is not available.
"""
if not PROXMOX_STORAGE_AVAILABLE:
return None
try:
# Reload configuration to ensure we have the latest storage definitions
proxmox_storage_monitor.reload_configuration()
# Get the current status of all configured storages
storage_status = proxmox_storage_monitor.get_storage_status()
unavailable_storages = storage_status.get('unavailable', [])
if not unavailable_storages:
# All storages are available. We should also clear any previously recorded storage errors.
active_errors = health_persistence.get_active_errors()
for error in active_errors:
if error.get('category') == 'storage' and error.get('error_key', '').startswith('storage_unavailable_'):
health_persistence.clear_error(error['error_key'])
# Build checks from all configured storages for descriptive display
available_storages = storage_status.get('available', [])
checks = {}
for st in available_storages:
st_name = st.get('name', 'unknown')
st_type = st.get('type', 'unknown')
checks[st_name] = {
'status': 'OK',
'detail': f'{st_type} storage available'
}
if not checks:
checks['proxmox_storages'] = {'status': 'OK', 'detail': 'All storages available'}
return {'status': 'OK', 'checks': checks}
storage_details = {}
for storage in unavailable_storages:
storage_name = storage['name']
error_key = f'storage_unavailable_{storage_name}'
status_detail = storage.get('status_detail', 'unavailable')
# Formulate a descriptive reason for the issue
if status_detail == 'not_found':
reason = f"Storage '{storage_name}' is configured but not found on the server."
elif status_detail == 'unavailable':
reason = f"Storage '{storage_name}' is not available (connection error or backend issue)."
else:
reason = f"Storage '{storage_name}' has status: {status_detail}."
# Record a persistent CRITICAL error for each unavailable storage
health_persistence.record_error(
error_key=error_key,
category='storage',
severity='CRITICAL',
reason=reason,
details={
'storage_name': storage_name,
'storage_type': storage.get('type', 'unknown'),
'status_detail': status_detail,
'dismissable': False
}
)
# Add to details dict with dismissable false for frontend
storage_details[storage_name] = {
'reason': reason,
'type': storage.get('type', 'unknown'),
'status': status_detail,
'dismissable': False
}
# Build checks from storage_details
checks = {}
for st_name, st_info in storage_details.items():
checks[st_name] = {
'status': 'CRITICAL',
'detail': st_info.get('reason', 'Unavailable'),
'dismissable': False
}
# Also add available storages
available_list = storage_status.get('available', [])
unavail_names = {s['name'] for s in unavailable_storages}
for st in available_list:
if st.get('name') not in unavail_names and st.get('name') not in checks:
checks[st['name']] = {
'status': 'OK',
'detail': f'{st.get("type", "unknown")} storage available'
}
return {
'status': 'CRITICAL',
'reason': f'{len(unavailable_storages)} Proxmox storage(s) unavailable',
'details': storage_details,
'checks': checks
}
except Exception as e:
print(f"[HealthMonitor] Error checking Proxmox storage: {e}")
# Return None on exception to indicate the check could not be performed, not necessarily a failure.
return None
def get_health_status(self) -> Dict[str, Any]:
"""
Main function to get the comprehensive health status.
This function orchestrates all individual checks and aggregates results.
"""
# Trigger all checks, including those with caching
detailed_status = self.get_detailed_status()
overall_status = self.get_overall_status()
system_info = self.get_system_info()
return {
'system_info': system_info,
'overall_health': overall_status,
'detailed_health': detailed_status,
'timestamp': datetime.now().isoformat()
}
# Duplicate get_detailed_status was removed during refactor (v1.1)
# Global instance
health_monitor = HealthMonitor()