"""
ProxMenux Health Monitor Module
Provides comprehensive, lightweight health checks for Proxmox systems.
Optimized for minimal system impact with intelligent thresholds and hysteresis.
Author: MacRimi
Version: 1.0 (Light Health Logic)
"""
import psutil
import subprocess
import json
import time
import os
from typing import Dict, Any
from datetime import datetime
from collections import defaultdict
class HealthMonitor:
"""
Monitors system health across multiple components with minimal impact.
Implements hysteresis, intelligent caching, and progressive escalation.
"""
# CPU Thresholds
CPU_WARNING = 85
CPU_CRITICAL = 95
CPU_RECOVERY = 75
CPU_WARNING_DURATION = 60 # seconds
CPU_CRITICAL_DURATION = 120 # seconds
CPU_RECOVERY_DURATION = 120 # seconds
# Memory Thresholds
MEMORY_WARNING = 85
MEMORY_CRITICAL = 95
MEMORY_DURATION = 60 # seconds
SWAP_WARNING_DURATION = 300 # 5 minutes
SWAP_CRITICAL_PERCENT = 5 # 5% of RAM
SWAP_CRITICAL_DURATION = 120 # 2 minutes
# Storage Thresholds
STORAGE_WARNING = 85
STORAGE_CRITICAL = 95
# Temperature Thresholds
TEMP_WARNING = 80
TEMP_CRITICAL = 90
# Network Thresholds
NETWORK_LATENCY_WARNING = 100 # ms
NETWORK_LATENCY_CRITICAL = 300 # ms
NETWORK_TIMEOUT = 0.9 # seconds
NETWORK_INACTIVE_DURATION = 600 # 10 minutes
# Log Thresholds
LOG_ERRORS_WARNING = 5
LOG_ERRORS_CRITICAL = 6
LOG_WARNINGS_WARNING = 10
LOG_WARNINGS_CRITICAL = 30
LOG_CHECK_INTERVAL = 300 # 5 minutes
# Critical keywords for immediate escalation
CRITICAL_LOG_KEYWORDS = [
'I/O error', 'EXT4-fs error', 'XFS', 'LVM activation failed',
'md/raid: device failed', 'Out of memory', 'kernel panic',
'filesystem read-only', 'cannot mount'
]
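# Matched case-insensitively as substrings of journal lines in
# _check_logs_lightweight(); any hit escalates straight to CRITICAL.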
# PVE Critical Services
PVE_SERVICES = ['pveproxy', 'pvedaemon', 'pvestatd', 'pve-cluster']
def __init__(self):
"""Initialize health monitor with state tracking"""
self.state_history = defaultdict(list) # For hysteresis
self.last_check_times = {} # Cache check times
self.cached_results = {} # Cache results
self.network_baseline = {} # Network traffic baseline
self.io_error_history = defaultdict(list) # I/O error tracking
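# Two lightweight patterns recur in the checks below:
# - Hysteresis: a check appends a timestamped sample to state_history,
#   prunes entries older than its window, and only changes state once
#   >=2 recent samples breach a threshold, so a single spike never flips
#   the reported status.
# - Caching: expensive probes (sensors, ping, journalctl, openssl) store
#   results in cached_results and re-run only after their interval expires.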
def get_overall_status(self) -> Dict[str, Any]:
"""Get overall health status summary with minimal overhead"""
details = self.get_detailed_status()
overall_status = details.get('overall', 'OK')
summary = details.get('summary', '')
# Count statuses
critical_count = 0
warning_count = 0
ok_count = 0
for category, data in details.get('details', {}).items():
if isinstance(data, dict):
status = data.get('status', 'OK')
if status == 'CRITICAL':
critical_count += 1
elif status == 'WARNING':
warning_count += 1
elif status == 'OK':
ok_count += 1
return {
'status': overall_status,
'summary': summary,
'critical_count': critical_count,
'warning_count': warning_count,
'ok_count': ok_count,
'timestamp': datetime.now().isoformat()
}
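# Illustrative result (values are examples): {"status": "OK",
# "summary": "All systems operational", "critical_count": 0,
# "warning_count": 0, "ok_count": 9, "timestamp": "2025-11-05T18:30:31"}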
def get_detailed_status(self) -> Dict[str, Any]:
"""
Get comprehensive health status with all checks.
Returns JSON structure matching the specification.
"""
details = {}
critical_issues = []
warning_issues = []
# Priority 1: Services PVE / FS / Storage
services_status = self._check_pve_services()
details['services'] = services_status
if services_status['status'] == 'CRITICAL':
critical_issues.append(services_status.get('reason', 'Service failure'))
elif services_status['status'] == 'WARNING':
warning_issues.append(services_status.get('reason', 'Service issue'))
storage_status = self._check_storage_comprehensive()
details['storage'] = storage_status
for storage_name, storage_data in storage_status.items():
if isinstance(storage_data, dict):
if storage_data.get('status') == 'CRITICAL':
critical_issues.append(f"{storage_name}: {storage_data.get('reason', 'Storage failure')}")
elif storage_data.get('status') == 'WARNING':
warning_issues.append(f"{storage_name}: {storage_data.get('reason', 'Storage issue')}")
# Priority 2: Disks / I/O
disks_status = self._check_disks_io()
details['disks'] = disks_status
for disk_name, disk_data in disks_status.items():
if isinstance(disk_data, dict):
if disk_data.get('status') == 'CRITICAL':
critical_issues.append(f"{disk_name}: {disk_data.get('reason', 'Disk failure')}")
elif disk_data.get('status') == 'WARNING':
warning_issues.append(f"{disk_name}: {disk_data.get('reason', 'Disk issue')}")
# Priority 3: VM/CT
vms_status = self._check_vms_cts()
details['vms'] = vms_status
if vms_status.get('status') == 'CRITICAL':
critical_issues.append(vms_status.get('reason', 'VM/CT failure'))
elif vms_status.get('status') == 'WARNING':
warning_issues.append(vms_status.get('reason', 'VM/CT issue'))
# Priority 4: Network
network_status = self._check_network_comprehensive()
details['network'] = network_status
if network_status.get('status') == 'CRITICAL':
critical_issues.append(network_status.get('reason', 'Network failure'))
elif network_status.get('status') == 'WARNING':
warning_issues.append(network_status.get('reason', 'Network issue'))
# Priority 5: CPU/RAM
cpu_status = self._check_cpu_with_hysteresis()
details['cpu'] = cpu_status
if cpu_status.get('status') == 'WARNING':
warning_issues.append(cpu_status.get('reason', 'CPU high'))
memory_status = self._check_memory_comprehensive()
details['memory'] = memory_status
if memory_status.get('status') == 'CRITICAL':
critical_issues.append(memory_status.get('reason', 'Memory critical'))
elif memory_status.get('status') == 'WARNING':
warning_issues.append(memory_status.get('reason', 'Memory high'))
# Priority 6: Logs
logs_status = self._check_logs_lightweight()
details['logs'] = logs_status
if logs_status.get('status') == 'CRITICAL':
critical_issues.append(logs_status.get('reason', 'Critical log errors'))
elif logs_status.get('status') == 'WARNING':
warning_issues.append(logs_status.get('reason', 'Log warnings'))
# Priority 7: Extras (Security, Certificates, Uptime)
security_status = self._check_security()
details['security'] = security_status
if security_status.get('status') == 'WARNING':
warning_issues.append(security_status.get('reason', 'Security issue'))
# Determine overall status
if critical_issues:
overall = 'CRITICAL'
summary = '; '.join(critical_issues[:3]) # Top 3 critical issues
elif warning_issues:
overall = 'WARNING'
summary = '; '.join(warning_issues[:3]) # Top 3 warnings
else:
overall = 'OK'
summary = 'All systems operational'
return {
'overall': overall,
'summary': summary,
'details': details,
'timestamp': datetime.now().isoformat()
}
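# Illustrative shape of the structure returned above (values are examples):
# {
#   "overall": "WARNING",
#   "summary": "/var: 87.2% full (>=85%)",
#   "details": {"services": {...}, "storage": {...}, "cpu": {...}, ...},
#   "timestamp": "2025-11-05T18:30:31"
# }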
def _check_cpu_with_hysteresis(self) -> Dict[str, Any]:
"""
Check CPU with hysteresis to avoid flapping alerts.
Requires sustained high usage before triggering.
"""
try:
# Get CPU usage (1 second sample to minimize impact)
cpu_percent = psutil.cpu_percent(interval=1)
current_time = time.time()
# Track state history
state_key = 'cpu_usage'
self.state_history[state_key].append({
'value': cpu_percent,
'time': current_time
})
# Keep only recent history (last 5 minutes)
self.state_history[state_key] = [
entry for entry in self.state_history[state_key]
if current_time - entry['time'] < 300
]
# Check for sustained load. These are sample counts, not seconds: with one
# reading per poll cycle, >=2 samples over a threshold inside the window
# approximates sustained usage without per-second bookkeeping.
critical_duration = sum(
1 for entry in self.state_history[state_key]
if entry['value'] >= self.CPU_CRITICAL and
current_time - entry['time'] <= self.CPU_CRITICAL_DURATION
)
warning_duration = sum(
1 for entry in self.state_history[state_key]
if entry['value'] >= self.CPU_WARNING and
current_time - entry['time'] <= self.CPU_WARNING_DURATION
)
recovery_duration = sum(
1 for entry in self.state_history[state_key]
if entry['value'] < self.CPU_RECOVERY and
current_time - entry['time'] <= self.CPU_RECOVERY_DURATION
)
# Determine status with hysteresis
if critical_duration >= 2: # 2+ readings in critical range
status = 'CRITICAL'
reason = f'CPU >{self.CPU_CRITICAL}% for {self.CPU_CRITICAL_DURATION}s'
elif warning_duration >= 2 and recovery_duration < 2:
status = 'WARNING'
reason = f'CPU >{self.CPU_WARNING}% for {self.CPU_WARNING_DURATION}s'
else:
status = 'OK'
reason = None
# Get temperature if available (checked once per minute max)
temp_status = self._check_cpu_temperature()
result = {
'status': status,
'usage': round(cpu_percent, 1),
'cores': psutil.cpu_count()
}
if reason:
result['reason'] = reason
if temp_status:
result['temperature'] = temp_status
if temp_status.get('status') == 'CRITICAL':
result['status'] = 'CRITICAL'
result['reason'] = temp_status.get('reason')
elif temp_status.get('status') == 'WARNING' and status == 'OK':
result['status'] = 'WARNING'
result['reason'] = temp_status.get('reason')
return result
except Exception as e:
return {'status': 'UNKNOWN', 'reason': f'CPU check failed: {str(e)}'}
def _check_cpu_temperature(self) -> Dict[str, Any]:
"""Check CPU temperature (cached, max 1 check per minute)"""
cache_key = 'cpu_temp'
current_time = time.time()
# Check cache
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key, {})
try:
# Try lm-sensors first
result = subprocess.run(
['sensors', '-A', '-u'],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
temps = []
for line in result.stdout.split('\n'):
if 'temp' in line.lower() and '_input' in line:
try:
temp = float(line.split(':')[1].strip())
temps.append(temp)
except (ValueError, IndexError):
continue
if temps:
max_temp = max(temps)
if max_temp >= self.TEMP_CRITICAL:
status = 'CRITICAL'
reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_CRITICAL}°C'
elif max_temp >= self.TEMP_WARNING:
status = 'WARNING'
reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_WARNING}°C'
else:
status = 'OK'
reason = None
temp_result = {
'status': status,
'value': round(max_temp, 1),
'unit': '°C'
}
if reason:
temp_result['reason'] = reason
self.cached_results[cache_key] = temp_result
self.last_check_times[cache_key] = current_time
return temp_result
# If sensors is not available, return UNKNOWN (UNKNOWN never counts against overall health)
unknown_result = {'status': 'UNKNOWN', 'reason': 'No temperature sensors available'}
self.cached_results[cache_key] = unknown_result
self.last_check_times[cache_key] = current_time
return unknown_result
except Exception:
unknown_result = {'status': 'UNKNOWN', 'reason': 'Temperature check unavailable'}
self.cached_results[cache_key] = unknown_result
self.last_check_times[cache_key] = current_time
return unknown_result
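# Note: psutil.sensors_temperatures() could serve as a fallback when the
# `sensors` binary is missing; it is left out here so lm-sensors stays the
# single source of temperature data.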
def _check_memory_comprehensive(self) -> Dict[str, Any]:
"""Check memory including RAM and swap with sustained thresholds"""
try:
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
current_time = time.time()
mem_percent = memory.percent
swap_percent = swap.percent if swap.total > 0 else 0
swap_vs_ram = (swap.used / memory.total * 100) if memory.total > 0 else 0
# Track memory state
state_key = 'memory_usage'
self.state_history[state_key].append({
'mem_percent': mem_percent,
'swap_percent': swap_percent,
'swap_vs_ram': swap_vs_ram,
'time': current_time
})
# Keep only recent history
self.state_history[state_key] = [
entry for entry in self.state_history[state_key]
if current_time - entry['time'] < 600
]
# Check sustained high memory
mem_critical = sum(
1 for entry in self.state_history[state_key]
if entry['mem_percent'] >= self.MEMORY_CRITICAL and
current_time - entry['time'] <= self.MEMORY_DURATION
)
mem_warning = sum(
1 for entry in self.state_history[state_key]
if entry['mem_percent'] >= self.MEMORY_WARNING and
current_time - entry['time'] <= self.MEMORY_DURATION
)
# Check swap usage
swap_critical = sum(
1 for entry in self.state_history[state_key]
if entry['swap_vs_ram'] > self.SWAP_CRITICAL_PERCENT and
current_time - entry['time'] <= self.SWAP_CRITICAL_DURATION
)
swap_warning = sum(
1 for entry in self.state_history[state_key]
if entry['swap_percent'] > 0 and
current_time - entry['time'] <= self.SWAP_WARNING_DURATION
)
# Determine status
if mem_critical >= 2:
status = 'CRITICAL'
reason = f'RAM >{self.MEMORY_CRITICAL}% for {self.MEMORY_DURATION}s'
elif swap_critical >= 2:
status = 'CRITICAL'
reason = f'Swap >{self.SWAP_CRITICAL_PERCENT}% of RAM for {self.SWAP_CRITICAL_DURATION}s'
elif mem_warning >= 2:
status = 'WARNING'
reason = f'RAM >{self.MEMORY_WARNING}% for {self.MEMORY_DURATION}s'
elif swap_warning >= 2:
status = 'WARNING'
reason = f'Swap active for >{self.SWAP_WARNING_DURATION}s'
else:
status = 'OK'
reason = None
result = {
'status': status,
'ram_percent': round(mem_percent, 1),
'ram_available_gb': round(memory.available / (1024**3), 2),
'swap_percent': round(swap_percent, 1),
'swap_used_gb': round(swap.used / (1024**3), 2)
}
if reason:
result['reason'] = reason
return result
except Exception as e:
return {'status': 'UNKNOWN', 'reason': f'Memory check failed: {str(e)}'}
def _check_storage_comprehensive(self) -> Dict[str, Any]:
"""
Comprehensive storage check including filesystems, mount points,
LVM, and Proxmox storages.
"""
storage_results = {}
# Check critical filesystems
critical_mounts = ['/', '/var', '/var/lib/vz']
for mount_point in critical_mounts:
if os.path.exists(mount_point):
fs_status = self._check_filesystem(mount_point)
storage_results[mount_point] = fs_status
# Check all mounted filesystems
try:
partitions = psutil.disk_partitions()
for partition in partitions:
if partition.mountpoint not in critical_mounts:
try:
fs_status = self._check_filesystem(partition.mountpoint)
storage_results[partition.mountpoint] = fs_status
except PermissionError:
continue
except Exception as e:
storage_results['partitions_error'] = {
'status': 'WARNING',
'reason': f'Could not enumerate partitions: {str(e)}'
}
# Check LVM (especially local-lvm)
lvm_status = self._check_lvm()
if lvm_status:
storage_results['lvm'] = lvm_status
# Check Proxmox storages
pve_storages = self._check_proxmox_storages()
if pve_storages:
storage_results.update(pve_storages)
return storage_results
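# Illustrative return shape (keys vary per host):
# {'/': {'status': 'OK', 'usage_percent': 42.0, 'free_gb': 50.1, 'total_gb': 94.0},
#  'lvm': {'status': 'OK', 'volumes': ['pve/root', 'pve/data']},
#  'storage_local': {'status': 'OK', 'type': 'dir', 'path': '/var/lib/vz'}}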
def _check_filesystem(self, mount_point: str) -> Dict[str, Any]:
"""Check individual filesystem for space and mount status"""
try:
# Check if mounted
result = subprocess.run(
['mountpoint', '-q', mount_point],
capture_output=True,
timeout=2
)
if result.returncode != 0:
return {
'status': 'CRITICAL',
'reason': 'Not mounted'
}
# Check if read-only
with open('/proc/mounts', 'r') as f:
for line in f:
parts = line.split()
if len(parts) >= 4 and parts[1] == mount_point:
options = parts[3].split(',')
if 'ro' in options:
return {
'status': 'CRITICAL',
'reason': 'Mounted read-only'
}
# Check disk usage
usage = psutil.disk_usage(mount_point)
percent = usage.percent
if percent >= self.STORAGE_CRITICAL:
status = 'CRITICAL'
reason = f'{percent:.1f}% full (≥{self.STORAGE_CRITICAL}%)'
elif percent >= self.STORAGE_WARNING:
status = 'WARNING'
reason = f'{percent:.1f}% full (≥{self.STORAGE_WARNING}%)'
else:
status = 'OK'
reason = None
result = {
'status': status,
'usage_percent': round(percent, 1),
'free_gb': round(usage.free / (1024**3), 2),
'total_gb': round(usage.total / (1024**3), 2)
}
if reason:
result['reason'] = reason
return result
except Exception as e:
return {
'status': 'WARNING',
'reason': f'Check failed: {str(e)}'
}
def _check_lvm(self) -> Dict[str, Any]:
"""Check LVM volumes, especially local-lvm"""
try:
result = subprocess.run(
['lvs', '--noheadings', '--options', 'lv_name,vg_name,lv_attr'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode != 0:
return {
'status': 'WARNING',
'reason': 'LVM not available or no volumes'
}
volumes = []
local_lvm_found = False
for line in result.stdout.strip().split('\n'):
if line.strip():
parts = line.split()
if len(parts) >= 2:
lv_name = parts[0].strip()
vg_name = parts[1].strip()
volumes.append(f'{vg_name}/{lv_name}')
# The default 'local-lvm' storage is backed by the pve/data thin pool,
# so match that pair as well as any explicitly named volume.
if (vg_name == 'pve' and lv_name == 'data') or \
'local-lvm' in lv_name or 'local-lvm' in vg_name:
local_lvm_found = True
if not local_lvm_found and volumes:
return {
'status': 'CRITICAL',
'reason': 'local-lvm volume not found',
'volumes': volumes
}
return {
'status': 'OK',
'volumes': volumes
}
except Exception as e:
return {
'status': 'WARNING',
'reason': f'LVM check failed: {str(e)}'
}
def _check_proxmox_storages(self) -> Dict[str, Any]:
"""Check Proxmox-specific storages (NFS, CIFS, PBS)"""
storages = {}
try:
# Read the Proxmox storage configuration; only 'dir' storages are actively
# verified, nfs/cifs/pbs entries are parsed but not probed
if os.path.exists('/etc/pve/storage.cfg'):
with open('/etc/pve/storage.cfg', 'r') as f:
current_storage = None
storage_type = None
for line in f:
line = line.strip()
if line.startswith('dir:') or line.startswith('nfs:') or \
line.startswith('cifs:') or line.startswith('pbs:'):
parts = line.split(':', 1)
storage_type = parts[0]
current_storage = parts[1].strip()
elif line.startswith('path ') and current_storage:
path = line.split(None, 1)[1]
if storage_type == 'dir':
if os.path.exists(path):
storages[f'storage_{current_storage}'] = {
'status': 'OK',
'type': 'dir',
'path': path
}
else:
storages[f'storage_{current_storage}'] = {
'status': 'CRITICAL',
'reason': 'Directory does not exist',
'type': 'dir',
'path': path
}
current_storage = None
storage_type = None
except Exception as e:
storages['pve_storage_config'] = {
'status': 'WARNING',
'reason': f'Could not read storage config: {str(e)}'
}
return storages
def _check_disks_io(self) -> Dict[str, Any]:
"""Check disk I/O errors from dmesg (lightweight)"""
disks = {}
current_time = time.time()
try:
# Scan dmesg for disk errors from the last 5 minutes only
# (needs a util-linux dmesg with --since support)
result = subprocess.run(
['dmesg', '-T', '--level=err,warn', '--since', '5 minutes ago'],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
io_errors = defaultdict(int)
for line in result.stdout.split('\n'):
line_lower = line.lower()
if any(keyword in line_lower for keyword in ['i/o error', 'ata error', 'scsi error']):
# Extract disk name
for part in line.split():
if part.startswith('sd') or part.startswith('nvme') or part.startswith('hd'):
disk_name = part.rstrip(':,')
io_errors[disk_name] += 1
# Track in history
self.io_error_history[disk_name].append(current_time)
# Clean old history (keep last 5 minutes)
for disk in list(self.io_error_history.keys()):
self.io_error_history[disk] = [
t for t in self.io_error_history[disk]
if current_time - t < 300
]
error_count = len(self.io_error_history[disk])
if error_count >= 3:
disks[f'/dev/{disk}'] = {
'status': 'CRITICAL',
'reason': f'{error_count} I/O errors in 5 minutes'
}
elif error_count >= 1:
disks[f'/dev/{disk}'] = {
'status': 'WARNING',
'reason': f'{error_count} I/O error(s) in 5 minutes'
}
# No recent errors: return a plain status marker
# (get_detailed_status skips non-dict values when aggregating)
if not disks:
disks['status'] = 'OK'
return disks
except Exception as e:
return {
'status': 'WARNING',
'reason': f'Disk I/O check failed: {str(e)}'
}
def _check_network_comprehensive(self) -> Dict[str, Any]:
"""Check network interfaces, bridges, and connectivity"""
try:
issues = []
interface_details = {}
# Check interface status
net_if_stats = psutil.net_if_stats()
net_io = psutil.net_io_counters(pernic=True)
current_time = time.time()
for interface, stats in net_if_stats.items():
if interface == 'lo':
continue
# Check if the interface is down (psutil cannot distinguish administrative from operational down)
if not stats.isup:
# Check if it's a bridge or important interface
if interface.startswith('vmbr') or interface.startswith('eth') or interface.startswith('ens'):
issues.append(f'{interface} is DOWN')
interface_details[interface] = {
'status': 'CRITICAL',
'reason': 'Interface DOWN'
}
continue
# Check bridge traffic (if no traffic for 10 minutes)
if interface.startswith('vmbr') and interface in net_io:
io_stats = net_io[interface]
# Initialize baseline if not exists
if interface not in self.network_baseline:
self.network_baseline[interface] = {
'rx_bytes': io_stats.bytes_recv,
'tx_bytes': io_stats.bytes_sent,
'time': current_time
}
else:
baseline = self.network_baseline[interface]
time_diff = current_time - baseline['time']
if time_diff >= self.NETWORK_INACTIVE_DURATION:
rx_diff = io_stats.bytes_recv - baseline['rx_bytes']
tx_diff = io_stats.bytes_sent - baseline['tx_bytes']
if rx_diff == 0 and tx_diff == 0:
issues.append(f'{interface} no traffic for 10+ minutes')
interface_details[interface] = {
'status': 'WARNING',
'reason': 'No traffic for 10+ minutes'
}
# Update baseline
self.network_baseline[interface] = {
'rx_bytes': io_stats.bytes_recv,
'tx_bytes': io_stats.bytes_sent,
'time': current_time
}
# Check gateway/DNS latency (lightweight, cached)
latency_status = self._check_network_latency()
if latency_status.get('status') != 'OK':
issues.append(latency_status.get('reason', 'Network latency issue'))
interface_details['connectivity'] = latency_status
# Determine overall network status
if any('CRITICAL' in str(detail.get('status')) for detail in interface_details.values()):
status = 'CRITICAL'
reason = '; '.join(issues[:2])
elif issues:
status = 'WARNING'
reason = '; '.join(issues[:2])
else:
status = 'OK'
reason = None
result = {'status': status}
if reason:
result['reason'] = reason
if interface_details:
result['interfaces'] = interface_details
return result
except Exception as e:
return {
'status': 'WARNING',
'reason': f'Network check failed: {str(e)}'
}
def _check_network_latency(self) -> Dict[str, Any]:
"""Check network latency to gateway/DNS (cached, max 1 check per minute)"""
cache_key = 'network_latency'
current_time = time.time()
# Check cache
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 60:
return self.cached_results.get(cache_key, {'status': 'OK'})
try:
# Ping a well-known public resolver (1.1.1.1); the default gateway is not probed here
result = subprocess.run(
['ping', '-c', '1', '-W', '1', '1.1.1.1'],
capture_output=True,
text=True,
timeout=self.NETWORK_TIMEOUT + 1 # must exceed ping's -W 1s wait, or slow replies raise TimeoutExpired and mask real packet loss
)
if result.returncode == 0:
# Extract latency
for line in result.stdout.split('\n'):
if 'time=' in line:
try:
latency_str = line.split('time=')[1].split()[0]
latency = float(latency_str)
if latency > self.NETWORK_LATENCY_CRITICAL:
status = 'CRITICAL'
reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_CRITICAL}ms'
elif latency > self.NETWORK_LATENCY_WARNING:
status = 'WARNING'
reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_WARNING}ms'
else:
status = 'OK'
reason = None
latency_result = {
'status': status,
'latency_ms': round(latency, 1)
}
if reason:
latency_result['reason'] = reason
self.cached_results[cache_key] = latency_result
self.last_check_times[cache_key] = current_time
return latency_result
except (ValueError, IndexError):
continue
# Ping failed
packet_loss_result = {
'status': 'CRITICAL',
'reason': 'Packet loss or timeout'
}
self.cached_results[cache_key] = packet_loss_result
self.last_check_times[cache_key] = current_time
return packet_loss_result
except Exception as e:
error_result = {
'status': 'WARNING',
'reason': f'Latency check failed: {str(e)}'
}
self.cached_results[cache_key] = error_result
self.last_check_times[cache_key] = current_time
return error_result
def _check_vms_cts(self) -> Dict[str, Any]:
"""Check VM and CT status for unexpected stops"""
try:
issues = []
vm_details = {}
# Check VMs
try:
result = subprocess.run(
['qm', 'list'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n')[1:]:
if line.strip():
parts = line.split()
if len(parts) >= 3:
vmid = parts[0]
vm_status = parts[2]
if vm_status == 'stopped':
# Simplified: treats any stopped VM as noteworthy; a full check would consult the onboot/autostart config
vm_details[f'vm_{vmid}'] = {
'status': 'WARNING',
'reason': 'VM stopped'
}
issues.append(f'VM {vmid} stopped')
except Exception as e:
vm_details['vms_check'] = {
'status': 'WARNING',
'reason': f'Could not check VMs: {str(e)}'
}
# Check CTs
try:
result = subprocess.run(
['pct', 'list'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n')[1:]:
if line.strip():
parts = line.split()
if len(parts) >= 2:
ctid = parts[0]
ct_status = parts[1]
if ct_status == 'stopped':
vm_details[f'ct_{ctid}'] = {
'status': 'WARNING',
'reason': 'CT stopped'
}
issues.append(f'CT {ctid} stopped')
except Exception as e:
vm_details['cts_check'] = {
'status': 'WARNING',
'reason': f'Could not check CTs: {str(e)}'
}
# Determine overall status
if issues:
status = 'WARNING'
reason = '; '.join(issues[:3])
else:
status = 'OK'
reason = None
result = {'status': status}
if reason:
result['reason'] = reason
if vm_details:
result['details'] = vm_details
return result
except Exception as e:
return {
'status': 'WARNING',
'reason': f'VM/CT check failed: {str(e)}'
}
def _check_pve_services(self) -> Dict[str, Any]:
"""Check critical Proxmox services"""
try:
failed_services = []
for service in self.PVE_SERVICES:
try:
result = subprocess.run(
['systemctl', 'is-active', service],
capture_output=True,
text=True,
timeout=2
)
if result.returncode != 0 or result.stdout.strip() != 'active':
failed_services.append(service)
except Exception:
failed_services.append(service)
if failed_services:
return {
'status': 'CRITICAL',
'reason': f'Services inactive: {", ".join(failed_services)}',
'failed': failed_services
}
return {'status': 'OK'}
except Exception as e:
return {
'status': 'WARNING',
'reason': f'Service check failed: {str(e)}'
}
def _check_logs_lightweight(self) -> Dict[str, Any]:
"""Lightweight log analysis (cached, checked every 5 minutes)"""
cache_key = 'logs_analysis'
current_time = time.time()
# Check cache
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < self.LOG_CHECK_INTERVAL:
return self.cached_results.get(cache_key, {'status': 'OK'})
try:
# Check journalctl for recent errors and warnings
result = subprocess.run(
['journalctl', '--since', '5 minutes ago', '--no-pager', '-p', 'warning'],
capture_output=True,
text=True,
timeout=3
)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
errors_5m = 0
warnings_5m = 0
critical_keywords_found = []
for line in lines:
line_lower = line.lower()
# Check for critical keywords
for keyword in self.CRITICAL_LOG_KEYWORDS:
if keyword.lower() in line_lower:
critical_keywords_found.append(keyword)
errors_5m += 1
break
else:
# Crude keyword count over journal text (journald priority is already capped at warning and above)
if 'error' in line_lower or 'critical' in line_lower or 'fatal' in line_lower:
errors_5m += 1
elif 'warning' in line_lower or 'warn' in line_lower:
warnings_5m += 1
# Determine status
if critical_keywords_found:
status = 'CRITICAL'
reason = f'Critical errors: {", ".join(set(critical_keywords_found[:3]))}'
elif errors_5m >= self.LOG_ERRORS_CRITICAL:
status = 'CRITICAL'
reason = f'{errors_5m} errors in 5 minutes (≥{self.LOG_ERRORS_CRITICAL})'
elif warnings_5m >= self.LOG_WARNINGS_CRITICAL:
status = 'CRITICAL'
reason = f'{warnings_5m} warnings in 5 minutes (≥{self.LOG_WARNINGS_CRITICAL})'
elif errors_5m >= self.LOG_ERRORS_WARNING:
status = 'WARNING'
reason = f'{errors_5m} errors in 5 minutes'
elif warnings_5m >= self.LOG_WARNINGS_WARNING:
status = 'WARNING'
reason = f'{warnings_5m} warnings in 5 minutes'
else:
status = 'OK'
reason = None
log_result = {
'status': status,
'errors_5m': errors_5m,
'warnings_5m': warnings_5m
}
if reason:
log_result['reason'] = reason
self.cached_results[cache_key] = log_result
self.last_check_times[cache_key] = current_time
return log_result
ok_result = {'status': 'OK'}
self.cached_results[cache_key] = ok_result
self.last_check_times[cache_key] = current_time
return ok_result
except Exception as e:
error_result = {
'status': 'WARNING',
'reason': f'Log check failed: {str(e)}'
}
self.cached_results[cache_key] = error_result
self.last_check_times[cache_key] = current_time
return error_result
def _check_security(self) -> Dict[str, Any]:
"""Check security-related items (fail2ban, certificates, uptime)"""
try:
issues = []
# Check fail2ban
try:
result = subprocess.run(
['systemctl', 'is-active', 'fail2ban'],
capture_output=True,
text=True,
timeout=2
)
if result.returncode != 0 or result.stdout.strip() != 'active':
issues.append('fail2ban inactive')
except Exception:
pass
# Check uptime (warning if >180 days)
try:
uptime_seconds = time.time() - psutil.boot_time()
uptime_days = uptime_seconds / 86400
if uptime_days > 180:
issues.append(f'Uptime {int(uptime_days)} days (>180)')
except Exception:
pass
# Check SSL certificates (cached, checked once per day)
cert_status = self._check_certificates()
if cert_status.get('status') != 'OK':
issues.append(cert_status.get('reason', 'Certificate issue'))
if issues:
return {
'status': 'WARNING',
'reason': '; '.join(issues[:2])
}
return {'status': 'OK'}
except Exception as e:
return {
'status': 'WARNING',
'reason': f'Security check failed: {str(e)}'
}
def _check_certificates(self) -> Dict[str, Any]:
"""Check SSL certificate expiration (cached, checked once per day)"""
cache_key = 'certificates'
current_time = time.time()
# Check cache (24 hours)
if cache_key in self.last_check_times:
if current_time - self.last_check_times[cache_key] < 86400:
return self.cached_results.get(cache_key, {'status': 'OK'})
try:
# Check PVE certificate
cert_path = '/etc/pve/local/pve-ssl.pem'
if os.path.exists(cert_path):
result = subprocess.run(
['openssl', 'x509', '-enddate', '-noout', '-in', cert_path],
capture_output=True,
text=True,
timeout=2
)
if result.returncode == 0:
# Parse expiration date
date_str = result.stdout.strip().replace('notAfter=', '')
try:
# datetime is already imported at module level
exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
days_until_expiry = (exp_date - datetime.now()).days
if days_until_expiry < 0:
status = 'CRITICAL'
reason = 'Certificate expired'
elif days_until_expiry < 15:
status = 'WARNING'
reason = f'Certificate expires in {days_until_expiry} days'
else:
status = 'OK'
reason = None
cert_result = {'status': status}
if reason:
cert_result['reason'] = reason
self.cached_results[cache_key] = cert_result
self.last_check_times[cache_key] = current_time
return cert_result
except Exception:
pass
ok_result = {'status': 'OK'}
self.cached_results[cache_key] = ok_result
self.last_check_times[cache_key] = current_time
return ok_result
except Exception:
ok_result = {'status': 'OK'}
self.cached_results[cache_key] = ok_result
self.last_check_times[cache_key] = current_time
return ok_result
# Global instance
health_monitor = HealthMonitor()
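# Minimal usage sketch: running the module directly prints both report
# levels as JSON. Assumes a Proxmox host; several probes (dmesg, journalctl,
# qm/pct, /etc/pve) need root privileges to return meaningful data.
if __name__ == '__main__':
    print(json.dumps(health_monitor.get_overall_status(), indent=2))
    print(json.dumps(health_monitor.get_detailed_status(), indent=2))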