From 5b0d55c1a24ffcb3380b6a4493ccaf6a1e6a216f Mon Sep 17 00:00:00 2001 From: MacRimi Date: Wed, 5 Nov 2025 18:30:31 +0100 Subject: [PATCH] Update health_monitor.py --- AppImage/scripts/health_monitor.py | 1479 +++++++++++++++++++++------- 1 file changed, 1124 insertions(+), 355 deletions(-) diff --git a/AppImage/scripts/health_monitor.py b/AppImage/scripts/health_monitor.py index 39f72be..7eea8f1 100644 --- a/AppImage/scripts/health_monitor.py +++ b/AppImage/scripts/health_monitor.py @@ -1,407 +1,1176 @@ """ -Health Monitor Module -Provides comprehensive health checks for the Proxmox system including: -- CPU and Memory usage -- Storage health (pools, disks, remote storage) -- Network health (interface errors) -- VM status -- System events/logs errors +ProxMenux Health Monitor Module +Provides comprehensive, lightweight health checks for Proxmox systems. +Optimized for minimal system impact with intelligent thresholds and hysteresis. + +Author: MacRimi +Version: 1.0 (Light Health Logic) """ import psutil import subprocess import json -from typing import Dict, List, Any +import time +import os +from typing import Dict, List, Any, Tuple +from datetime import datetime, timedelta +from collections import defaultdict class HealthMonitor: - """Monitors system health across multiple components""" + """ + Monitors system health across multiple components with minimal impact. + Implements hysteresis, intelligent caching, and progressive escalation. + """ - # Thresholds - CPU_WARNING = 75 - CPU_CRITICAL = 90 - MEMORY_WARNING = 75 - MEMORY_CRITICAL = 90 + # CPU Thresholds + CPU_WARNING = 85 + CPU_CRITICAL = 95 + CPU_RECOVERY = 75 + CPU_WARNING_DURATION = 60 # seconds + CPU_CRITICAL_DURATION = 120 # seconds + CPU_RECOVERY_DURATION = 120 # seconds + + # Memory Thresholds + MEMORY_WARNING = 85 + MEMORY_CRITICAL = 95 + MEMORY_DURATION = 60 # seconds + SWAP_WARNING_DURATION = 300 # 5 minutes + SWAP_CRITICAL_PERCENT = 5 # 5% of RAM + SWAP_CRITICAL_DURATION = 120 # 2 minutes + + # Storage Thresholds + STORAGE_WARNING = 85 + STORAGE_CRITICAL = 95 + + # Temperature Thresholds + TEMP_WARNING = 80 + TEMP_CRITICAL = 90 + + # Network Thresholds + NETWORK_LATENCY_WARNING = 100 # ms + NETWORK_LATENCY_CRITICAL = 300 # ms + NETWORK_TIMEOUT = 0.9 # seconds + NETWORK_INACTIVE_DURATION = 600 # 10 minutes + + # Log Thresholds + LOG_ERRORS_WARNING = 5 + LOG_ERRORS_CRITICAL = 6 + LOG_WARNINGS_WARNING = 10 + LOG_WARNINGS_CRITICAL = 30 + LOG_CHECK_INTERVAL = 300 # 5 minutes + + # Critical keywords for immediate escalation + CRITICAL_LOG_KEYWORDS = [ + 'I/O error', 'EXT4-fs error', 'XFS', 'LVM activation failed', + 'md/raid: device failed', 'Out of memory', 'kernel panic', + 'filesystem read-only', 'cannot mount' + ] + + # PVE Critical Services + PVE_SERVICES = ['pveproxy', 'pvedaemon', 'pvestatd', 'pve-cluster'] def __init__(self): - self.checks = [] - + """Initialize health monitor with state tracking""" + self.state_history = defaultdict(list) # For hysteresis + self.last_check_times = {} # Cache check times + self.cached_results = {} # Cache results + self.network_baseline = {} # Network traffic baseline + self.io_error_history = defaultdict(list) # I/O error tracking + def get_overall_status(self) -> Dict[str, Any]: - """Get overall health status summary""" - checks = self.run_all_checks() + """Get overall health status summary with minimal overhead""" + details = self.get_detailed_status() - # Determine overall status - critical_count = sum(1 for c in checks if c['status'] == 'critical') - warning_count = sum(1 for c in 
checks if c['status'] == 'warning') + overall_status = details.get('overall', 'OK') + summary = details.get('summary', '') - if critical_count > 0: - overall_status = 'critical' - elif warning_count > 0: - overall_status = 'warning' - else: - overall_status = 'healthy' + # Count statuses + critical_count = 0 + warning_count = 0 + ok_count = 0 + + for category, data in details.get('details', {}).items(): + if isinstance(data, dict): + status = data.get('status', 'OK') + if status == 'CRITICAL': + critical_count += 1 + elif status == 'WARNING': + warning_count += 1 + elif status == 'OK': + ok_count += 1 return { 'status': overall_status, + 'summary': summary, 'critical_count': critical_count, 'warning_count': warning_count, - 'healthy_count': len(checks) - critical_count - warning_count, - 'total_checks': len(checks), - 'timestamp': psutil.boot_time() + 'ok_count': ok_count, + 'timestamp': datetime.now().isoformat() } def get_detailed_status(self) -> Dict[str, Any]: - """Get detailed health status with all checks""" - checks = self.run_all_checks() - overall = self.get_overall_status() + """ + Get comprehensive health status with all checks. + Returns JSON structure matching the specification. + """ + details = {} + critical_issues = [] + warning_issues = [] + + # Priority 1: Services PVE / FS / Storage + services_status = self._check_pve_services() + details['services'] = services_status + if services_status['status'] == 'CRITICAL': + critical_issues.append(services_status.get('reason', 'Service failure')) + elif services_status['status'] == 'WARNING': + warning_issues.append(services_status.get('reason', 'Service issue')) + + storage_status = self._check_storage_comprehensive() + details['storage'] = storage_status + for storage_name, storage_data in storage_status.items(): + if isinstance(storage_data, dict): + if storage_data.get('status') == 'CRITICAL': + critical_issues.append(f"{storage_name}: {storage_data.get('reason', 'Storage failure')}") + elif storage_data.get('status') == 'WARNING': + warning_issues.append(f"{storage_name}: {storage_data.get('reason', 'Storage issue')}") + + # Priority 2: Disks / I/O + disks_status = self._check_disks_io() + details['disks'] = disks_status + for disk_name, disk_data in disks_status.items(): + if isinstance(disk_data, dict): + if disk_data.get('status') == 'CRITICAL': + critical_issues.append(f"{disk_name}: {disk_data.get('reason', 'Disk failure')}") + elif disk_data.get('status') == 'WARNING': + warning_issues.append(f"{disk_name}: {disk_data.get('reason', 'Disk issue')}") + + # Priority 3: VM/CT + vms_status = self._check_vms_cts() + details['vms'] = vms_status + if vms_status.get('status') == 'CRITICAL': + critical_issues.append(vms_status.get('reason', 'VM/CT failure')) + elif vms_status.get('status') == 'WARNING': + warning_issues.append(vms_status.get('reason', 'VM/CT issue')) + + # Priority 4: Network + network_status = self._check_network_comprehensive() + details['network'] = network_status + if network_status.get('status') == 'CRITICAL': + critical_issues.append(network_status.get('reason', 'Network failure')) + elif network_status.get('status') == 'WARNING': + warning_issues.append(network_status.get('reason', 'Network issue')) + + # Priority 5: CPU/RAM + cpu_status = self._check_cpu_with_hysteresis() + details['cpu'] = cpu_status + if cpu_status.get('status') == 'WARNING': + warning_issues.append(cpu_status.get('reason', 'CPU high')) + + memory_status = self._check_memory_comprehensive() + details['memory'] = memory_status + if 
memory_status.get('status') == 'CRITICAL': + critical_issues.append(memory_status.get('reason', 'Memory critical')) + elif memory_status.get('status') == 'WARNING': + warning_issues.append(memory_status.get('reason', 'Memory high')) + + # Priority 6: Logs + logs_status = self._check_logs_lightweight() + details['logs'] = logs_status + if logs_status.get('status') == 'CRITICAL': + critical_issues.append(logs_status.get('reason', 'Critical log errors')) + elif logs_status.get('status') == 'WARNING': + warning_issues.append(logs_status.get('reason', 'Log warnings')) + + # Priority 7: Extras (Security, Certificates, Uptime) + security_status = self._check_security() + details['security'] = security_status + if security_status.get('status') == 'WARNING': + warning_issues.append(security_status.get('reason', 'Security issue')) + + # Determine overall status + if critical_issues: + overall = 'CRITICAL' + summary = '; '.join(critical_issues[:3]) # Top 3 critical issues + elif warning_issues: + overall = 'WARNING' + summary = '; '.join(warning_issues[:3]) # Top 3 warnings + else: + overall = 'OK' + summary = 'All systems operational' return { 'overall': overall, - 'checks': checks + 'summary': summary, + 'details': details, + 'timestamp': datetime.now().isoformat() } - def run_all_checks(self) -> List[Dict[str, Any]]: - """Run all health checks and return results""" - checks = [] - - # CPU Check - checks.append(self.check_cpu()) - - # Memory Check - checks.append(self.check_memory()) - - # Storage Checks - checks.extend(self.check_storage()) - - # Network Checks - checks.extend(self.check_network()) - - # VM Checks - checks.extend(self.check_vms()) - - # Events/Logs Check - checks.append(self.check_events()) - - return checks - - def check_cpu(self) -> Dict[str, Any]: - """Check CPU usage""" - cpu_percent = psutil.cpu_percent(interval=1) - - if cpu_percent >= self.CPU_CRITICAL: - status = 'critical' - message = f'CPU usage is critically high at {cpu_percent:.1f}%' - elif cpu_percent >= self.CPU_WARNING: - status = 'warning' - message = f'CPU usage is elevated at {cpu_percent:.1f}%' - else: - status = 'healthy' - message = f'CPU usage is normal at {cpu_percent:.1f}%' - - return { - 'category': 'System', - 'name': 'CPU Usage', - 'status': status, - 'value': f'{cpu_percent:.1f}%', - 'message': message, - 'details': { - 'usage': cpu_percent, - 'cores': psutil.cpu_count(), - 'warning_threshold': self.CPU_WARNING, - 'critical_threshold': self.CPU_CRITICAL - } - } - - def check_memory(self) -> Dict[str, Any]: - """Check memory usage""" - memory = psutil.virtual_memory() - mem_percent = memory.percent - - if mem_percent >= self.MEMORY_CRITICAL: - status = 'critical' - message = f'Memory usage is critically high at {mem_percent:.1f}%' - elif mem_percent >= self.MEMORY_WARNING: - status = 'warning' - message = f'Memory usage is elevated at {mem_percent:.1f}%' - else: - status = 'healthy' - message = f'Memory usage is normal at {mem_percent:.1f}%' - - return { - 'category': 'System', - 'name': 'Memory Usage', - 'status': status, - 'value': f'{mem_percent:.1f}%', - 'message': message, - 'details': { - 'usage': mem_percent, - 'total': memory.total, - 'available': memory.available, - 'used': memory.used, - 'warning_threshold': self.MEMORY_WARNING, - 'critical_threshold': self.MEMORY_CRITICAL - } - } - - def check_storage(self) -> List[Dict[str, Any]]: - """Check storage health including ZFS pools and disks""" - checks = [] - - # Check ZFS pools + def _check_cpu_with_hysteresis(self) -> Dict[str, Any]: + """ + 
Check CPU with hysteresis to avoid flapping alerts. + Requires sustained high usage before triggering. + """ try: - result = subprocess.run(['zpool', 'status'], capture_output=True, text=True, timeout=5) - if result.returncode == 0: - output = result.stdout - - # Parse pool status - pools = self._parse_zpool_status(output) - for pool in pools: - if pool['state'] == 'DEGRADED': - status = 'critical' - message = f"Pool '{pool['name']}' is degraded" - elif pool['state'] == 'FAULTED': - status = 'critical' - message = f"Pool '{pool['name']}' is faulted" - elif pool['state'] == 'OFFLINE': - status = 'critical' - message = f"Pool '{pool['name']}' is offline" - elif pool['errors'] > 0: - status = 'warning' - message = f"Pool '{pool['name']}' has {pool['errors']} errors" - else: - status = 'healthy' - message = f"Pool '{pool['name']}' is healthy" - - checks.append({ - 'category': 'Storage', - 'name': f"ZFS Pool: {pool['name']}", - 'status': status, - 'value': pool['state'], - 'message': message, - 'details': pool - }) - except Exception as e: - checks.append({ - 'category': 'Storage', - 'name': 'ZFS Pools', - 'status': 'warning', - 'value': 'Unknown', - 'message': f'Could not check ZFS pools: {str(e)}', - 'details': {'error': str(e)} - }) - - # Check disk partitions - partitions = psutil.disk_partitions() - for partition in partitions: - try: - usage = psutil.disk_usage(partition.mountpoint) - percent = usage.percent - - if percent >= 95: - status = 'critical' - message = f"Disk '{partition.mountpoint}' is critically full at {percent:.1f}%" - elif percent >= 85: - status = 'warning' - message = f"Disk '{partition.mountpoint}' is getting full at {percent:.1f}%" - else: - status = 'healthy' - message = f"Disk '{partition.mountpoint}' has sufficient space ({percent:.1f}% used)" - - checks.append({ - 'category': 'Storage', - 'name': f"Disk: {partition.mountpoint}", - 'status': status, - 'value': f'{percent:.1f}%', - 'message': message, - 'details': { - 'device': partition.device, - 'mountpoint': partition.mountpoint, - 'fstype': partition.fstype, - 'total': usage.total, - 'used': usage.used, - 'free': usage.free, - 'percent': percent - } - }) - except PermissionError: - continue - - return checks - - def check_network(self) -> List[Dict[str, Any]]: - """Check network interface health (errors, not inactive interfaces)""" - checks = [] - - # Get network interface stats - net_io = psutil.net_io_counters(pernic=True) - net_if_stats = psutil.net_if_stats() - - for interface, stats in net_io.items(): - # Skip loopback - if interface == 'lo': - continue + # Get CPU usage (1 second sample to minimize impact) + cpu_percent = psutil.cpu_percent(interval=1) + current_time = time.time() - # Only check active interfaces - if interface in net_if_stats and net_if_stats[interface].isup: - errors = stats.errin + stats.errout - drops = stats.dropin + stats.dropout - - if errors > 100 or drops > 100: - status = 'critical' - message = f"Interface '{interface}' has {errors} errors and {drops} dropped packets" - elif errors > 10 or drops > 10: - status = 'warning' - message = f"Interface '{interface}' has {errors} errors and {drops} dropped packets" - else: - status = 'healthy' - message = f"Interface '{interface}' is operating normally" - - checks.append({ - 'category': 'Network', - 'name': f"Interface: {interface}", - 'status': status, - 'value': 'Active', - 'message': message, - 'details': { - 'errors_in': stats.errin, - 'errors_out': stats.errout, - 'drops_in': stats.dropin, - 'drops_out': stats.dropout, - 
'bytes_sent': stats.bytes_sent, - 'bytes_recv': stats.bytes_recv - } - }) - - return checks - - def check_vms(self) -> List[Dict[str, Any]]: - """Check VM status""" - checks = [] - - try: - # Get VM list from qm - result = subprocess.run(['qm', 'list'], capture_output=True, text=True, timeout=5) - if result.returncode == 0: - lines = result.stdout.strip().split('\n')[1:] # Skip header - - running_count = 0 - stopped_count = 0 - error_count = 0 - - for line in lines: - if line.strip(): - parts = line.split() - if len(parts) >= 3: - vm_status = parts[2] - if vm_status == 'running': - running_count += 1 - elif vm_status == 'stopped': - stopped_count += 1 - else: - error_count += 1 - - if error_count > 0: - status = 'warning' - message = f'{error_count} VMs in unexpected state' - else: - status = 'healthy' - message = f'{running_count} running, {stopped_count} stopped' - - checks.append({ - 'category': 'Virtual Machines', - 'name': 'VM Status', - 'status': status, - 'value': f'{running_count + stopped_count} total', - 'message': message, - 'details': { - 'running': running_count, - 'stopped': stopped_count, - 'errors': error_count - } - }) - except Exception as e: - checks.append({ - 'category': 'Virtual Machines', - 'name': 'VM Status', - 'status': 'warning', - 'value': 'Unknown', - 'message': f'Could not check VM status: {str(e)}', - 'details': {'error': str(e)} + # Track state history + state_key = 'cpu_usage' + self.state_history[state_key].append({ + 'value': cpu_percent, + 'time': current_time }) - - return checks + + # Keep only recent history (last 5 minutes) + self.state_history[state_key] = [ + entry for entry in self.state_history[state_key] + if current_time - entry['time'] < 300 + ] + + # Check for sustained high usage + critical_duration = sum( + 1 for entry in self.state_history[state_key] + if entry['value'] >= self.CPU_CRITICAL and + current_time - entry['time'] <= self.CPU_CRITICAL_DURATION + ) + + warning_duration = sum( + 1 for entry in self.state_history[state_key] + if entry['value'] >= self.CPU_WARNING and + current_time - entry['time'] <= self.CPU_WARNING_DURATION + ) + + recovery_duration = sum( + 1 for entry in self.state_history[state_key] + if entry['value'] < self.CPU_RECOVERY and + current_time - entry['time'] <= self.CPU_RECOVERY_DURATION + ) + + # Determine status with hysteresis + if critical_duration >= 2: # 2+ readings in critical range + status = 'CRITICAL' + reason = f'CPU >{self.CPU_CRITICAL}% for {self.CPU_CRITICAL_DURATION}s' + elif warning_duration >= 2 and recovery_duration < 2: + status = 'WARNING' + reason = f'CPU >{self.CPU_WARNING}% for {self.CPU_WARNING_DURATION}s' + else: + status = 'OK' + reason = None + + # Get temperature if available (checked once per minute max) + temp_status = self._check_cpu_temperature() + + result = { + 'status': status, + 'usage': round(cpu_percent, 1), + 'cores': psutil.cpu_count() + } + + if reason: + result['reason'] = reason + + if temp_status: + result['temperature'] = temp_status + if temp_status.get('status') == 'CRITICAL': + result['status'] = 'CRITICAL' + result['reason'] = temp_status.get('reason') + elif temp_status.get('status') == 'WARNING' and status == 'OK': + result['status'] = 'WARNING' + result['reason'] = temp_status.get('reason') + + return result + + except Exception as e: + return {'status': 'UNKNOWN', 'reason': f'CPU check failed: {str(e)}'} - def check_events(self) -> Dict[str, Any]: - """Check system events/logs for errors""" + def _check_cpu_temperature(self) -> Dict[str, Any]: + """Check CPU 
temperature (cached, max 1 check per minute)""" + cache_key = 'cpu_temp' + current_time = time.time() + + # Check cache + if cache_key in self.last_check_times: + if current_time - self.last_check_times[cache_key] < 60: + return self.cached_results.get(cache_key, {}) + try: - # Check journalctl for recent errors + # Try lm-sensors first result = subprocess.run( - ['journalctl', '-p', 'err', '-n', '100', '--no-pager'], + ['sensors', '-A', '-u'], capture_output=True, text=True, - timeout=5 + timeout=2 ) if result.returncode == 0: - error_lines = [line for line in result.stdout.split('\n') if line.strip()] - error_count = len(error_lines) + temps = [] + for line in result.stdout.split('\n'): + if 'temp' in line.lower() and '_input' in line: + try: + temp = float(line.split(':')[1].strip()) + temps.append(temp) + except: + continue - if error_count > 50: - status = 'critical' - message = f'{error_count} errors in recent logs' - elif error_count > 10: - status = 'warning' - message = f'{error_count} errors in recent logs' - else: - status = 'healthy' - message = f'{error_count} errors in recent logs (normal)' - - return { - 'category': 'System Events', - 'name': 'Error Logs', - 'status': status, - 'value': f'{error_count} errors', - 'message': message, - 'details': { - 'error_count': error_count, - 'recent_errors': error_lines[:5] # Last 5 errors + if temps: + max_temp = max(temps) + + if max_temp >= self.TEMP_CRITICAL: + status = 'CRITICAL' + reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_CRITICAL}°C' + elif max_temp >= self.TEMP_WARNING: + status = 'WARNING' + reason = f'CPU temperature {max_temp}°C ≥{self.TEMP_WARNING}°C' + else: + status = 'OK' + reason = None + + temp_result = { + 'status': status, + 'value': round(max_temp, 1), + 'unit': '°C' } + if reason: + temp_result['reason'] = reason + + self.cached_results[cache_key] = temp_result + self.last_check_times[cache_key] = current_time + return temp_result + + # If sensors not available, return UNKNOWN (doesn't penalize) + unknown_result = {'status': 'UNKNOWN', 'reason': 'No temperature sensors available'} + self.cached_results[cache_key] = unknown_result + self.last_check_times[cache_key] = current_time + return unknown_result + + except Exception: + unknown_result = {'status': 'UNKNOWN', 'reason': 'Temperature check unavailable'} + self.cached_results[cache_key] = unknown_result + self.last_check_times[cache_key] = current_time + return unknown_result + + def _check_memory_comprehensive(self) -> Dict[str, Any]: + """Check memory including RAM and swap with sustained thresholds""" + try: + memory = psutil.virtual_memory() + swap = psutil.swap_memory() + current_time = time.time() + + mem_percent = memory.percent + swap_percent = swap.percent if swap.total > 0 else 0 + swap_vs_ram = (swap.used / memory.total * 100) if memory.total > 0 else 0 + + # Track memory state + state_key = 'memory_usage' + self.state_history[state_key].append({ + 'mem_percent': mem_percent, + 'swap_percent': swap_percent, + 'swap_vs_ram': swap_vs_ram, + 'time': current_time + }) + + # Keep only recent history + self.state_history[state_key] = [ + entry for entry in self.state_history[state_key] + if current_time - entry['time'] < 600 + ] + + # Check sustained high memory + mem_critical = sum( + 1 for entry in self.state_history[state_key] + if entry['mem_percent'] >= self.MEMORY_CRITICAL and + current_time - entry['time'] <= self.MEMORY_DURATION + ) + + mem_warning = sum( + 1 for entry in self.state_history[state_key] + if entry['mem_percent'] >= 
self.MEMORY_WARNING and + current_time - entry['time'] <= self.MEMORY_DURATION + ) + + # Check swap usage + swap_critical = sum( + 1 for entry in self.state_history[state_key] + if entry['swap_vs_ram'] > self.SWAP_CRITICAL_PERCENT and + current_time - entry['time'] <= self.SWAP_CRITICAL_DURATION + ) + + swap_warning = sum( + 1 for entry in self.state_history[state_key] + if entry['swap_percent'] > 0 and + current_time - entry['time'] <= self.SWAP_WARNING_DURATION + ) + + # Determine status + if mem_critical >= 2: + status = 'CRITICAL' + reason = f'RAM >{self.MEMORY_CRITICAL}% for {self.MEMORY_DURATION}s' + elif swap_critical >= 2: + status = 'CRITICAL' + reason = f'Swap >{self.SWAP_CRITICAL_PERCENT}% of RAM for {self.SWAP_CRITICAL_DURATION}s' + elif mem_warning >= 2: + status = 'WARNING' + reason = f'RAM >{self.MEMORY_WARNING}% for {self.MEMORY_DURATION}s' + elif swap_warning >= 2: + status = 'WARNING' + reason = f'Swap active for >{self.SWAP_WARNING_DURATION}s' + else: + status = 'OK' + reason = None + + result = { + 'status': status, + 'ram_percent': round(mem_percent, 1), + 'ram_available_gb': round(memory.available / (1024**3), 2), + 'swap_percent': round(swap_percent, 1), + 'swap_used_gb': round(swap.used / (1024**3), 2) + } + + if reason: + result['reason'] = reason + + return result + + except Exception as e: + return {'status': 'UNKNOWN', 'reason': f'Memory check failed: {str(e)}'} + + def _check_storage_comprehensive(self) -> Dict[str, Any]: + """ + Comprehensive storage check including filesystems, mount points, + LVM, and Proxmox storages. + """ + storage_results = {} + + # Check critical filesystems + critical_mounts = ['/', '/var', '/var/lib/vz'] + + for mount_point in critical_mounts: + if os.path.exists(mount_point): + fs_status = self._check_filesystem(mount_point) + storage_results[mount_point] = fs_status + + # Check all mounted filesystems + try: + partitions = psutil.disk_partitions() + for partition in partitions: + if partition.mountpoint not in critical_mounts: + try: + fs_status = self._check_filesystem(partition.mountpoint) + storage_results[partition.mountpoint] = fs_status + except PermissionError: + continue + except Exception as e: + storage_results['partitions_error'] = { + 'status': 'WARNING', + 'reason': f'Could not enumerate partitions: {str(e)}' + } + + # Check LVM (especially local-lvm) + lvm_status = self._check_lvm() + if lvm_status: + storage_results['lvm'] = lvm_status + + # Check Proxmox storages + pve_storages = self._check_proxmox_storages() + if pve_storages: + storage_results.update(pve_storages) + + return storage_results + + def _check_filesystem(self, mount_point: str) -> Dict[str, Any]: + """Check individual filesystem for space and mount status""" + try: + # Check if mounted + result = subprocess.run( + ['mountpoint', '-q', mount_point], + capture_output=True, + timeout=2 + ) + + if result.returncode != 0: + return { + 'status': 'CRITICAL', + 'reason': f'Not mounted' } + + # Check if read-only + with open('/proc/mounts', 'r') as f: + for line in f: + parts = line.split() + if len(parts) >= 4 and parts[1] == mount_point: + options = parts[3].split(',') + if 'ro' in options: + return { + 'status': 'CRITICAL', + 'reason': 'Mounted read-only' + } + + # Check disk usage + usage = psutil.disk_usage(mount_point) + percent = usage.percent + + if percent >= self.STORAGE_CRITICAL: + status = 'CRITICAL' + reason = f'{percent:.1f}% full (≥{self.STORAGE_CRITICAL}%)' + elif percent >= self.STORAGE_WARNING: + status = 'WARNING' + reason = f'{percent:.1f}% 
full (≥{self.STORAGE_WARNING}%)' + else: + status = 'OK' + reason = None + + result = { + 'status': status, + 'usage_percent': round(percent, 1), + 'free_gb': round(usage.free / (1024**3), 2), + 'total_gb': round(usage.total / (1024**3), 2) + } + + if reason: + result['reason'] = reason + + return result + except Exception as e: return { - 'category': 'System Events', - 'name': 'Error Logs', - 'status': 'warning', - 'value': 'Unknown', - 'message': f'Could not check system logs: {str(e)}', - 'details': {'error': str(e)} + 'status': 'WARNING', + 'reason': f'Check failed: {str(e)}' } - def _parse_zpool_status(self, output: str) -> List[Dict[str, Any]]: - """Parse zpool status output""" - pools = [] - current_pool = None - - for line in output.split('\n'): - line = line.strip() + def _check_lvm(self) -> Dict[str, Any]: + """Check LVM volumes, especially local-lvm""" + try: + result = subprocess.run( + ['lvs', '--noheadings', '--options', 'lv_name,vg_name,lv_attr'], + capture_output=True, + text=True, + timeout=3 + ) - if line.startswith('pool:'): - if current_pool: - pools.append(current_pool) - current_pool = {'name': line.split(':')[1].strip(), 'state': 'UNKNOWN', 'errors': 0} - elif line.startswith('state:') and current_pool: - current_pool['state'] = line.split(':')[1].strip() - elif 'errors:' in line.lower() and current_pool: + if result.returncode != 0: + return { + 'status': 'WARNING', + 'reason': 'LVM not available or no volumes' + } + + volumes = [] + local_lvm_found = False + + for line in result.stdout.strip().split('\n'): + if line.strip(): + parts = line.split() + if len(parts) >= 2: + lv_name = parts[0].strip() + vg_name = parts[1].strip() + volumes.append(f'{vg_name}/{lv_name}') + + if 'local-lvm' in lv_name or 'local-lvm' in vg_name: + local_lvm_found = True + + if not local_lvm_found and volumes: + return { + 'status': 'CRITICAL', + 'reason': 'local-lvm volume not found', + 'volumes': volumes + } + + return { + 'status': 'OK', + 'volumes': volumes + } + + except Exception as e: + return { + 'status': 'WARNING', + 'reason': f'LVM check failed: {str(e)}' + } + + def _check_proxmox_storages(self) -> Dict[str, Any]: + """Check Proxmox-specific storages (NFS, CIFS, PBS)""" + storages = {} + + try: + # Read Proxmox storage configuration + if os.path.exists('/etc/pve/storage.cfg'): + with open('/etc/pve/storage.cfg', 'r') as f: + current_storage = None + storage_type = None + + for line in f: + line = line.strip() + + if line.startswith('dir:') or line.startswith('nfs:') or \ + line.startswith('cifs:') or line.startswith('pbs:'): + parts = line.split(':', 1) + storage_type = parts[0] + current_storage = parts[1].strip() + elif line.startswith('path ') and current_storage: + path = line.split(None, 1)[1] + + if storage_type == 'dir': + if os.path.exists(path): + storages[f'storage_{current_storage}'] = { + 'status': 'OK', + 'type': 'dir', + 'path': path + } + else: + storages[f'storage_{current_storage}'] = { + 'status': 'CRITICAL', + 'reason': 'Directory does not exist', + 'type': 'dir', + 'path': path + } + + current_storage = None + storage_type = None + except Exception as e: + storages['pve_storage_config'] = { + 'status': 'WARNING', + 'reason': f'Could not read storage config: {str(e)}' + } + + return storages + + def _check_disks_io(self) -> Dict[str, Any]: + """Check disk I/O errors from dmesg (lightweight)""" + disks = {} + current_time = time.time() + + try: + # Only check dmesg for recent errors (last 2 seconds of kernel log) + result = subprocess.run( + ['dmesg', '-T', 
'--level=err,warn', '--since', '5 minutes ago'], + capture_output=True, + text=True, + timeout=2 + ) + + if result.returncode == 0: + io_errors = defaultdict(int) + + for line in result.stdout.split('\n'): + line_lower = line.lower() + if any(keyword in line_lower for keyword in ['i/o error', 'ata error', 'scsi error']): + # Extract disk name + for part in line.split(): + if part.startswith('sd') or part.startswith('nvme') or part.startswith('hd'): + disk_name = part.rstrip(':,') + io_errors[disk_name] += 1 + + # Track in history + self.io_error_history[disk_name].append(current_time) + + # Clean old history (keep last 5 minutes) + for disk in list(self.io_error_history.keys()): + self.io_error_history[disk] = [ + t for t in self.io_error_history[disk] + if current_time - t < 300 + ] + + error_count = len(self.io_error_history[disk]) + + if error_count >= 3: + disks[f'/dev/{disk}'] = { + 'status': 'CRITICAL', + 'reason': f'{error_count} I/O errors in 5 minutes' + } + elif error_count >= 1: + disks[f'/dev/{disk}'] = { + 'status': 'WARNING', + 'reason': f'{error_count} I/O error(s) in 5 minutes' + } + + # If no errors found, report OK + if not disks: + disks['status'] = 'OK' + + return disks + + except Exception as e: + return { + 'status': 'WARNING', + 'reason': f'Disk I/O check failed: {str(e)}' + } + + def _check_network_comprehensive(self) -> Dict[str, Any]: + """Check network interfaces, bridges, and connectivity""" + try: + issues = [] + interface_details = {} + + # Check interface status + net_if_stats = psutil.net_if_stats() + net_io = psutil.net_io_counters(pernic=True) + current_time = time.time() + + for interface, stats in net_if_stats.items(): + if interface == 'lo': + continue + + # Check if interface is down (excluding administratively down) + if not stats.isup: + # Check if it's a bridge or important interface + if interface.startswith('vmbr') or interface.startswith('eth') or interface.startswith('ens'): + issues.append(f'{interface} is DOWN') + interface_details[interface] = { + 'status': 'CRITICAL', + 'reason': 'Interface DOWN' + } + continue + + # Check bridge traffic (if no traffic for 10 minutes) + if interface.startswith('vmbr') and interface in net_io: + io_stats = net_io[interface] + + # Initialize baseline if not exists + if interface not in self.network_baseline: + self.network_baseline[interface] = { + 'rx_bytes': io_stats.bytes_recv, + 'tx_bytes': io_stats.bytes_sent, + 'time': current_time + } + else: + baseline = self.network_baseline[interface] + time_diff = current_time - baseline['time'] + + if time_diff >= self.NETWORK_INACTIVE_DURATION: + rx_diff = io_stats.bytes_recv - baseline['rx_bytes'] + tx_diff = io_stats.bytes_sent - baseline['tx_bytes'] + + if rx_diff == 0 and tx_diff == 0: + issues.append(f'{interface} no traffic for 10+ minutes') + interface_details[interface] = { + 'status': 'WARNING', + 'reason': 'No traffic for 10+ minutes' + } + + # Update baseline + self.network_baseline[interface] = { + 'rx_bytes': io_stats.bytes_recv, + 'tx_bytes': io_stats.bytes_sent, + 'time': current_time + } + + # Check gateway/DNS latency (lightweight, cached) + latency_status = self._check_network_latency() + if latency_status.get('status') != 'OK': + issues.append(latency_status.get('reason', 'Network latency issue')) + interface_details['connectivity'] = latency_status + + # Determine overall network status + if any('CRITICAL' in str(detail.get('status')) for detail in interface_details.values()): + status = 'CRITICAL' + reason = '; '.join(issues[:2]) + elif 
issues: + status = 'WARNING' + reason = '; '.join(issues[:2]) + else: + status = 'OK' + reason = None + + result = {'status': status} + if reason: + result['reason'] = reason + if interface_details: + result['interfaces'] = interface_details + + return result + + except Exception as e: + return { + 'status': 'WARNING', + 'reason': f'Network check failed: {str(e)}' + } + + def _check_network_latency(self) -> Dict[str, Any]: + """Check network latency to gateway/DNS (cached, max 1 check per minute)""" + cache_key = 'network_latency' + current_time = time.time() + + # Check cache + if cache_key in self.last_check_times: + if current_time - self.last_check_times[cache_key] < 60: + return self.cached_results.get(cache_key, {'status': 'OK'}) + + try: + # Ping default gateway or 1.1.1.1 + result = subprocess.run( + ['ping', '-c', '1', '-W', '1', '1.1.1.1'], + capture_output=True, + text=True, + timeout=self.NETWORK_TIMEOUT + ) + + if result.returncode == 0: + # Extract latency + for line in result.stdout.split('\n'): + if 'time=' in line: + try: + latency_str = line.split('time=')[1].split()[0] + latency = float(latency_str) + + if latency > self.NETWORK_LATENCY_CRITICAL: + status = 'CRITICAL' + reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_CRITICAL}ms' + elif latency > self.NETWORK_LATENCY_WARNING: + status = 'WARNING' + reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_WARNING}ms' + else: + status = 'OK' + reason = None + + latency_result = { + 'status': status, + 'latency_ms': round(latency, 1) + } + if reason: + latency_result['reason'] = reason + + self.cached_results[cache_key] = latency_result + self.last_check_times[cache_key] = current_time + return latency_result + except: + pass + + # Ping failed + packet_loss_result = { + 'status': 'CRITICAL', + 'reason': 'Packet loss or timeout' + } + self.cached_results[cache_key] = packet_loss_result + self.last_check_times[cache_key] = current_time + return packet_loss_result + + except Exception as e: + error_result = { + 'status': 'WARNING', + 'reason': f'Latency check failed: {str(e)}' + } + self.cached_results[cache_key] = error_result + self.last_check_times[cache_key] = current_time + return error_result + + def _check_vms_cts(self) -> Dict[str, Any]: + """Check VM and CT status for unexpected stops""" + try: + issues = [] + vm_details = {} + + # Check VMs + try: + result = subprocess.run( + ['qm', 'list'], + capture_output=True, + text=True, + timeout=3 + ) + + if result.returncode == 0: + for line in result.stdout.strip().split('\n')[1:]: + if line.strip(): + parts = line.split() + if len(parts) >= 3: + vmid = parts[0] + vm_status = parts[2] + + if vm_status == 'stopped': + # Check if unexpected (this is simplified, would need autostart config) + vm_details[f'vm_{vmid}'] = { + 'status': 'WARNING', + 'reason': 'VM stopped' + } + issues.append(f'VM {vmid} stopped') + except Exception as e: + vm_details['vms_check'] = { + 'status': 'WARNING', + 'reason': f'Could not check VMs: {str(e)}' + } + + # Check CTs + try: + result = subprocess.run( + ['pct', 'list'], + capture_output=True, + text=True, + timeout=3 + ) + + if result.returncode == 0: + for line in result.stdout.strip().split('\n')[1:]: + if line.strip(): + parts = line.split() + if len(parts) >= 2: + ctid = parts[0] + ct_status = parts[1] + + if ct_status == 'stopped': + vm_details[f'ct_{ctid}'] = { + 'status': 'WARNING', + 'reason': 'CT stopped' + } + issues.append(f'CT {ctid} stopped') + except Exception as e: + vm_details['cts_check'] = { + 'status': 'WARNING', + 
'reason': f'Could not check CTs: {str(e)}' + } + + # Determine overall status + if issues: + status = 'WARNING' + reason = '; '.join(issues[:3]) + else: + status = 'OK' + reason = None + + result = {'status': status} + if reason: + result['reason'] = reason + if vm_details: + result['details'] = vm_details + + return result + + except Exception as e: + return { + 'status': 'WARNING', + 'reason': f'VM/CT check failed: {str(e)}' + } + + def _check_pve_services(self) -> Dict[str, Any]: + """Check critical Proxmox services""" + try: + failed_services = [] + + for service in self.PVE_SERVICES: try: - error_part = line.split(':')[1].strip() - if error_part.lower() != 'no known data errors': - current_pool['errors'] = int(error_part.split()[0]) - except: - pass + result = subprocess.run( + ['systemctl', 'is-active', service], + capture_output=True, + text=True, + timeout=2 + ) + + if result.returncode != 0 or result.stdout.strip() != 'active': + failed_services.append(service) + except Exception: + failed_services.append(service) + + if failed_services: + return { + 'status': 'CRITICAL', + 'reason': f'Services inactive: {", ".join(failed_services)}', + 'failed': failed_services + } + + return {'status': 'OK'} + + except Exception as e: + return { + 'status': 'WARNING', + 'reason': f'Service check failed: {str(e)}' + } + + def _check_logs_lightweight(self) -> Dict[str, Any]: + """Lightweight log analysis (cached, checked every 5 minutes)""" + cache_key = 'logs_analysis' + current_time = time.time() - if current_pool: - pools.append(current_pool) + # Check cache + if cache_key in self.last_check_times: + if current_time - self.last_check_times[cache_key] < self.LOG_CHECK_INTERVAL: + return self.cached_results.get(cache_key, {'status': 'OK'}) - return pools + try: + # Check journalctl for recent errors and warnings + result = subprocess.run( + ['journalctl', '--since', '5 minutes ago', '--no-pager', '-p', 'warning'], + capture_output=True, + text=True, + timeout=3 + ) + + if result.returncode == 0: + lines = result.stdout.strip().split('\n') + + errors_5m = 0 + warnings_5m = 0 + critical_keywords_found = [] + + for line in lines: + line_lower = line.lower() + + # Check for critical keywords + for keyword in self.CRITICAL_LOG_KEYWORDS: + if keyword.lower() in line_lower: + critical_keywords_found.append(keyword) + errors_5m += 1 + break + else: + # Count errors and warnings + if 'error' in line_lower or 'critical' in line_lower or 'fatal' in line_lower: + errors_5m += 1 + elif 'warning' in line_lower or 'warn' in line_lower: + warnings_5m += 1 + + # Determine status + if critical_keywords_found: + status = 'CRITICAL' + reason = f'Critical errors: {", ".join(set(critical_keywords_found[:3]))}' + elif errors_5m >= self.LOG_ERRORS_CRITICAL: + status = 'CRITICAL' + reason = f'{errors_5m} errors in 5 minutes (≥{self.LOG_ERRORS_CRITICAL})' + elif warnings_5m >= self.LOG_WARNINGS_CRITICAL: + status = 'CRITICAL' + reason = f'{warnings_5m} warnings in 5 minutes (≥{self.LOG_WARNINGS_CRITICAL})' + elif errors_5m >= self.LOG_ERRORS_WARNING: + status = 'WARNING' + reason = f'{errors_5m} errors in 5 minutes' + elif warnings_5m >= self.LOG_WARNINGS_WARNING: + status = 'WARNING' + reason = f'{warnings_5m} warnings in 5 minutes' + else: + status = 'OK' + reason = None + + log_result = { + 'status': status, + 'errors_5m': errors_5m, + 'warnings_5m': warnings_5m + } + if reason: + log_result['reason'] = reason + + self.cached_results[cache_key] = log_result + self.last_check_times[cache_key] = current_time + return 
log_result + + ok_result = {'status': 'OK'} + self.cached_results[cache_key] = ok_result + self.last_check_times[cache_key] = current_time + return ok_result + + except Exception as e: + error_result = { + 'status': 'WARNING', + 'reason': f'Log check failed: {str(e)}' + } + self.cached_results[cache_key] = error_result + self.last_check_times[cache_key] = current_time + return error_result + + def _check_security(self) -> Dict[str, Any]: + """Check security-related items (fail2ban, certificates, uptime)""" + try: + issues = [] + + # Check fail2ban + try: + result = subprocess.run( + ['systemctl', 'is-active', 'fail2ban'], + capture_output=True, + text=True, + timeout=2 + ) + + if result.returncode != 0 or result.stdout.strip() != 'active': + issues.append('fail2ban inactive') + except Exception: + pass + + # Check uptime (warning if >180 days) + try: + uptime_seconds = time.time() - psutil.boot_time() + uptime_days = uptime_seconds / 86400 + + if uptime_days > 180: + issues.append(f'Uptime {int(uptime_days)} days (>180)') + except Exception: + pass + + # Check SSL certificates (cached, checked once per day) + cert_status = self._check_certificates() + if cert_status.get('status') != 'OK': + issues.append(cert_status.get('reason', 'Certificate issue')) + + if issues: + return { + 'status': 'WARNING', + 'reason': '; '.join(issues[:2]) + } + + return {'status': 'OK'} + + except Exception as e: + return { + 'status': 'WARNING', + 'reason': f'Security check failed: {str(e)}' + } + + def _check_certificates(self) -> Dict[str, Any]: + """Check SSL certificate expiration (cached, checked once per day)""" + cache_key = 'certificates' + current_time = time.time() + + # Check cache (24 hours) + if cache_key in self.last_check_times: + if current_time - self.last_check_times[cache_key] < 86400: + return self.cached_results.get(cache_key, {'status': 'OK'}) + + try: + # Check PVE certificate + cert_path = '/etc/pve/local/pve-ssl.pem' + + if os.path.exists(cert_path): + result = subprocess.run( + ['openssl', 'x509', '-enddate', '-noout', '-in', cert_path], + capture_output=True, + text=True, + timeout=2 + ) + + if result.returncode == 0: + # Parse expiration date + date_str = result.stdout.strip().replace('notAfter=', '') + + try: + from datetime import datetime + exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z') + days_until_expiry = (exp_date - datetime.now()).days + + if days_until_expiry < 0: + status = 'CRITICAL' + reason = 'Certificate expired' + elif days_until_expiry < 15: + status = 'WARNING' + reason = f'Certificate expires in {days_until_expiry} days' + else: + status = 'OK' + reason = None + + cert_result = {'status': status} + if reason: + cert_result['reason'] = reason + + self.cached_results[cache_key] = cert_result + self.last_check_times[cache_key] = current_time + return cert_result + except Exception: + pass + + ok_result = {'status': 'OK'} + self.cached_results[cache_key] = ok_result + self.last_check_times[cache_key] = current_time + return ok_result + + except Exception: + ok_result = {'status': 'OK'} + self.cached_results[cache_key] = ok_result + self.last_check_times[cache_key] = current_time + return ok_result + # Global instance health_monitor = HealthMonitor()
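
# --- Usage sketch (illustrative, not part of the patch above) ---
# A minimal example of how the rewritten module's public surface might be
# exercised from a small CLI wrapper. The `health_monitor` singleton and the
# get_overall_status() / get_detailed_status() methods come from the patch;
# the import path, the `--detailed` flag, and the exit-code convention are
# assumptions made for illustration only.

import json
import sys

from health_monitor import health_monitor  # assumes AppImage/scripts is on sys.path


def main() -> int:
    # Hypothetical flag: full per-category breakdown vs. the lightweight summary.
    detailed = '--detailed' in sys.argv

    if detailed:
        report = health_monitor.get_detailed_status()   # returns 'overall', 'summary', 'details'
    else:
        report = health_monitor.get_overall_status()    # returns 'status' plus issue counts

    # Both return plain JSON-serializable dicts (timestamps are ISO strings).
    print(json.dumps(report, indent=2))

    # Non-zero exit when anything is CRITICAL, so the sketch could be wired
    # into a cron job, systemd timer, or external monitoring check.
    status = report.get('overall', report.get('status', 'OK'))
    return 2 if status == 'CRITICAL' else 0


if __name__ == '__main__':
    sys.exit(main())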