diff --git a/AppImage/scripts/hardware_monitor.py b/AppImage/scripts/hardware_monitor.py index 8015c743..11bc41d3 100644 --- a/AppImage/scripts/hardware_monitor.py +++ b/AppImage/scripts/hardware_monitor.py @@ -1,7 +1,12 @@ #!/usr/bin/env python3 """ Hardware Monitor - Detección exhaustiva de hardware -Agrega CPU (RAPL), RAM, Placa base, GPU (Intel/NVIDIA/AMD), IPMI y UPS. +Fusiona: +1. Consumo de CPU (RAPL) +2. Detección de GPU (Intel/NVIDIA/AMD) y métricas detalladas +3. Controladoras HBA/RAID y sus temperaturas +4. Sensores IPMI (Ventiladores/Energía) y UPS (NUT) +5. Información base (CPU, RAM, Placa base) """ import os @@ -25,8 +30,7 @@ def identify_gpu_type(name, vendor=None, bus=None, driver=None): n = (name or "").lower() v = (vendor or "").lower() d = (driver or "").lower() - b = (bus or "") - + bmc_keywords = ['aspeed', 'ast', 'matrox g200', 'g200e', 'mgag200'] if any(k in n for k in bmc_keywords) or v in ['aspeed', 'matrox']: return 'Integrated' @@ -83,37 +87,55 @@ def get_intel_gpu_processes_from_text(): # --- Funciones Principales de GPU --- -def get_gpu_info(): - """Detecta GPUs instaladas usando lspci y sensors.""" - gpus = [] - - # 1. Detección por lspci +def get_pci_gpu_map() -> Dict[str, Dict[str, str]]: + """ + Obtiene un mapa detallado de GPUs desde lspci. + Útil para enriquecer datos con nombres completos de dispositivos. + """ + gpu_map = {} try: - result = subprocess.run(['lspci'], capture_output=True, text=True, timeout=5) + result = subprocess.run(['lspci', '-nn'], capture_output=True, text=True, timeout=5) if result.returncode == 0: for line in result.stdout.split('\n'): if any(k in line for k in ['VGA compatible', '3D controller', 'Display controller']): - parts = line.split(' ', 1) - if len(parts) >= 2: - slot = parts[0].strip() - rest = parts[1] - name = rest.split(':', 1)[1].strip() if ':' in rest else rest.strip() - + match = re.match(r'^([0-9a-f]{2}:[0-9a-f]{2}\.[0-9a-f])\s+.*:\s+(.+?)\s+\[([0-9a-f]{4}):([0-9a-f]{4})\]', line) + if match: + pci = match.group(1) + name = match.group(2).strip() vendor = 'Unknown' if 'NVIDIA' in name.upper(): vendor = 'NVIDIA' elif 'AMD' in name.upper() or 'ATI' in name.upper(): vendor = 'AMD' elif 'INTEL' in name.upper(): vendor = 'Intel' - gpus.append({ - 'slot': slot, - 'name': name, - 'vendor': vendor, - 'type': identify_gpu_type(name, vendor) - }) + gpu_map[pci] = {'vendor': vendor, 'name': name, 'full_name': line} + except Exception: pass + return gpu_map + +def get_gpu_info(): + """Detecta GPUs instaladas para la API.""" + gpus = [] + try: + res = subprocess.run(['lspci'], capture_output=True, text=True) + for line in res.stdout.split('\n'): + if any(x in line for x in ['VGA', '3D', 'Display']): + parts = line.split(' ', 1) + if len(parts) >= 2: + slot = parts[0] + rest = parts[1] + name = rest.split(':', 1)[1].strip() if ':' in rest else rest.strip() + + vendor = 'Unknown' + if 'NVIDIA' in name.upper(): vendor = 'NVIDIA' + elif 'AMD' in name.upper(): vendor = 'AMD' + elif 'INTEL' in name.upper(): vendor = 'Intel' + + gpus.append({ + 'slot': slot, + 'name': name, + 'vendor': vendor, + 'type': identify_gpu_type(name, vendor) + }) except: pass - - # 2. Enriquecer con datos básicos de sensores (temperatura/fan) si están disponibles via 'sensors' - # (Lógica simplificada para no extender demasiado el código) return gpus def get_detailed_gpu_info(gpu): @@ -138,25 +160,19 @@ def get_detailed_gpu_info(gpu): gpu_elem = root.find('gpu') if gpu_elem: info['has_monitoring_tool'] = True - # Temp temp = gpu_elem.find('.//temperature/gpu_temp') if temp is not None: info['temperature'] = int(temp.text.replace(' C', '')) - # Fan fan = gpu_elem.find('.//fan_speed') if fan is not None and fan.text != 'N/A': info['fan_speed'] = int(fan.text.replace(' %', '')) - # Power power = gpu_elem.find('.//gpu_power_readings/instant_power_draw') if power is not None and power.text != 'N/A': info['power_draw'] = power.text - # Util util = gpu_elem.find('.//utilization/gpu_util') if util is not None: info['utilization_gpu'] = util.text - # Mem mem_used = gpu_elem.find('.//fb_memory_usage/used') if mem_used is not None: info['memory_used'] = mem_used.text mem_total = gpu_elem.find('.//fb_memory_usage/total') if mem_total is not None: info['memory_total'] = mem_total.text - # Processes procs = gpu_elem.find('.//processes') if procs is not None: for p in procs.findall('process_info'): @@ -203,20 +219,16 @@ def get_detailed_gpu_info(gpu): data = json_objs[-1] info['has_monitoring_tool'] = True - # Motores if 'engines' in data: - # Calcular uso total (maximo de cualquier motor) max_usage = 0.0 for k, v in data['engines'].items(): val = float(v.get('busy', 0)) if val > max_usage: max_usage = val info['utilization_gpu'] = f"{max_usage:.1f}%" - # Power (Package) if 'power' in data: info['power_draw'] = f"{data['power'].get('Package', 0):.2f} W" - # Frequency if 'frequency' in data: info['clock_graphics'] = f"{data['frequency'].get('actual', 0)} MHz" except: @@ -241,80 +253,9 @@ def get_gpu_realtime_data(slot): return target return None -# --- IPMI, UPS y RAPL --- +# --- RAPL Power (CPU) --- -def get_ipmi_fans(): - """Obtiene ventiladores via ipmitool.""" - fans = [] - if shutil.which('ipmitool'): - try: - res = subprocess.run(['ipmitool', 'sensor'], capture_output=True, text=True, timeout=5) - if res.returncode == 0: - for line in res.stdout.split('\n'): - if 'fan' in line.lower() and '|' in line: - p = line.split('|') - if len(p) >= 3: - try: - val = float(p[1].strip()) - fans.append({'name': p[0].strip(), 'speed': val, 'unit': p[2].strip()}) - except: continue - except: pass - return fans - -def get_ipmi_power(): - """Obtiene datos de energía via ipmitool.""" - power = {'supplies': [], 'meter': None} - if shutil.which('ipmitool'): - try: - res = subprocess.run(['ipmitool', 'sensor'], capture_output=True, text=True, timeout=5) - for line in res.stdout.split('\n'): - lower = line.lower() - if ('power supply' in lower or 'power meter' in lower) and '|' in line: - p = line.split('|') - try: - val = float(p[1].strip()) - unit = p[2].strip() if len(p) > 2 else '' - if 'power meter' in lower: - power['meter'] = {'name': p[0].strip(), 'watts': val, 'unit': unit} - else: - power['supplies'].append({'name': p[0].strip(), 'watts': val, 'unit': unit}) - except: continue - except: pass - return power - -def get_ups_info(): - """Obtiene datos de SAI/UPS via NUT (upsc).""" - ups_list = [] - if shutil.which('upsc'): - try: - # Detectar UPS configurados - res = subprocess.run(['upsc', '-l'], capture_output=True, text=True, timeout=5) - if res.returncode == 0: - for ups_name in res.stdout.strip().split('\n'): - if not ups_name: continue - - data = {'name': ups_name, 'connection_type': 'Local'} - det_res = subprocess.run(['upsc', ups_name], capture_output=True, text=True, timeout=5) - if det_res.returncode == 0: - for line in det_res.stdout.split('\n'): - if ':' in line: - k, v = line.split(':', 1) - k, v = k.strip(), v.strip() - - if k == 'device.model': data['model'] = v - elif k == 'device.mfr': data['manufacturer'] = v - elif k == 'battery.charge': data['battery_charge'] = f"{v}%" - elif k == 'ups.load': data['load_percent'] = f"{v}%" - elif k == 'ups.status': data['status'] = v - elif k == 'battery.runtime': - try: data['time_left'] = f"{int(v)//60} min" - except: data['time_left'] = v - - ups_list.append(data) - except: pass - return ups_list - -def get_power_info(): +def get_power_info() -> Optional[Dict[str, Any]]: """Obtiene consumo de CPU Intel via RAPL.""" global _last_energy_reading rapl_path = '/sys/class/powercap/intel-rapl/intel-rapl:0/energy_uj' @@ -328,35 +269,147 @@ def get_power_info(): if _last_energy_reading['energy_uj'] and _last_energy_reading['timestamp']: tdiff = current_time - _last_energy_reading['timestamp'] ediff = current_uj - _last_energy_reading['energy_uj'] - if tdiff > 0 and ediff >= 0: + if tdiff > 0: + if ediff < 0: ediff = current_uj # Overflow handling watts = round((ediff / tdiff) / 1000000, 2) _last_energy_reading = {'energy_uj': current_uj, 'timestamp': current_time} - return {'name': 'CPU RAPL', 'watts': watts, 'adapter': 'Intel RAPL'} + + cpu_vendor = 'CPU' + try: + with open('/proc/cpuinfo', 'r') as f: + if 'GenuineIntel' in f.read(): cpu_vendor = 'Intel' + else: cpu_vendor = 'AMD' + except: pass + + return {'name': 'CPU Power', 'watts': watts, 'adapter': f'{cpu_vendor} RAPL'} except: pass return None -def get_hba_temperatures(): - """Detecta temperaturas de controladoras HBA (LSI/Broadcom).""" - # Implementación simplificada - return [] +# --- HBA / RAID Logic --- -# --- Función Agregadora Principal --- +def get_hba_info() -> list[Dict[str, Any]]: + """Detecta controladoras HBA/RAID.""" + hba_list = [] + try: + result = subprocess.run(['lspci', '-nn'], capture_output=True, text=True, timeout=5) + if result.returncode == 0: + controller_id = 0 + for line in result.stdout.split('\n'): + if any(k in line for k in ['RAID bus controller', 'SCSI storage controller', 'Serial Attached SCSI']): + match = re.match(r'^([0-9a-f]{2}:[0-9a-f]{2}\.[0-9a-f])\s+.*:\s+(.+?)\s+\[([0-9a-f]{4}):([0-9a-f]{4})\]', line) + if match: + pci = match.group(1) + name = match.group(2).strip() + vendor = 'Unknown' + if 'LSI' in name.upper() or 'BROADCOM' in name.upper() or 'AVAGO' in name.upper(): vendor = 'LSI/Broadcom' + elif 'ADAPTEC' in name.upper(): vendor = 'Adaptec' + elif 'HP' in name.upper(): vendor = 'HP' + elif 'DELL' in name.upper(): vendor = 'Dell' + + model = name + for v in ['Broadcom / LSI', 'Broadcom', 'LSI Logic', 'LSI']: + if model.startswith(v): model = model[len(v):].strip() + + hba_list.append({ + 'pci_address': pci, 'vendor': vendor, 'model': model, + 'controller_id': controller_id, 'full_name': name + }) + controller_id += 1 + except: pass + return hba_list + +def get_hba_temperatures() -> list[Dict[str, Any]]: + """Obtiene temperaturas de HBA (storcli/megacli).""" + temperatures = [] + storcli_paths = ['/usr/sbin/storcli64', '/opt/MegaRAID/storcli/storcli64', 'storcli64'] + storcli = next((p for p in storcli_paths if shutil.which(p) or os.path.exists(p)), None) + + if storcli: + try: + # Intenta leer el controlador 0 como ejemplo básico + res = subprocess.run([storcli, '/c0', 'show', 'temperature'], capture_output=True, text=True, timeout=5) + for line in res.stdout.split('\n'): + if 'ROC temperature' in line or 'Controller Temp' in line: + match = re.search(r'(\d+)\s*C', line) + if match: + temperatures.append({ + 'name': 'HBA Controller 0', + 'temperature': int(match.group(1)), + 'adapter': 'LSI/Broadcom' + }) + except: pass + return temperatures + +# --- IPMI & UPS --- + +def get_ipmi_fans(): + """Obtiene ventiladores via ipmitool.""" + fans = [] + if shutil.which('ipmitool'): + try: + res = subprocess.run(['ipmitool', 'sensor'], capture_output=True, text=True, timeout=5) + for line in res.stdout.split('\n'): + if 'fan' in line.lower() and '|' in line: + p = line.split('|') + try: fans.append({'name': p[0].strip(), 'speed': float(p[1].strip()), 'unit': p[2].strip()}) + except: continue + except: pass + return fans + +def get_ipmi_power(): + """Obtiene datos de energía IPMI.""" + power = {'supplies': [], 'meter': None} + if shutil.which('ipmitool'): + try: + res = subprocess.run(['ipmitool', 'sensor'], capture_output=True, text=True, timeout=5) + for line in res.stdout.split('\n'): + lower = line.lower() + if ('power supply' in lower or 'power meter' in lower) and '|' in line: + p = line.split('|') + try: + val = float(p[1].strip()) + unit = p[2].strip() + if 'power meter' in lower: + power['meter'] = {'name': p[0].strip(), 'watts': val, 'unit': unit} + else: + power['supplies'].append({'name': p[0].strip(), 'watts': val, 'unit': unit}) + except: continue + except: pass + return power + +def get_ups_info(): + """Obtiene datos de UPS via NUT.""" + ups_list = [] + if shutil.which('upsc'): + try: + res = subprocess.run(['upsc', '-l'], capture_output=True, text=True, timeout=5) + for ups in res.stdout.strip().split('\n'): + if ups: + data = {'name': ups, 'connection_type': 'Local'} + d_res = subprocess.run(['upsc', ups], capture_output=True, text=True, timeout=5) + for line in d_res.stdout.split('\n'): + if ':' in line: + k, v = line.split(':', 1) + data[k.strip()] = v.strip() + ups_list.append(data) + except: pass + return ups_list + +# --- Main Hardware Aggregator --- def get_hardware_info(): - """ - Retorna un objeto JSON completo con todo el hardware detectado. - Usado por la ruta /api/hardware. - """ + """Agrega toda la información de hardware para la API.""" data = { 'cpu': {}, 'motherboard': {}, 'memory_modules': [], - 'storage_devices': [], 'pci_devices': [], + 'storage_devices': [], 'pci_devices': [], 'gpus': get_gpu_info(), 'ipmi_fans': get_ipmi_fans(), 'ipmi_power': get_ipmi_power(), 'ups': get_ups_info(), 'power_meter': get_power_info(), - 'sensors': {'fans': [], 'temperatures': []} + 'hba': get_hba_info(), + 'sensors': {'fans': [], 'temperatures': get_hba_temperatures()} } # CPU Info @@ -367,7 +420,7 @@ def get_hardware_info(): if 'Socket(s):' in line: data['cpu']['sockets'] = line.split(':', 1)[1].strip() except: pass - # Motherboard Info + # Motherboard try: res = subprocess.run(['dmidecode', '-t', 'baseboard'], capture_output=True, text=True) for line in res.stdout.split('\n'): @@ -375,7 +428,7 @@ def get_hardware_info(): if 'Manufacturer:' in line: data['motherboard']['manufacturer'] = line.split(':', 1)[1].strip() except: pass - # RAM Info + # RAM try: res = subprocess.run(['dmidecode', '-t', 'memory'], capture_output=True, text=True) mod = {} @@ -396,9 +449,7 @@ def get_hardware_info(): if mod.get('size', 0) > 0: data['memory_modules'].append(mod) except: pass - # Enriquecer GPUs con datos detallados (solo si hay pocas para no bloquear) - # Para la vista general, a veces es mejor no llamar a nvidia-smi por cada tarjeta si hay muchas - # Aquí lo hacemos porque suele ser rápido. + # Enrich GPUs with details for gpu in data['gpus']: gpu.update(get_detailed_gpu_info(gpu))