""" Health Monitor Module Provides comprehensive health checks for the Proxmox system including: - CPU and Memory usage - Storage health (pools, disks, remote storage) - Network health (interface errors) - VM status - System events/logs errors """ import psutil import subprocess import json from typing import Dict, List, Any class HealthMonitor: """Monitors system health across multiple components""" # Thresholds CPU_WARNING = 75 CPU_CRITICAL = 90 MEMORY_WARNING = 75 MEMORY_CRITICAL = 90 def __init__(self): self.checks = [] def get_overall_status(self) -> Dict[str, Any]: """Get overall health status summary""" checks = self.run_all_checks() # Determine overall status critical_count = sum(1 for c in checks if c['status'] == 'critical') warning_count = sum(1 for c in checks if c['status'] == 'warning') if critical_count > 0: overall_status = 'critical' elif warning_count > 0: overall_status = 'warning' else: overall_status = 'healthy' return { 'status': overall_status, 'critical_count': critical_count, 'warning_count': warning_count, 'healthy_count': len(checks) - critical_count - warning_count, 'total_checks': len(checks), 'timestamp': psutil.boot_time() } def get_detailed_status(self) -> Dict[str, Any]: """Get detailed health status with all checks""" checks = self.run_all_checks() overall = self.get_overall_status() return { 'overall': overall, 'checks': checks } def run_all_checks(self) -> List[Dict[str, Any]]: """Run all health checks and return results""" checks = [] # CPU Check checks.append(self.check_cpu()) # Memory Check checks.append(self.check_memory()) # Storage Checks checks.extend(self.check_storage()) # Network Checks checks.extend(self.check_network()) # VM Checks checks.extend(self.check_vms()) # Events/Logs Check checks.append(self.check_events()) return checks def check_cpu(self) -> Dict[str, Any]: """Check CPU usage""" cpu_percent = psutil.cpu_percent(interval=1) if cpu_percent >= self.CPU_CRITICAL: status = 'critical' message = f'CPU usage is critically high at {cpu_percent:.1f}%' elif cpu_percent >= self.CPU_WARNING: status = 'warning' message = f'CPU usage is elevated at {cpu_percent:.1f}%' else: status = 'healthy' message = f'CPU usage is normal at {cpu_percent:.1f}%' return { 'category': 'System', 'name': 'CPU Usage', 'status': status, 'value': f'{cpu_percent:.1f}%', 'message': message, 'details': { 'usage': cpu_percent, 'cores': psutil.cpu_count(), 'warning_threshold': self.CPU_WARNING, 'critical_threshold': self.CPU_CRITICAL } } def check_memory(self) -> Dict[str, Any]: """Check memory usage""" memory = psutil.virtual_memory() mem_percent = memory.percent if mem_percent >= self.MEMORY_CRITICAL: status = 'critical' message = f'Memory usage is critically high at {mem_percent:.1f}%' elif mem_percent >= self.MEMORY_WARNING: status = 'warning' message = f'Memory usage is elevated at {mem_percent:.1f}%' else: status = 'healthy' message = f'Memory usage is normal at {mem_percent:.1f}%' return { 'category': 'System', 'name': 'Memory Usage', 'status': status, 'value': f'{mem_percent:.1f}%', 'message': message, 'details': { 'usage': mem_percent, 'total': memory.total, 'available': memory.available, 'used': memory.used, 'warning_threshold': self.MEMORY_WARNING, 'critical_threshold': self.MEMORY_CRITICAL } } def check_storage(self) -> List[Dict[str, Any]]: """Check storage health including ZFS pools and disks""" checks = [] # Check ZFS pools try: result = subprocess.run(['zpool', 'status'], capture_output=True, text=True, timeout=5) if result.returncode == 0: output = result.stdout # Parse pool status pools = self._parse_zpool_status(output) for pool in pools: if pool['state'] == 'DEGRADED': status = 'critical' message = f"Pool '{pool['name']}' is degraded" elif pool['state'] == 'FAULTED': status = 'critical' message = f"Pool '{pool['name']}' is faulted" elif pool['state'] == 'OFFLINE': status = 'critical' message = f"Pool '{pool['name']}' is offline" elif pool['errors'] > 0: status = 'warning' message = f"Pool '{pool['name']}' has {pool['errors']} errors" else: status = 'healthy' message = f"Pool '{pool['name']}' is healthy" checks.append({ 'category': 'Storage', 'name': f"ZFS Pool: {pool['name']}", 'status': status, 'value': pool['state'], 'message': message, 'details': pool }) except Exception as e: checks.append({ 'category': 'Storage', 'name': 'ZFS Pools', 'status': 'warning', 'value': 'Unknown', 'message': f'Could not check ZFS pools: {str(e)}', 'details': {'error': str(e)} }) # Check disk partitions partitions = psutil.disk_partitions() for partition in partitions: try: usage = psutil.disk_usage(partition.mountpoint) percent = usage.percent if percent >= 95: status = 'critical' message = f"Disk '{partition.mountpoint}' is critically full at {percent:.1f}%" elif percent >= 85: status = 'warning' message = f"Disk '{partition.mountpoint}' is getting full at {percent:.1f}%" else: status = 'healthy' message = f"Disk '{partition.mountpoint}' has sufficient space ({percent:.1f}% used)" checks.append({ 'category': 'Storage', 'name': f"Disk: {partition.mountpoint}", 'status': status, 'value': f'{percent:.1f}%', 'message': message, 'details': { 'device': partition.device, 'mountpoint': partition.mountpoint, 'fstype': partition.fstype, 'total': usage.total, 'used': usage.used, 'free': usage.free, 'percent': percent } }) except PermissionError: continue return checks def check_network(self) -> List[Dict[str, Any]]: """Check network interface health (errors, not inactive interfaces)""" checks = [] # Get network interface stats net_io = psutil.net_io_counters(pernic=True) net_if_stats = psutil.net_if_stats() for interface, stats in net_io.items(): # Skip loopback if interface == 'lo': continue # Only check active interfaces if interface in net_if_stats and net_if_stats[interface].isup: errors = stats.errin + stats.errout drops = stats.dropin + stats.dropout if errors > 100 or drops > 100: status = 'critical' message = f"Interface '{interface}' has {errors} errors and {drops} dropped packets" elif errors > 10 or drops > 10: status = 'warning' message = f"Interface '{interface}' has {errors} errors and {drops} dropped packets" else: status = 'healthy' message = f"Interface '{interface}' is operating normally" checks.append({ 'category': 'Network', 'name': f"Interface: {interface}", 'status': status, 'value': 'Active', 'message': message, 'details': { 'errors_in': stats.errin, 'errors_out': stats.errout, 'drops_in': stats.dropin, 'drops_out': stats.dropout, 'bytes_sent': stats.bytes_sent, 'bytes_recv': stats.bytes_recv } }) return checks def check_vms(self) -> List[Dict[str, Any]]: """Check VM status""" checks = [] try: # Get VM list from qm result = subprocess.run(['qm', 'list'], capture_output=True, text=True, timeout=5) if result.returncode == 0: lines = result.stdout.strip().split('\n')[1:] # Skip header running_count = 0 stopped_count = 0 error_count = 0 for line in lines: if line.strip(): parts = line.split() if len(parts) >= 3: vm_status = parts[2] if vm_status == 'running': running_count += 1 elif vm_status == 'stopped': stopped_count += 1 else: error_count += 1 if error_count > 0: status = 'warning' message = f'{error_count} VMs in unexpected state' else: status = 'healthy' message = f'{running_count} running, {stopped_count} stopped' checks.append({ 'category': 'Virtual Machines', 'name': 'VM Status', 'status': status, 'value': f'{running_count + stopped_count} total', 'message': message, 'details': { 'running': running_count, 'stopped': stopped_count, 'errors': error_count } }) except Exception as e: checks.append({ 'category': 'Virtual Machines', 'name': 'VM Status', 'status': 'warning', 'value': 'Unknown', 'message': f'Could not check VM status: {str(e)}', 'details': {'error': str(e)} }) return checks def check_events(self) -> Dict[str, Any]: """Check system events/logs for errors""" try: # Check journalctl for recent errors result = subprocess.run( ['journalctl', '-p', 'err', '-n', '100', '--no-pager'], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: error_lines = [line for line in result.stdout.split('\n') if line.strip()] error_count = len(error_lines) if error_count > 50: status = 'critical' message = f'{error_count} errors in recent logs' elif error_count > 10: status = 'warning' message = f'{error_count} errors in recent logs' else: status = 'healthy' message = f'{error_count} errors in recent logs (normal)' return { 'category': 'System Events', 'name': 'Error Logs', 'status': status, 'value': f'{error_count} errors', 'message': message, 'details': { 'error_count': error_count, 'recent_errors': error_lines[:5] # Last 5 errors } } except Exception as e: return { 'category': 'System Events', 'name': 'Error Logs', 'status': 'warning', 'value': 'Unknown', 'message': f'Could not check system logs: {str(e)}', 'details': {'error': str(e)} } def _parse_zpool_status(self, output: str) -> List[Dict[str, Any]]: """Parse zpool status output""" pools = [] current_pool = None for line in output.split('\n'): line = line.strip() if line.startswith('pool:'): if current_pool: pools.append(current_pool) current_pool = {'name': line.split(':')[1].strip(), 'state': 'UNKNOWN', 'errors': 0} elif line.startswith('state:') and current_pool: current_pool['state'] = line.split(':')[1].strip() elif 'errors:' in line.lower() and current_pool: try: error_part = line.split(':')[1].strip() if error_part.lower() != 'no known data errors': current_pool['errors'] = int(error_part.split()[0]) except: pass if current_pool: pools.append(current_pool) return pools # Global instance health_monitor = HealthMonitor()