Files
ProxMenux/AppImage/scripts/health_monitor.py
2025-11-27 12:34:51 +01:00

2076 lines
92 KiB
Python

"""
ProxMenux Health Monitor Module
Provides comprehensive, lightweight health checks for Proxmox systems.
Optimized for minimal system impact with intelligent thresholds and hysteresis.
Author: MacRimi
Version: 1.2 (Always returns all 10 categories)
"""
import psutil
import subprocess
import json
import time
import os
from typing import Dict, List, Any, Tuple, Optional
from datetime import datetime, timedelta
from collections import defaultdict
import re
from health_persistence import health_persistence
try:
from proxmox_storage_monitor import proxmox_storage_monitor
PROXMOX_STORAGE_AVAILABLE = True
except ImportError:
PROXMOX_STORAGE_AVAILABLE = False
class HealthMonitor:
"""
Monitors system health across multiple components with minimal impact.
Implements hysteresis, intelligent caching, progressive escalation, and persistent error tracking.
Always returns all 10 health categories.
"""
# CPU Thresholds
CPU_WARNING = 85
CPU_CRITICAL = 95
CPU_RECOVERY = 75
CPU_WARNING_DURATION = 300 # 5 minutes sustained
CPU_CRITICAL_DURATION = 300 # 5 minutes sustained
CPU_RECOVERY_DURATION = 120
# Memory Thresholds
MEMORY_WARNING = 85
MEMORY_CRITICAL = 95
MEMORY_DURATION = 60
SWAP_WARNING_DURATION = 300
SWAP_CRITICAL_PERCENT = 5
SWAP_CRITICAL_DURATION = 120
# Storage Thresholds
STORAGE_WARNING = 85
STORAGE_CRITICAL = 95
# Temperature Thresholds
TEMP_WARNING = 80
TEMP_CRITICAL = 90
# Network Thresholds
NETWORK_LATENCY_WARNING = 100
NETWORK_LATENCY_CRITICAL = 300
NETWORK_TIMEOUT = 0.9
NETWORK_INACTIVE_DURATION = 600
# Log Thresholds
LOG_ERRORS_WARNING = 5
LOG_ERRORS_CRITICAL = 10
LOG_WARNINGS_WARNING = 15
LOG_WARNINGS_CRITICAL = 30
LOG_CHECK_INTERVAL = 300
# Updates Thresholds
UPDATES_WARNING = 365 # Only warn after 1 year without updates
UPDATES_CRITICAL = 730 # Critical after 2 years
# Known benign errors from Proxmox that should not trigger alerts
BENIGN_ERROR_PATTERNS = [
r'got inotify poll request in wrong process',
r'auth key pair too old, rotating',
r'proxy detected vanished client connection',
r'worker \d+ finished',
r'connection timed out',
r'disconnect peer',
]
CRITICAL_LOG_KEYWORDS = [
'out of memory', 'oom_kill', 'kernel panic',
'filesystem read-only', 'cannot mount',
'raid.*failed', 'md.*device failed',
'ext4-fs error', 'xfs.*corruption',
'lvm activation failed',
'hardware error', 'mce:',
'segfault', 'general protection fault'
]
WARNING_LOG_KEYWORDS = [
'i/o error', 'ata error', 'scsi error',
'task hung', 'blocked for more than',
'failed to start', 'service.*failed',
'disk.*offline', 'disk.*removed'
]
# PVE Critical Services
PVE_SERVICES = ['pveproxy', 'pvedaemon', 'pvestatd', 'pve-cluster']
def __init__(self):
"""Initialize health monitor with state tracking"""
self.state_history = defaultdict(list)
self.last_check_times = {}
self.cached_results = {}
self.network_baseline = {}
self.io_error_history = defaultdict(list)
self.failed_vm_history = set() # Track VMs that failed to start
try:
health_persistence.cleanup_old_errors()
except Exception as e:
print(f"[HealthMonitor] Cleanup warning: {e}")
def get_system_info(self) -> Dict[str, Any]:
"""
Get lightweight system info for header display.
Returns: hostname, uptime, and cached health status.
This is extremely lightweight and uses cached health status.
"""
try:
# Get hostname
hostname = os.uname().nodename
# Get uptime (very cheap operation)
uptime_seconds = time.time() - psutil.boot_time()
# Get cached health status (no expensive checks)
health_status = self.get_cached_health_status()
return {
'hostname': hostname,
'uptime_seconds': int(uptime_seconds),
'uptime': self._format_uptime(uptime_seconds),
'health': health_status,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
return {
'hostname': 'unknown',
'uptime_seconds': 0,
'uptime': 'Unknown',
'health': {'status': 'UNKNOWN', 'summary': f'Error: {str(e)}'},
'timestamp': datetime.now().isoformat()
}
def _format_uptime(self, seconds: float) -> str:
"""Format uptime in human-readable format"""
days = int(seconds // 86400)
hours = int((seconds % 86400) // 3600)
minutes = int((seconds % 3600) // 60)
if days > 0:
return f"{days}d {hours}h {minutes}m"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m"
def get_cached_health_status(self) -> Dict[str, str]:
"""
Get cached health status without running expensive checks.
Returns the last calculated status or triggers a check if too old.
"""
cache_key = 'overall_health'
current_time = time.time()
# If cache exists and is less than 60 seconds old, return it
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key, {'status': 'OK', 'summary': 'System operational'})
# Otherwise, calculate and cache
status = self.get_overall_status()
self.cached_results[cache_key] = {
'status': status['status'],
'summary': status['summary']
}
self.last_check_times[cache_key] = current_time
return self.cached_results[cache_key]
def get_overall_status(self) -> Dict[str, Any]:
"""Get overall health status summary with minimal overhead"""
details = self.get_detailed_status()
overall_status = details.get('overall', 'OK')
summary = details.get('summary', '')
# Count statuses
critical_count = 0
warning_count = 0
ok_count = 0
for category, data in details.get('details', {}).items():
if isinstance(data, dict):
status = data.get('status', 'OK')
if status == 'CRITICAL':
critical_count += 1
elif status == 'WARNING':
warning_count += 1
elif status == 'OK':
ok_count += 1
return {
'status': overall_status,
'summary': summary,
'critical_count': critical_count,
'warning_count': warning_count,
'ok_count': ok_count,
'timestamp': datetime.now().isoformat()
}
def get_detailed_status(self) -> Dict[str, Any]:
"""
Get comprehensive health status with all checks.
Returns JSON structure with ALL 10 categories always present.
Now includes persistent error tracking.
"""
active_errors = health_persistence.get_active_errors()
# No need to create persistent_issues dict here, it's implicitly handled by the checks
details = {
'cpu': {'status': 'OK'},
'memory': {'status': 'OK'},
'storage': {'status': 'OK'}, # This will be overwritten by specific storage checks
'disks': {'status': 'OK'}, # This will be overwritten by disk/filesystem checks
'network': {'status': 'OK'},
'vms': {'status': 'OK'},
'services': {'status': 'OK'},
'logs': {'status': 'OK'},
'updates': {'status': 'OK'},
'security': {'status': 'OK'}
}
critical_issues = []
warning_issues = []
info_issues = [] # Added info_issues to track INFO separately
# --- Priority Order of Checks ---
# Priority 1: Critical PVE Services
services_status = self._check_pve_services()
details['services'] = services_status
if services_status['status'] == 'CRITICAL':
critical_issues.append(f"PVE Services: {services_status.get('reason', 'Service failure')}")
elif services_status['status'] == 'WARNING':
warning_issues.append(f"PVE Services: {services_status.get('reason', 'Service issue')}")
# Priority 1.5: Proxmox Storage Check (External Module)
proxmox_storage_result = self._check_proxmox_storage()
if proxmox_storage_result: # Only process if the check ran (module available)
details['storage'] = proxmox_storage_result
if proxmox_storage_result.get('status') == 'CRITICAL':
critical_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage unavailable'))
elif proxmox_storage_result.get('status') == 'WARNING':
warning_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage issue'))
# Priority 2: Disk/Filesystem Health (Internal checks: usage, ZFS, SMART, IO errors)
storage_status = self._check_storage_optimized()
details['disks'] = storage_status # Use 'disks' for filesystem/disk specific issues
if storage_status.get('status') == 'CRITICAL':
critical_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage failure')}")
elif storage_status.get('status') == 'WARNING':
warning_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage issue')}")
# Priority 3: VMs/CTs Status (with persistence)
vms_status = self._check_vms_cts_with_persistence()
details['vms'] = vms_status
if vms_status.get('status') == 'CRITICAL':
critical_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT failure')}")
elif vms_status.get('status') == 'WARNING':
warning_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT issue')}")
# Priority 4: Network Connectivity
network_status = self._check_network_optimized()
details['network'] = network_status
if network_status.get('status') == 'CRITICAL':
critical_issues.append(f"Network: {network_status.get('reason', 'Network failure')}")
elif network_status.get('status') == 'WARNING':
warning_issues.append(f"Network: {network_status.get('reason', 'Network issue')}")
# Priority 5: CPU Usage (with hysteresis)
cpu_status = self._check_cpu_with_hysteresis()
details['cpu'] = cpu_status
if cpu_status.get('status') == 'CRITICAL':
critical_issues.append(f"CPU: {cpu_status.get('reason', 'CPU critical')}")
elif cpu_status.get('status') == 'WARNING':
warning_issues.append(f"CPU: {cpu_status.get('reason', 'CPU high')}")
# Priority 6: Memory Usage (RAM and Swap)
memory_status = self._check_memory_comprehensive()
details['memory'] = memory_status
if memory_status.get('status') == 'CRITICAL':
critical_issues.append(f"Memory: {memory_status.get('reason', 'Memory critical')}")
elif memory_status.get('status') == 'WARNING':
warning_issues.append(f"Memory: {memory_status.get('reason', 'Memory high')}")
# Priority 7: Log Analysis (with persistence)
logs_status = self._check_logs_with_persistence()
details['logs'] = logs_status
if logs_status.get('status') == 'CRITICAL':
critical_issues.append(f"Logs: {logs_status.get('reason', 'Critical log errors')}")
elif logs_status.get('status') == 'WARNING':
warning_issues.append(f"Logs: {logs_status.get('reason', 'Log warnings')}")
# Priority 8: System Updates
updates_status = self._check_updates()
details['updates'] = updates_status
if updates_status.get('status') == 'CRITICAL':
critical_issues.append(f"Updates: {updates_status.get('reason', 'System not updated')}")
elif updates_status.get('status') == 'WARNING':
warning_issues.append(f"Updates: {updates_status.get('reason', 'Updates pending')}")
elif updates_status.get('status') == 'INFO':
info_issues.append(f"Updates: {updates_status.get('reason', 'Informational update notice')}")
# Priority 9: Security Checks
security_status = self._check_security()
details['security'] = security_status
if security_status.get('status') == 'WARNING':
warning_issues.append(f"Security: {security_status.get('reason', 'Security issue')}")
elif security_status.get('status') == 'INFO':
info_issues.append(f"Security: {security_status.get('reason', 'Security information')}")
# --- Determine Overall Status ---
# Use a fixed order of severity: CRITICAL > WARNING > INFO > OK
if critical_issues:
overall = 'CRITICAL'
summary = '; '.join(critical_issues[:3]) # Limit summary to 3 issues
elif warning_issues:
overall = 'WARNING'
summary = '; '.join(warning_issues[:3])
elif info_issues:
overall = 'OK' # INFO statuses don't degrade overall health
summary = '; '.join(info_issues[:3])
else:
overall = 'OK'
summary = 'All systems operational'
return {
'overall': overall,
'summary': summary,
'details': details,
'timestamp': datetime.now().isoformat()
}
def _check_cpu_with_hysteresis(self) -> Dict[str, Any]:
"""Check CPU with hysteresis to avoid flapping alerts - requires 5min sustained high usage"""
try:
cpu_percent = psutil.cpu_percent(interval=1)
current_time = time.time()
state_key = 'cpu_usage'
self.state_history[state_key].append({
'value': cpu_percent,
'time': current_time
})
self.state_history[state_key] = [
entry for entry in self.state_history[state_key]
if current_time - entry['time'] < 360
]
critical_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] >= self.CPU_CRITICAL and
current_time - entry['time'] <= self.CPU_CRITICAL_DURATION
]
warning_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] >= self.CPU_WARNING and
current_time - entry['time'] <= self.CPU_WARNING_DURATION
]
recovery_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] < self.CPU_RECOVERY and
current_time - entry['time'] <= self.CPU_RECOVERY_DURATION
]
if len(critical_samples) >= 3:
status = 'CRITICAL'
reason = f'CPU >{self.CPU_CRITICAL}% sustained for {self.CPU_CRITICAL_DURATION}s'
elif len(warning_samples) >= 3 and len(recovery_samples) < 2:
status = 'WARNING'
reason = f'CPU >{self.CPU_WARNING}% sustained for {self.CPU_WARNING_DURATION}s'
else:
status = 'OK'
reason = None
temp_status = self._check_cpu_temperature()
result = {
'status': status,
'usage': round(cpu_percent, 1),
'cores': psutil.cpu_count()
}
if reason:
result['reason'] = reason
if temp_status and temp_status.get('status') != 'UNKNOWN':
result['temperature'] = temp_status
if temp_status.get('status') == 'CRITICAL':
result['status'] = 'CRITICAL'
result['reason'] = temp_status.get('reason')
elif temp_status.get('status') == 'WARNING' and status == 'OK':
result['status'] = 'WARNING'
result['reason'] = temp_status.get('reason')
return result
except Exception as e:
return {'status': 'UNKNOWN', 'reason': f'CPU check failed: {str(e)}'}
def _check_cpu_temperature(self) -> Optional[Dict[str, Any]]:
"""Check CPU temperature with hysteresis (5 min sustained) - cached, max 1 check per minute"""
cache_key = 'cpu_temp'
current_time = time.time()
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key)
try:
result = subprocess.run(
['sensors', '-A', '-u'],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
temps = []
for line in result.stdout.split('\n'):
if 'temp' in line.lower() and '_input' in line:
try:
temp = float(line.split(':')[1].strip())
temps.append(temp)
except:
continue
if temps:
max_temp = max(temps)
state_key = 'cpu_temp_history'
self.state_history[state_key].append({
'value': max_temp,
'time': current_time
})
# Keep last 6 minutes of data
self.state_history[state_key] = [
entry for entry in self.state_history[state_key]
if current_time - entry['time'] < 360
]
# Check sustained high temperature (5 minutes)
critical_temp_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] >= self.TEMP_CRITICAL and
current_time - entry['time'] <= 300
]
warning_temp_samples = [
entry for entry in self.state_history[state_key]
if entry['value'] >= self.TEMP_WARNING and
current_time - entry['time'] <= 300
]
# Require at least 3 samples over 5 minutes to trigger alert
if len(critical_temp_samples) >= 3:
status = 'CRITICAL'
reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_CRITICAL}°C sustained >5min'
elif len(warning_temp_samples) >= 3:
status = 'WARNING'
reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_WARNING}°C sustained >5min'
else:
status = 'OK'
reason = None
temp_result = {
'status': status,
'value': round(max_temp, 1),
'unit': '°C'
}
if reason:
temp_result['reason'] = reason
self.cached_results[cache_key] = temp_result
self.last_check_times[cache_key] = current_time
return temp_result
return None
except Exception:
return None
def _check_memory_comprehensive(self) -> Dict[str, Any]:
"""
Check memory including RAM and swap with realistic thresholds.
Only alerts on truly problematic memory situations.
"""
try:
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
current_time = time.time()
mem_percent = memory.percent
swap_percent = swap.percent if swap.total > 0 else 0
swap_vs_ram = (swap.used / memory.total * 100) if memory.total > 0 else 0
state_key = 'memory_usage'
self.state_history[state_key].append({
'mem_percent': mem_percent,
'swap_percent': swap_percent,
'swap_vs_ram': swap_vs_ram,
'time': current_time
})
self.state_history[state_key] = [
entry for entry in self.state_history[state_key]
if current_time - entry['time'] < 600
]
mem_critical = sum(
1 for entry in self.state_history[state_key]
if entry['mem_percent'] >= 90 and
current_time - entry['time'] <= self.MEMORY_DURATION
)
mem_warning = sum(
1 for entry in self.state_history[state_key]
if entry['mem_percent'] >= self.MEMORY_WARNING and
current_time - entry['time'] <= self.MEMORY_DURATION
)
swap_critical = sum(
1 for entry in self.state_history[state_key]
if entry['swap_vs_ram'] > 20 and
current_time - entry['time'] <= self.SWAP_CRITICAL_DURATION
)
if mem_critical >= 2:
status = 'CRITICAL'
reason = f'RAM >90% for {self.MEMORY_DURATION}s'
elif swap_critical >= 2:
status = 'CRITICAL'
reason = f'Swap >20% of RAM ({swap_vs_ram:.1f}%)'
elif mem_warning >= 2:
status = 'WARNING'
reason = f'RAM >{self.MEMORY_WARNING}% for {self.MEMORY_DURATION}s'
else:
status = 'OK'
reason = None
result = {
'status': status,
'ram_percent': round(mem_percent, 1),
'ram_available_gb': round(memory.available / (1024**3), 2),
'swap_percent': round(swap_percent, 1),
'swap_used_gb': round(swap.used / (1024**3), 2)
}
if reason:
result['reason'] = reason
return result
except Exception as e:
return {'status': 'UNKNOWN', 'reason': f'Memory check failed: {str(e)}'}
def _check_storage_optimized(self) -> Dict[str, Any]:
"""
Optimized storage check - monitors Proxmox storages from pvesm status.
Checks for inactive storages, disk health from SMART/events, and ZFS pool health.
"""
issues = []
storage_details = {}
# Check disk usage and mount status first for critical mounts
critical_mounts = ['/']
for mount_point in critical_mounts:
try:
result = subprocess.run(
['mountpoint', '-q', mount_point],
capture_output=True,
timeout=2
)
if result.returncode != 0:
issues.append(f'{mount_point}: Not mounted')
storage_details[mount_point] = {
'status': 'CRITICAL',
'reason': 'Not mounted'
}
continue
# Check if read-only
with open('/proc/mounts', 'r') as f:
for line in f:
parts = line.split()
if len(parts) >= 4 and parts[1] == mount_point:
options = parts[3].split(',')
if 'ro' in options:
issues.append(f'{mount_point}: Mounted read-only')
storage_details[mount_point] = {
'status': 'CRITICAL',
'reason': 'Mounted read-only'
}
break # Found it, no need to check further for this mountpoint
# Check filesystem usage only if not already flagged as critical
if mount_point not in storage_details or storage_details[mount_point].get('status') == 'OK':
fs_status = self._check_filesystem(mount_point)
if fs_status['status'] != 'OK':
issues.append(f"{mount_point}: {fs_status['reason']}")
storage_details[mount_point] = fs_status
except Exception:
pass # Silently skip if mountpoint check fails
# Check ZFS pool health status
zfs_pool_issues = self._check_zfs_pool_health()
if zfs_pool_issues:
for pool_name, pool_info in zfs_pool_issues.items():
issues.append(f'{pool_name}: {pool_info["reason"]}')
storage_details[pool_name] = pool_info
# Check disk health from Proxmox task log or system logs (SMART, etc.)
disk_health_issues = self._check_disk_health_from_events()
if disk_health_issues:
for disk, issue in disk_health_issues.items():
# Only add if not already covered by critical mountpoint issues
if disk not in storage_details or storage_details[disk].get('status') == 'OK':
issues.append(f'{disk}: {issue["reason"]}')
storage_details[disk] = issue
# Check LVM status
lvm_status = self._check_lvm()
if lvm_status.get('status') == 'WARNING':
# LVM volumes might be okay but indicate potential issues
issues.append(f"LVM check: {lvm_status.get('reason')}")
storage_details['lvm_check'] = lvm_status
if not issues:
return {'status': 'OK'}
# Determine overall status
has_critical = any(d.get('status') == 'CRITICAL' for d in storage_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': storage_details
}
def _check_filesystem(self, mount_point: str) -> Dict[str, Any]:
"""Check individual filesystem for space and mount status"""
try:
usage = psutil.disk_usage(mount_point)
percent = usage.percent
if percent >= self.STORAGE_CRITICAL:
status = 'CRITICAL'
reason = f'{percent:.1f}% full (≥{self.STORAGE_CRITICAL}%)'
elif percent >= self.STORAGE_WARNING:
status = 'WARNING'
reason = f'{percent:.1f}% full (≥{self.STORAGE_WARNING}%)'
else:
status = 'OK'
reason = None
result = {
'status': status,
'usage_percent': round(percent, 1)
}
if reason:
result['reason'] = reason
return result
except Exception as e:
return {
'status': 'WARNING',
'reason': f'Check failed: {str(e)}'
}
def _check_lvm(self) -> Dict[str, Any]:
"""Check LVM volumes - improved detection"""
try:
# Check if lvs command is available
result_which = subprocess.run(
['which', 'lvs'],
capture_output=True,
text=True,
timeout=1
)
if result_which.returncode != 0:
return {'status': 'OK'} # LVM not installed
result = subprocess.run(
['lvs', '--noheadings', '--options', 'lv_name,vg_name,lv_attr'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode != 0:
return {'status': 'WARNING', 'reason': 'lvs command failed'}
volumes = []
for line in result.stdout.strip().split('\n'):
if line.strip():
parts = line.split()
if len(parts) >= 2:
lv_name = parts[0].strip()
vg_name = parts[1].strip()
# Check for 'a' attribute indicating active/available
if 'a' in parts[2]:
volumes.append(f'{vg_name}/{lv_name}')
# If LVM is configured but no active volumes are found, it might be an issue or just not used
if not volumes:
# Check if any VGs exist to determine if LVM is truly unconfigured or just inactive
vg_result = subprocess.run(
['vgs', '--noheadings', '--options', 'vg_name'],
capture_output=True,
text=True,
timeout=3
)
if vg_result.returncode == 0 and vg_result.stdout.strip():
return {'status': 'WARNING', 'reason': 'No active LVM volumes detected'}
else:
return {'status': 'OK'} # No VGs found, LVM not in use
return {'status': 'OK', 'volumes': len(volumes)}
except Exception:
return {'status': 'OK'}
# This function is no longer used in get_detailed_status, but kept for reference if needed.
# The new _check_proxmox_storage function handles this logic better.
def _check_proxmox_storages(self) -> Dict[str, Any]:
"""Check Proxmox-specific storages (only report problems)"""
storages = {}
try:
if os.path.exists('/etc/pve/storage.cfg'):
with open('/etc/pve/storage.cfg', 'r') as f:
current_storage = None
storage_type = None
for line in f:
line = line.strip()
if line.startswith('dir:') or line.startswith('nfs:') or \
line.startswith('cifs:') or line.startswith('pbs:') or \
line.startswith('rbd:') or line.startswith('cephfs:') or \
line.startswith('zfs:') or line.startswith('zfs-send:'):
parts = line.split(':', 1)
storage_type = parts[0]
current_storage = parts[1].strip()
elif line.startswith('path ') and current_storage:
path = line.split(None, 1)[1]
if storage_type == 'dir':
if not os.path.exists(path):
storages[f'storage_{current_storage}'] = {
'status': 'CRITICAL',
'reason': 'Directory does not exist',
'type': 'dir',
'path': path
}
current_storage = None
storage_type = None
except Exception:
pass
return storages
def _check_disks_optimized(self) -> Dict[str, Any]:
"""
Optimized disk check - always returns status.
Checks dmesg for I/O errors and SMART status.
NOTE: This function is now largely covered by _check_storage_optimized,
but kept for potential specific disk-level reporting if needed.
Currently, its primary function is to detect recent I/O errors.
"""
current_time = time.time()
disk_issues = {}
try:
# Check dmesg for I/O errors in the last 5 minutes
result = subprocess.run(
['dmesg', '-T', '--level=err,warn', '--since', '5 minutes ago'],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
if any(keyword in line_lower for keyword in ['i/o error', 'ata error', 'scsi error', 'medium error']):
# Try to extract disk name
disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+)', line)
if disk_match:
disk_name = disk_match.group(1)
self.io_error_history[disk_name].append(current_time)
# Clean old history (keep errors from the last 5 minutes)
for disk in list(self.io_error_history.keys()):
self.io_error_history[disk] = [
t for t in self.io_error_history[disk]
if current_time - t < 300
]
error_count = len(self.io_error_history[disk])
# Report based on recent error count
if error_count >= 3:
disk_issues[f'/dev/{disk}'] = {
'status': 'CRITICAL',
'reason': f'{error_count} I/O errors in 5 minutes'
}
elif error_count >= 1:
disk_issues[f'/dev/{disk}'] = {
'status': 'WARNING',
'reason': f'{error_count} I/O error(s) in 5 minutes'
}
if not disk_issues:
return {'status': 'OK'}
has_critical = any(d.get('status') == 'CRITICAL' for d in disk_issues.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': f"{len(disk_issues)} disk(s) with recent errors",
'details': disk_issues
}
except Exception:
# If dmesg check fails, return OK as it's not a critical system failure
return {'status': 'OK'}
def _check_network_optimized(self) -> Dict[str, Any]:
"""
Optimized network check - always returns status.
Checks interface status and basic latency.
"""
try:
issues = []
interface_details = {}
net_if_stats = psutil.net_if_stats()
for interface, stats in net_if_stats.items():
if interface == 'lo':
continue
# Check if important interface is down
if not stats.isup:
# Consider common PVE bridge interfaces and physical NICs as important
if interface.startswith('vmbr') or interface.startswith('eth') or interface.startswith('ens') or interface.startswith('enp'):
issues.append(f'{interface} is DOWN')
interface_details[interface] = {
'status': 'CRITICAL',
'reason': 'Interface DOWN'
}
# Check connectivity (latency)
latency_status = self._check_network_latency()
if latency_status and latency_status.get('status') not in ['OK', 'INFO', 'UNKNOWN']:
issues.append(latency_status.get('reason', 'Network latency issue'))
interface_details['connectivity'] = latency_status
if not issues:
return {'status': 'OK'}
has_critical = any(d.get('status') == 'CRITICAL' for d in interface_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:2]),
'details': interface_details
}
except Exception:
return {'status': 'OK'}
def _check_network_latency(self) -> Optional[Dict[str, Any]]:
"""Check network latency to 1.1.1.1 (cached)"""
cache_key = 'network_latency'
current_time = time.time()
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key)
try:
result = subprocess.run(
['ping', '-c', '1', '-W', '1', '1.1.1.1'],
capture_output=True,
text=True,
timeout=self.NETWORK_TIMEOUT
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if 'time=' in line:
try:
latency_str = line.split('time=')[1].split()[0]
latency = float(latency_str)
if latency > self.NETWORK_LATENCY_CRITICAL:
status = 'CRITICAL'
reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_CRITICAL}ms'
elif latency > self.NETWORK_LATENCY_WARNING:
status = 'WARNING'
reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_WARNING}ms'
else:
status = 'OK'
reason = None
latency_result = {
'status': status,
'latency_ms': round(latency, 1)
}
if reason:
latency_result['reason'] = reason
self.cached_results[cache_key] = latency_result
self.last_check_times[cache_key] = current_time
return latency_result
except:
pass
# If ping failed (timeout, unreachable)
packet_loss_result = {
'status': 'CRITICAL',
'reason': 'Packet loss or timeout to 1.1.1.1'
}
self.cached_results[cache_key] = packet_loss_result
self.last_check_times[cache_key] = current_time
return packet_loss_result
except Exception:
return {'status': 'UNKNOWN', 'reason': 'Ping command failed'}
def _check_vms_cts_optimized(self) -> Dict[str, Any]:
"""
Optimized VM/CT check - detects qmp failures and startup errors from logs.
Improved detection of container and VM errors from journalctl.
"""
try:
issues = []
vm_details = {}
result = subprocess.run(
['journalctl', '--since', '10 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command.*(?:failed|unable|timeout)', line_lower)
if vm_qmp_match:
vmid = vm_qmp_match.group(1)
key = f'vm_{vmid}'
if key not in vm_details:
issues.append(f'VM {vmid}: Communication issue')
vm_details[key] = {
'status': 'WARNING',
'reason': 'QMP command timeout',
'id': vmid,
'type': 'VM'
}
continue
ct_error_match = re.search(r'(?:ct|container|lxc)\s+(\d+)', line_lower)
if ct_error_match and ('error' in line_lower or 'fail' in line_lower or 'device' in line_lower):
ctid = ct_error_match.group(1)
key = f'ct_{ctid}'
if key not in vm_details:
if 'device' in line_lower and 'does not exist' in line_lower:
device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
if device_match:
reason = f'Device {device_match.group(1)} missing'
else:
reason = 'Device error'
elif 'failed to start' in line_lower:
reason = 'Failed to start'
else:
reason = 'Container error'
issues.append(f'CT {ctid}: {reason}')
vm_details[key] = {
'status': 'WARNING' if 'device' in reason.lower() else 'CRITICAL',
'reason': reason,
'id': ctid,
'type': 'CT'
}
continue
vzstart_match = re.search(r'vzstart:(\d+):', line)
if vzstart_match and ('error' in line_lower or 'fail' in line_lower or 'does not exist' in line_lower):
ctid = vzstart_match.group(1)
key = f'ct_{ctid}'
if key not in vm_details:
# Extraer mensaje de error
if 'device' in line_lower and 'does not exist' in line_lower:
device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
if device_match:
reason = f'Device {device_match.group(1)} missing'
else:
reason = 'Device error'
else:
reason = 'Startup error'
issues.append(f'CT {ctid}: {reason}')
vm_details[key] = {
'status': 'WARNING',
'reason': reason,
'id': ctid,
'type': 'CT'
}
continue
if any(keyword in line_lower for keyword in ['failed to start', 'cannot start', 'activation failed', 'start error']):
id_match = re.search(r'\b(\d{3,4})\b', line)
if id_match:
vmid = id_match.group(1)
key = f'vmct_{vmid}'
if key not in vm_details:
issues.append(f'VM/CT {vmid}: Failed to start')
vm_details[key] = {
'status': 'CRITICAL',
'reason': 'Failed to start',
'id': vmid,
'type': 'VM/CT'
}
if not issues:
return {'status': 'OK'}
has_critical = any(d.get('status') == 'CRITICAL' for d in vm_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': vm_details
}
except Exception:
return {'status': 'OK'}
# Modified to use persistence
def _check_vms_cts_with_persistence(self) -> Dict[str, Any]:
"""
Check VMs/CTs with persistent error tracking.
Errors persist until VM starts or 48h elapsed.
"""
try:
issues = []
vm_details = {}
# Get persistent errors first
persistent_errors = health_persistence.get_active_errors('vms')
# Check if any persistent VMs/CTs have started
for error in persistent_errors:
error_key = error['error_key']
if error_key.startswith('vm_') or error_key.startswith('ct_'):
vm_id = error_key.split('_')[1]
# Check if VM is running using persistence helper
if health_persistence.check_vm_running(vm_id):
continue # Error auto-resolved if VM is now running
# Still active, add to details
vm_details[error_key] = {
'status': error['severity'],
'reason': error['reason'],
'id': error.get('details', {}).get('id', 'unknown'),
'type': error.get('details', {}).get('type', 'VM/CT'),
'first_seen': error['first_seen']
}
issues.append(f"{error.get('details', {}).get('type', 'VM')} {error.get('details', {}).get('id', '')}: {error['reason']}")
# Check for new errors in logs
# Using 'warning' priority to catch potential startup issues
result = subprocess.run(
['journalctl', '--since', '10 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
# VM QMP errors
vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command.*(?:failed|unable|timeout)', line_lower)
if vm_qmp_match:
vmid = vm_qmp_match.group(1)
error_key = f'vm_{vmid}'
if error_key not in vm_details:
# Record persistent error
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='WARNING',
reason='QMP command timeout',
details={'id': vmid, 'type': 'VM'}
)
issues.append(f'VM {vmid}: Communication issue')
vm_details[error_key] = {
'status': 'WARNING',
'reason': 'QMP command timeout',
'id': vmid,
'type': 'VM'
}
continue
# Container errors (including startup issues via vzstart)
vzstart_match = re.search(r'vzstart:(\d+):', line)
if vzstart_match and ('error' in line_lower or 'fail' in line_lower or 'does not exist' in line_lower):
ctid = vzstart_match.group(1)
error_key = f'ct_{ctid}'
if error_key not in vm_details:
if 'device' in line_lower and 'does not exist' in line_lower:
device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
if device_match:
reason = f'Device {device_match.group(1)} missing'
else:
reason = 'Device error'
else:
reason = 'Startup error'
# Record persistent error
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='WARNING',
reason=reason,
details={'id': ctid, 'type': 'CT'}
)
issues.append(f'CT {ctid}: {reason}')
vm_details[error_key] = {
'status': 'WARNING',
'reason': reason,
'id': ctid,
'type': 'CT'
}
# Generic failed to start for VMs and CTs
if any(keyword in line_lower for keyword in ['failed to start', 'cannot start', 'activation failed', 'start error']):
id_match = re.search(r'\b(\d{3,5})\b', line) # Increased digit count for wider match
if id_match:
vmid_ctid = id_match.group(1)
# Determine if it's a VM or CT based on context, if possible
if 'vm' in line_lower or 'qemu' in line_lower:
error_key = f'vm_{vmid_ctid}'
vm_type = 'VM'
elif 'ct' in line_lower or 'lxc' in line_lower:
error_key = f'ct_{vmid_ctid}'
vm_type = 'CT'
else:
# Fallback if type is unclear
error_key = f'vmct_{vmid_ctid}'
vm_type = 'VM/CT'
if error_key not in vm_details:
reason = 'Failed to start'
# Record persistent error
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='CRITICAL',
reason=reason,
details={'id': vmid_ctid, 'type': vm_type}
)
issues.append(f'{vm_type} {vmid_ctid}: {reason}')
vm_details[error_key] = {
'status': 'CRITICAL',
'reason': reason,
'id': vmid_ctid,
'type': vm_type
}
if not issues:
return {'status': 'OK'}
has_critical = any(d.get('status') == 'CRITICAL' for d in vm_details.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': '; '.join(issues[:3]),
'details': vm_details
}
except Exception:
return {'status': 'OK'}
def _check_pve_services(self) -> Dict[str, Any]:
"""Check critical Proxmox services"""
try:
failed_services = []
for service in self.PVE_SERVICES:
try:
result = subprocess.run(
['systemctl', 'is-active', service],
capture_output=True,
text=True,
timeout=2
)
if result.returncode != 0 or result.stdout.strip() != 'active':
failed_services.append(service)
except Exception:
# If systemctl fails (e.g., command not found or service doesn't exist), treat as failed
failed_services.append(service)
if failed_services:
return {
'status': 'CRITICAL',
'reason': f'Services inactive: {", ".join(failed_services)}',
'failed': failed_services
}
return {'status': 'OK'}
except Exception as e:
# If the entire systemctl check fails
return {
'status': 'WARNING',
'reason': f'Service check command failed: {str(e)}'
}
def _is_benign_error(self, line: str) -> bool:
"""Check if log line matches benign error patterns"""
line_lower = line.lower()
for pattern in self.BENIGN_ERROR_PATTERNS:
if re.search(pattern, line_lower):
return True
return False
def _classify_log_severity(self, line: str) -> Optional[str]:
"""
Classify log line severity intelligently.
Returns: 'CRITICAL', 'WARNING', or None (benign/info)
"""
line_lower = line.lower()
# Check if benign first
if self._is_benign_error(line):
return None
# Check critical keywords
for keyword in self.CRITICAL_LOG_KEYWORDS:
if re.search(keyword, line_lower):
return 'CRITICAL'
# Check warning keywords
for keyword in self.WARNING_LOG_KEYWORDS:
if re.search(keyword, line_lower):
return 'WARNING'
# Generic error/warning classification based on common terms
if 'critical' in line_lower or 'fatal' in line_lower or 'panic' in line_lower:
return 'CRITICAL'
elif 'error' in line_lower or 'fail' in line_lower:
return 'WARNING'
elif 'warning' in line_lower or 'warn' in line_lower:
return None # Generic warnings are often informational and not critical
return None
def _check_logs_with_persistence(self) -> Dict[str, Any]:
"""
Intelligent log checking with cascade detection and persistence.
Focuses on detecting significant error patterns rather than transient warnings.
"""
cache_key = 'logs_analysis'
current_time = time.time()
# Cache the result for 5 minutes to avoid excessive journalctl calls
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < self.LOG_CHECK_INTERVAL:
# Check persistent log errors recorded by health_persistence
persistent_errors = health_persistence.get_active_errors('logs')
if persistent_errors:
return {
'status': 'WARNING', # Or CRITICAL depending on severity of persistent errors
'reason': f'{len(persistent_errors)} persistent log issues detected'
}
return self.cached_results.get(cache_key, {'status': 'OK'})
try:
# Fetch logs from the last 3 minutes for immediate issue detection
result_recent = subprocess.run(
['journalctl', '--since', '3 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
# Fetch logs from the previous 3-minute interval to detect spikes/cascades
result_previous = subprocess.run(
['journalctl', '--since', '6 minutes ago', '--until', '3 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result_recent.returncode == 0:
recent_lines = result_recent.stdout.strip().split('\n')
previous_lines = result_previous.stdout.strip().split('\n') if result_previous.returncode == 0 else []
recent_patterns = defaultdict(int)
previous_patterns = defaultdict(int)
critical_errors_found = {} # To store unique critical error lines for persistence
for line in recent_lines:
if not line.strip():
continue
# Skip benign errors
if self._is_benign_error(line):
continue
# Classify severity
severity = self._classify_log_severity(line)
if severity is None: # Skip informational or classified benign lines
continue
# Normalize to a pattern for grouping
pattern = self._normalize_log_pattern(line)
if severity == 'CRITICAL':
# If this critical pattern is new or we haven't logged it recently
error_key = f'log_critical_{abs(hash(pattern)) % 10000}'
if pattern not in critical_errors_found:
critical_errors_found[pattern] = line
# Record persistent error if it's not already active and within recent persistence
if not health_persistence.is_error_active(error_key, category='logs'):
health_persistence.record_error(
error_key=error_key,
category='logs',
severity='CRITICAL',
reason=line[:100], # Truncate reason for brevity
details={'pattern': pattern}
)
recent_patterns[pattern] += 1
for line in previous_lines:
if not line.strip() or self._is_benign_error(line):
continue
severity = self._classify_log_severity(line)
if severity is None:
continue
pattern = self._normalize_log_pattern(line)
previous_patterns[pattern] += 1
# Detect cascades: ≥10 errors of same type in 3 min
cascading_errors = {
pattern: count for pattern, count in recent_patterns.items()
if count >= 10 and self._classify_log_severity(pattern) in ['WARNING', 'CRITICAL']
}
# Detect spikes: ≥3 errors now AND ≥3x increase from previous period
spike_errors = {}
for pattern, recent_count in recent_patterns.items():
prev_count = previous_patterns.get(pattern, 0)
if recent_count >= 3 and recent_count >= prev_count * 3:
spike_errors[pattern] = recent_count
unique_critical_count = len(critical_errors_found)
cascade_count = len(cascading_errors)
spike_count = len(spike_errors)
if unique_critical_count > 0:
status = 'CRITICAL'
# Get a representative critical error reason
representative_error = next(iter(critical_errors_found.values()))
reason = f'Critical error detected: {representative_error[:100]}'
elif cascade_count > 0:
status = 'WARNING'
reason = f'Error cascade detected: {cascade_count} pattern(s) repeating ≥10 times in 3min'
elif spike_count > 0:
status = 'WARNING'
reason = f'Error spike detected: {spike_count} pattern(s) increased 3x'
else:
# No significant issues found
status = 'OK'
reason = None
log_result = {'status': status}
if reason:
log_result['reason'] = reason
self.cached_results[cache_key] = log_result
self.last_check_times[cache_key] = current_time
return log_result
# If journalctl command failed or returned no data
ok_result = {'status': 'OK'}
self.cached_results[cache_key] = ok_result
self.last_check_times[cache_key] = current_time
return ok_result
except Exception as e:
# Log the exception but return OK to avoid alert storms on check failure
print(f"[HealthMonitor] Error checking logs: {e}")
return {'status': 'OK'}
def _normalize_log_pattern(self, line: str) -> str:
"""
Normalize log line to a pattern for grouping similar errors.
Removes timestamps, PIDs, IDs, paths, and other variables.
"""
# Remove standard syslog timestamp and process info if present
pattern = re.sub(r'^\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s+\S+(\s+\[\d+\])?:\s+', '', line)
pattern = re.sub(r'\d{4}-\d{2}-\d{2}', '', pattern) # Remove dates
pattern = re.sub(r'\d{2}:\d{2}:\d{2}', '', pattern) # Remove times
pattern = re.sub(r'pid[:\s]+\d+', 'pid:XXX', pattern.lower()) # Normalize PIDs
pattern = re.sub(r'\b\d{3,6}\b', 'ID', pattern) # Normalize IDs (common for container/VM IDs)
pattern = re.sub(r'/dev/\S+', '/dev/XXX', pattern) # Normalize device paths
pattern = re.sub(r'/\S+/\S+', '/PATH/', pattern) # Normalize general paths
pattern = re.sub(r'0x[0-9a-f]+', '0xXXX', pattern) # Normalize hex values
pattern = re.sub(r'\b(uuid|guid|hash)[:=]\s*[\w-]+\b', r'\1=XXX', pattern.lower()) # Normalize UUIDs/GUIDs
pattern = re.sub(r'\s+', ' ', pattern).strip() # Normalize whitespace
return pattern[:150] # Keep first 150 characters to avoid overly long patterns
def _check_updates(self) -> Optional[Dict[str, Any]]:
"""
Check for pending system updates.
- WARNING: If security updates are available.
- CRITICAL: If system not updated in >2 years.
- INFO: If 1-2 years without updates, or many non-security updates.
"""
cache_key = 'updates_check'
current_time = time.time()
# Cache for 10 minutes
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 600:
return self.cached_results.get(cache_key)
try:
apt_history_path = '/var/log/apt/history.log'
last_update_days = None
if os.path.exists(apt_history_path):
try:
mtime = os.path.getmtime(apt_history_path)
days_since_update = (current_time - mtime) / 86400
last_update_days = int(days_since_update)
except Exception:
pass # Ignore if mtime fails
# Perform a dry run of apt-get upgrade to see pending packages
result = subprocess.run(
['apt-get', 'upgrade', '--dry-run'],
capture_output=True,
text=True,
timeout=5 # Increased timeout for safety
)
status = 'OK'
reason = None
update_count = 0
security_updates_packages = []
kernel_pve_updates_packages = []
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
for line in lines:
# 'Inst' indicates a package will be installed/upgraded
if line.startswith('Inst '):
update_count += 1
line_lower = line.lower()
package_name = line.split()[1].split(':')[0] # Get package name, strip arch if present
# Check for security updates (common pattern in repo names)
if 'security' in line_lower or 'debian-security' in line_lower:
security_updates_packages.append(package_name)
# Check for kernel or critical PVE updates
if any(pkg in line_lower for pkg in ['linux-image', 'pve-kernel', 'pve-manager', 'proxmox-ve', 'qemu-server', 'pve-api-core']):
kernel_pve_updates_packages.append(package_name)
# Determine overall status based on findings
if security_updates_packages:
status = 'WARNING'
reason = f'{len(security_updates_packages)} security update(s) available'
# Record persistent error for security updates to ensure it's visible
health_persistence.record_error(
error_key='updates_security',
category='updates',
severity='WARNING',
reason=reason,
details={'count': len(security_updates_packages), 'packages': security_updates_packages[:5]}
)
elif last_update_days and last_update_days >= 730:
# 2+ years without updates - CRITICAL
status = 'CRITICAL'
reason = f'System not updated in {last_update_days} days (>2 years)'
health_persistence.record_error(
error_key='updates_730days',
category='updates',
severity='CRITICAL',
reason=reason,
details={'days': last_update_days, 'update_count': update_count}
)
elif last_update_days and last_update_days >= 365:
# 1+ year without updates - WARNING
status = 'WARNING'
reason = f'System not updated in {last_update_days} days (>1 year)'
health_persistence.record_error(
error_key='updates_365days',
category='updates',
severity='WARNING',
reason=reason,
details={'days': last_update_days, 'update_count': update_count}
)
elif kernel_pve_updates_packages:
# Informational: Kernel or critical PVE components need update
status = 'INFO'
reason = f'{len(kernel_pve_updates_packages)} kernel/PVE update(s) available'
elif update_count > 50:
# Informational: Large number of pending updates
status = 'INFO'
reason = f'{update_count} updates pending (consider maintenance window)'
# If apt-get upgrade --dry-run failed
elif result.returncode != 0:
status = 'WARNING'
reason = 'Failed to check for updates (apt-get error)'
# Construct result dictionary
update_result = {
'status': status,
'count': update_count
}
if reason:
update_result['reason'] = reason
if last_update_days is not None: # Only add if we could determine days_since_update
update_result['days_since_update'] = last_update_days
self.cached_results[cache_key] = update_result
self.last_check_times[cache_key] = current_time
return update_result
except Exception as e:
print(f"[HealthMonitor] Error checking updates: {e}")
# Return OK on exception to avoid false alerts
return {'status': 'OK', 'count': 0}
def _check_security(self) -> Dict[str, Any]:
"""
Check security-related items:
- Uptime > 1 year (indicates potential kernel vulnerability if not updated)
- SSL certificate expiration (non-INFO certs)
- Excessive failed login attempts
"""
try:
issues = []
# Check uptime for potential kernel vulnerabilities (if not updated)
try:
uptime_seconds = time.time() - psutil.boot_time()
uptime_days = uptime_seconds / 86400
# If uptime is over a year and no recent updates, it's a warning
if uptime_days > 365:
# Check if updates check shows recent activity
updates_data = self.cached_results.get('updates_check')
if updates_data and updates_data.get('days_since_update', 9999) > 365:
issues.append(f'Uptime {int(uptime_days)} days (>1 year, consider updating kernel/system)')
except Exception:
pass # Ignore if uptime calculation fails
# Check SSL certificates (only report non-OK statuses)
cert_status = self._check_certificates()
if cert_status and cert_status.get('status') not in ['OK', 'INFO']:
issues.append(cert_status.get('reason', 'Certificate issue'))
# Check for excessive failed login attempts in the last 24 hours
try:
result = subprocess.run(
['journalctl', '--since', '24 hours ago', '--no-pager'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
failed_logins = 0
for line in result.stdout.split('\n'):
# Common patterns for failed logins in journald
if 'authentication failure' in line.lower() or 'failed password' in line.lower() or 'invalid user' in line.lower():
failed_logins += 1
if failed_logins > 50: # Threshold for significant failed attempts
issues.append(f'{failed_logins} failed login attempts in 24h')
except Exception:
pass # Ignore if journalctl fails
if issues:
return {
'status': 'WARNING', # Security issues are typically warnings
'reason': '; '.join(issues[:2]) # Show up to 2 issues
}
return {'status': 'OK'}
except Exception as e:
print(f"[HealthMonitor] Error checking security: {e}")
return {'status': 'OK'}
def _check_certificates(self) -> Optional[Dict[str, Any]]:
"""
Check SSL certificate expiration for PVE's default certificate.
INFO: Self-signed or no cert configured (normal for internal servers)
WARNING: Expires <30 days
CRITICAL: Expired
"""
cache_key = 'certificates'
current_time = time.time()
# Cache for 1 day (86400 seconds)
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 86400:
return self.cached_results.get(cache_key)
try:
cert_path = '/etc/pve/local/pve-ssl.pem'
if not os.path.exists(cert_path):
cert_result = {
'status': 'INFO',
'reason': 'Self-signed or default PVE certificate'
}
self.cached_results[cache_key] = cert_result
self.last_check_times[cache_key] = current_time
return cert_result
# Use openssl to get the expiry date
result = subprocess.run(
['openssl', 'x509', '-enddate', '-noout', '-in', cert_path],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
date_str = result.stdout.strip().replace('notAfter=', '')
try:
# Parse the date string (format can vary, e.g., 'Jun 15 10:00:00 2024 GMT')
# Attempt common formats
exp_date = None
try:
# Try more detailed format first
exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
except ValueError:
# Fallback to simpler format if needed
try:
exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y')
except ValueError:
# Fallback for "notAfter=..." string itself being the issue
if 'notAfter=' in date_str: # If it's the raw string itself
pass # Will result in 'INFO' status
if exp_date:
days_until_expiry = (exp_date - datetime.now()).days
if days_until_expiry < 0:
status = 'CRITICAL'
reason = 'Certificate expired'
elif days_until_expiry < 30:
status = 'WARNING'
reason = f'Certificate expires in {days_until_expiry} days'
else:
status = 'OK'
reason = None
cert_result = {'status': status}
if reason:
cert_result['reason'] = reason
self.cached_results[cache_key] = cert_result
self.last_check_times[cache_key] = current_time
return cert_result
except Exception as e:
print(f"[HealthMonitor] Error parsing certificate expiry date '{date_str}': {e}")
# Fall through to return INFO if parsing fails
# If openssl command failed or date parsing failed
return {'status': 'INFO', 'reason': 'Certificate check inconclusive'}
except Exception as e:
print(f"[HealthMonitor] Error checking certificates: {e}")
return {'status': 'OK'} # Return OK on exception
def _check_disk_health_from_events(self) -> Dict[str, Any]:
"""
Check for disk health warnings/errors from system logs (journalctl).
Looks for SMART warnings and specific disk errors.
Returns dict of disk issues found.
"""
disk_issues = {}
try:
# Check journalctl for warnings/errors related to disks in the last hour
result = subprocess.run(
['journalctl', '--since', '1 hour ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
# Check for SMART warnings/errors
if 'smart' in line_lower and ('warning' in line_lower or 'error' in line_lower or 'fail' in line_lower):
# Extract disk name using regex for common disk identifiers
disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+|hd\d+)', line)
if disk_match:
disk_name = disk_match.group(1)
# Prioritize CRITICAL if already warned, otherwise set to WARNING
if disk_name not in disk_issues or disk_issues[f'/dev/{disk_name}']['status'] != 'CRITICAL':
disk_issues[f'/dev/{disk_name}'] = {
'status': 'WARNING',
'reason': 'SMART warning detected'
}
# Check for specific disk I/O or medium errors
if any(keyword in line_lower for keyword in ['disk error', 'ata error', 'medium error', 'io error']):
disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+|hd\d+)', line)
if disk_match:
disk_name = disk_match.group(1)
disk_issues[f'/dev/{disk_name}'] = {
'status': 'CRITICAL',
'reason': 'Disk error detected'
}
except Exception as e:
print(f"[HealthMonitor] Error checking disk health from events: {e}")
# Return empty dict on error, as this check isn't system-critical itself
pass
return disk_issues
def _check_zfs_pool_health(self) -> Dict[str, Any]:
"""
Check ZFS pool health status using 'zpool status' command.
Returns dict of pools with non-ONLINE status (DEGRADED, FAULTED, UNAVAIL, etc.).
"""
zfs_issues = {}
try:
# First check if 'zpool' command exists to avoid errors on non-ZFS systems
result_which = subprocess.run(
['which', 'zpool'],
capture_output=True,
text=True,
timeout=1
)
if result_which.returncode != 0:
# ZFS is not installed or 'zpool' command not in PATH, so no ZFS issues to report.
return zfs_issues
# Get list of all pools and their health status
result = subprocess.run(
['zpool', 'list', '-H', '-o', 'name,health'], # -H for no header
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
for line in lines:
if not line.strip():
continue
parts = line.split()
if len(parts) >= 2:
pool_name = parts[0]
pool_health = parts[1].upper() # Ensure uppercase for consistent comparison
# 'ONLINE' is the healthy state. Any other status indicates a problem.
if pool_health != 'ONLINE':
if pool_health in ['DEGRADED', 'FAULTED', 'UNAVAIL', 'REMOVED']:
# These are critical states
status = 'CRITICAL'
reason = f'ZFS pool {pool_health.lower()}'
else:
# Any other non-ONLINE state is at least a warning
status = 'WARNING'
reason = f'ZFS pool status: {pool_health.lower()}'
# Use a unique key for each pool issue
zfs_issues[f'zpool_{pool_name}'] = {
'status': status,
'reason': reason,
'pool_name': pool_name,
'health': pool_health
}
except Exception as e:
print(f"[HealthMonitor] Error checking ZFS pool health: {e}")
# If 'zpool status' command itself fails, we can't report ZFS issues.
# Return empty dict as no specific ZFS issues were detected by this check.
pass
return zfs_issues
def _check_proxmox_storage(self) -> Optional[Dict[str, Any]]:
"""
Check Proxmox storage status using the proxmox_storage_monitor module.
Detects unavailable storages configured in PVE.
Returns CRITICAL if any configured storage is unavailable.
Returns None if the module is not available.
"""
if not PROXMOX_STORAGE_AVAILABLE:
return None
try:
# Reload configuration to ensure we have the latest storage definitions
proxmox_storage_monitor.reload_configuration()
# Get the current status of all configured storages
storage_status = proxmox_storage_monitor.get_storage_status()
unavailable_storages = storage_status.get('unavailable', [])
if not unavailable_storages:
# All storages are available. We should also clear any previously recorded storage errors.
active_errors = health_persistence.get_active_errors()
for error in active_errors:
# Target errors related to storage unavailability
if error.get('category') == 'storage' and error.get('error_key', '').startswith('storage_unavailable_'):
health_persistence.clear_error(error['error_key'])
return {'status': 'OK'}
storage_details = {}
for storage in unavailable_storages:
storage_name = storage['name']
error_key = f'storage_unavailable_{storage_name}'
status_detail = storage.get('status_detail', 'unavailable')
# Formulate a descriptive reason for the issue
if status_detail == 'not_found':
reason = f"Storage '{storage_name}' is configured but not found on the server."
elif status_detail == 'unavailable':
reason = f"Storage '{storage_name}' is not available (connection error or backend issue)."
else:
reason = f"Storage '{storage_name}' has status: {status_detail}."
# Record a persistent CRITICAL error for each unavailable storage
health_persistence.record_error(
error_key=error_key,
category='storage',
severity='CRITICAL',
reason=reason,
details={
'storage_name': storage_name,
'storage_type': storage.get('type', 'unknown'),
'status_detail': status_detail,
'dismissable': False
}
)
# Add to details dict with dismissable false for frontend
storage_details[storage_name] = {
'reason': reason,
'type': storage.get('type', 'unknown'),
'status': status_detail,
'dismissable': False
}
return {
'status': 'CRITICAL',
'reason': f'{len(unavailable_storages)} Proxmox storage(s) unavailable',
'details': storage_details
}
except Exception as e:
print(f"[HealthMonitor] Error checking Proxmox storage: {e}")
# Return None on exception to indicate the check could not be performed, not necessarily a failure.
return None
def get_health_status(self) -> Dict[str, Any]:
"""
Main function to get the comprehensive health status.
This function orchestrates all individual checks and aggregates results.
"""
# Trigger all checks, including those with caching
detailed_status = self.get_detailed_status()
overall_status = self.get_overall_status()
system_info = self.get_system_info()
return {
'system_info': system_info,
'overall_health': overall_status,
'detailed_health': detailed_status,
'timestamp': datetime.now().isoformat()
}
# This is a duplicate of the get_detailed_status method at the top of the file.
# It's likely an oversight from copy-pasting. One of them should be removed or renamed.
# Keeping both for now to match the provided structure, but in a refactor, this would be cleaned up.
def get_detailed_status(self) -> Dict[str, Any]:
"""
Get comprehensive health status with all checks.
Returns JSON structure with ALL 10 categories always present.
Now includes persistent error tracking.
"""
active_errors = health_persistence.get_active_errors()
# No need to create persistent_issues dict here, it's implicitly handled by the checks
details = {
'cpu': {'status': 'OK'},
'memory': {'status': 'OK'},
'storage': {'status': 'OK'}, # This will be overwritten by specific storage checks
'disks': {'status': 'OK'}, # This will be overwritten by disk/filesystem checks
'network': {'status': 'OK'},
'vms': {'status': 'OK'},
'services': {'status': 'OK'},
'logs': {'status': 'OK'},
'updates': {'status': 'OK'},
'security': {'status': 'OK'}
}
critical_issues = []
warning_issues = []
info_issues = [] # Added info_issues to track INFO separately
# --- Priority Order of Checks ---
# Priority 1: Critical PVE Services
services_status = self._check_pve_services()
details['services'] = services_status
if services_status['status'] == 'CRITICAL':
critical_issues.append(f"PVE Services: {services_status.get('reason', 'Service failure')}")
elif services_status['status'] == 'WARNING':
warning_issues.append(f"PVE Services: {services_status.get('reason', 'Service issue')}")
# Priority 1.5: Proxmox Storage Check (External Module)
proxmox_storage_result = self._check_proxmox_storage()
if proxmox_storage_result: # Only process if the check ran (module available)
details['storage'] = proxmox_storage_result
if proxmox_storage_result.get('status') == 'CRITICAL':
critical_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage unavailable'))
elif proxmox_storage_result.get('status') == 'WARNING':
warning_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage issue'))
# Priority 2: Disk/Filesystem Health (Internal checks: usage, ZFS, SMART, IO errors)
storage_status = self._check_storage_optimized()
details['disks'] = storage_status # Use 'disks' for filesystem/disk specific issues
if storage_status.get('status') == 'CRITICAL':
critical_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage failure')}")
elif storage_status.get('status') == 'WARNING':
warning_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage issue')}")
# Priority 3: VMs/CTs Status (with persistence)
vms_status = self._check_vms_cts_with_persistence()
details['vms'] = vms_status
if vms_status.get('status') == 'CRITICAL':
critical_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT failure')}")
elif vms_status.get('status') == 'WARNING':
warning_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT issue')}")
# Priority 4: Network Connectivity
network_status = self._check_network_optimized()
details['network'] = network_status
if network_status.get('status') == 'CRITICAL':
critical_issues.append(f"Network: {network_status.get('reason', 'Network failure')}")
elif network_status.get('status') == 'WARNING':
warning_issues.append(f"Network: {network_status.get('reason', 'Network issue')}")
# Priority 5: CPU Usage (with hysteresis)
cpu_status = self._check_cpu_with_hysteresis()
details['cpu'] = cpu_status
if cpu_status.get('status') == 'CRITICAL':
critical_issues.append(f"CPU: {cpu_status.get('reason', 'CPU critical')}")
elif cpu_status.get('status') == 'WARNING':
warning_issues.append(f"CPU: {cpu_status.get('reason', 'CPU high')}")
# Priority 6: Memory Usage (RAM and Swap)
memory_status = self._check_memory_comprehensive()
details['memory'] = memory_status
if memory_status.get('status') == 'CRITICAL':
critical_issues.append(f"Memory: {memory_status.get('reason', 'Memory critical')}")
elif memory_status.get('status') == 'WARNING':
warning_issues.append(f"Memory: {memory_status.get('reason', 'Memory high')}")
# Priority 7: Log Analysis (with persistence)
logs_status = self._check_logs_with_persistence()
details['logs'] = logs_status
if logs_status.get('status') == 'CRITICAL':
critical_issues.append(f"Logs: {logs_status.get('reason', 'Critical log errors')}")
elif logs_status.get('status') == 'WARNING':
warning_issues.append(f"Logs: {logs_status.get('reason', 'Log warnings')}")
# Priority 8: System Updates
updates_status = self._check_updates()
details['updates'] = updates_status
if updates_status.get('status') == 'CRITICAL':
critical_issues.append(f"Updates: {updates_status.get('reason', 'System not updated')}")
elif updates_status.get('status') == 'WARNING':
warning_issues.append(f"Updates: {updates_status.get('reason', 'Updates pending')}")
elif updates_status.get('status') == 'INFO':
info_issues.append(f"Updates: {updates_status.get('reason', 'Informational update notice')}")
# Priority 9: Security Checks
security_status = self._check_security()
details['security'] = security_status
if security_status.get('status') == 'WARNING':
warning_issues.append(f"Security: {security_status.get('reason', 'Security issue')}")
elif security_status.get('status') == 'INFO':
info_issues.append(f"Security: {security_status.get('reason', 'Security information')}")
# --- Determine Overall Status ---
# Use a fixed order of severity: CRITICAL > WARNING > INFO > OK
if critical_issues:
overall = 'CRITICAL'
summary = '; '.join(critical_issues[:3]) # Limit summary to 3 issues
elif warning_issues:
overall = 'WARNING'
summary = '; '.join(warning_issues[:3])
elif info_issues:
overall = 'OK' # INFO statuses don't degrade overall health
summary = '; '.join(info_issues[:3])
else:
overall = 'OK'
summary = 'All systems operational'
return {
'overall': overall,
'summary': summary,
'details': details,
'timestamp': datetime.now().isoformat()
}
# Global instance
health_monitor = HealthMonitor()