#!/usr/bin/env python3
"""
ProxMenux Flask Server
Provides REST API endpoints for Proxmox monitoring data
Runs on port 8008 and serves system metrics, storage info, network stats, etc.
Also serves the Next.js dashboard as static files
"""
from flask import Flask, jsonify, request, send_from_directory, send_file
from flask_cors import CORS
import psutil
import subprocess
import json
import os
import time
import socket
from datetime import datetime, timedelta
import re # Added for regex matching
app = Flask(__name__)
CORS(app) # Enable CORS for Next.js frontend
def extract_vmid_from_interface(interface_name):
"""Extract VMID from virtual interface name (veth100i0 -> 100, tap105i0 -> 105)"""
try:
match = re.match(r'(veth|tap)(\d+)i\d+', interface_name)
if match:
vmid = int(match.group(2))
interface_type = 'lxc' if match.group(1) == 'veth' else 'vm'
return vmid, interface_type
return None, None
except Exception as e:
print(f"[v0] Error extracting VMID from {interface_name}: {e}")
return None, None
def get_vm_lxc_names():
"""Get VM and LXC names from Proxmox API"""
vm_lxc_map = {}
try:
result = subprocess.run(['pvesh', 'get', '/cluster/resources', '--type', 'vm', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
resources = json.loads(result.stdout)
for resource in resources:
vmid = resource.get('vmid')
name = resource.get('name', f'VM-{vmid}')
vm_type = resource.get('type', 'unknown') # 'qemu' or 'lxc'
status = resource.get('status', 'unknown')
if vmid:
vm_lxc_map[vmid] = {
'name': name,
'type': 'lxc' if vm_type == 'lxc' else 'vm',
'status': status
}
print(f"[v0] Found {vm_type} {vmid}: {name} ({status})")
else:
print(f"[v0] pvesh command failed: {result.stderr}")
except FileNotFoundError:
print("[v0] pvesh command not found - Proxmox not installed")
except Exception as e:
print(f"[v0] Error getting VM/LXC names: {e}")
return vm_lxc_map
@app.route('/')
def serve_dashboard():
"""Serve the main dashboard page from Next.js build"""
try:
# Detectar si estamos ejecutándose desde AppImage
appimage_root = os.environ.get('APPDIR')
if not appimage_root:
# Fallback: intentar detectar desde la ubicación del script
base_dir = os.path.dirname(os.path.abspath(__file__))
appimage_root = os.path.dirname(base_dir) # Subir un nivel desde usr/bin/
index_paths = [
os.path.join(appimage_root, 'web', 'index.html'), # Ruta principal para AppImage
os.path.join(appimage_root, 'usr', 'web', 'index.html'), # Fallback con usr/
os.path.join(appimage_root, 'web', 'out', 'index.html'), # Fallback si está en subcarpeta
os.path.join(appimage_root, 'usr', 'web', 'out', 'index.html'), # Fallback con usr/out/
]
print(f"[v0] Flask server looking for index.html in:")
for path in index_paths:
abs_path = os.path.abspath(path)
exists = os.path.exists(abs_path)
print(f"[v0] {abs_path} - {'EXISTS' if exists else 'NOT FOUND'}")
if exists:
print(f"[v0] Found index.html, serving from: {abs_path}")
return send_file(abs_path)
# If no Next.js build found, return error message with actual paths checked
actual_paths = [os.path.abspath(path) for path in index_paths]
return f'''
ProxMenux Monitor - Build Error
🚨 ProxMenux Monitor - Build Error
Next.js application not found. The AppImage may not have been built correctly.
Expected paths checked:
{''.join([f'- {path}
' for path in actual_paths])}
API endpoints are still available:
''', 500
except Exception as e:
print(f"Error serving dashboard: {e}")
return jsonify({'error': f'Dashboard not available: {str(e)}'}), 500
@app.route('/manifest.json')
def serve_manifest():
"""Serve PWA manifest"""
try:
manifest_paths = [
os.path.join(os.path.dirname(__file__), '..', 'web', 'public', 'manifest.json'),
os.path.join(os.path.dirname(__file__), '..', 'public', 'manifest.json')
]
for manifest_path in manifest_paths:
if os.path.exists(manifest_path):
return send_file(manifest_path)
# Return default manifest if not found
return jsonify({
"name": "ProxMenux Monitor",
"short_name": "ProxMenux",
"description": "Proxmox System Monitoring Dashboard",
"start_url": "/",
"display": "standalone",
"background_color": "#0a0a0a",
"theme_color": "#4f46e5",
"icons": [
{
"src": "/images/proxmenux-logo.png",
"sizes": "256x256",
"type": "image/png"
}
]
})
except Exception as e:
print(f"Error serving manifest: {e}")
return jsonify({}), 404
@app.route('/sw.js')
def serve_sw():
"""Serve service worker"""
return '''
const CACHE_NAME = 'proxmenux-v1';
const urlsToCache = [
'/',
'/api/system',
'/api/storage',
'/api/network',
'/api/health'
];
self.addEventListener('install', event => {
event.waitUntil(
caches.open(CACHE_NAME)
.then(cache => cache.addAll(urlsToCache))
);
});
self.addEventListener('fetch', event => {
event.respondWith(
caches.match(event.request)
.then(response => response || fetch(event.request))
);
});
''', 200, {'Content-Type': 'application/javascript'}
@app.route('/_next/')
def serve_next_static(filename):
"""Serve Next.js static files"""
try:
appimage_root = os.environ.get('APPDIR')
if not appimage_root:
base_dir = os.path.dirname(os.path.abspath(__file__))
appimage_root = os.path.dirname(base_dir)
static_paths = [
os.path.join(appimage_root, 'web', '_next'), # Ruta principal
os.path.join(appimage_root, 'usr', 'web', '_next'), # Fallback con usr/
os.path.join(appimage_root, 'web', 'out', '_next'), # Fallback con out/
os.path.join(appimage_root, 'usr', 'web', 'out', '_next'), # Fallback con usr/out/
]
for static_dir in static_paths:
file_path = os.path.join(static_dir, filename)
if os.path.exists(file_path):
return send_file(file_path)
return '', 404
except Exception as e:
print(f"Error serving Next.js static file {filename}: {e}")
return '', 404
@app.route('/')
def serve_static_files(filename):
"""Serve static files (icons, etc.)"""
try:
appimage_root = os.environ.get('APPDIR')
if not appimage_root:
base_dir = os.path.dirname(os.path.abspath(__file__))
appimage_root = os.path.dirname(base_dir)
public_paths = [
os.path.join(appimage_root, 'web'), # Raíz web para exportación estática
os.path.join(appimage_root, 'usr', 'web'), # Fallback con usr/
os.path.join(appimage_root, 'web', 'out'), # Fallback con out/
os.path.join(appimage_root, 'usr', 'web', 'out'), # Fallback con usr/out/
]
for public_dir in public_paths:
file_path = os.path.join(public_dir, filename)
if os.path.exists(file_path):
return send_from_directory(public_dir, filename)
return '', 404
except Exception as e:
print(f"Error serving static file {filename}: {e}")
return '', 404
@app.route('/images/')
def serve_images(filename):
"""Serve image files"""
try:
appimage_root = os.environ.get('APPDIR')
if not appimage_root:
base_dir = os.path.dirname(os.path.abspath(__file__))
appimage_root = os.path.dirname(base_dir)
image_paths = [
os.path.join(appimage_root, 'web', 'images'), # Ruta principal para exportación estática
os.path.join(appimage_root, 'usr', 'web', 'images'), # Fallback con usr/
os.path.join(appimage_root, 'web', 'public', 'images'), # Ruta con public/
os.path.join(appimage_root, 'usr', 'web', 'public', 'images'), # Fallback usr/public/
os.path.join(appimage_root, 'public', 'images'), # Ruta directa a public
os.path.join(appimage_root, 'usr', 'public', 'images'), # Fallback usr/public
]
print(f"[v0] Looking for image: {filename}")
for image_dir in image_paths:
file_path = os.path.join(image_dir, filename)
abs_path = os.path.abspath(file_path)
exists = os.path.exists(abs_path)
print(f"[v0] Checking: {abs_path} - {'FOUND' if exists else 'NOT FOUND'}")
if exists:
print(f"[v0] Serving image from: {abs_path}")
return send_from_directory(image_dir, filename)
print(f"[v0] Image not found: {filename}")
return '', 404
except Exception as e:
print(f"Error serving image {filename}: {e}")
return '', 404
def get_system_info():
"""Get basic system information"""
try:
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
# Memory usage
memory = psutil.virtual_memory()
temp = 0
try:
if hasattr(psutil, "sensors_temperatures"):
temps = psutil.sensors_temperatures()
if temps:
# Priority order for temperature sensors
sensor_priority = ['coretemp', 'cpu_thermal', 'acpi', 'thermal_zone']
for sensor_name in sensor_priority:
if sensor_name in temps and temps[sensor_name]:
temp = temps[sensor_name][0].current
break
# If no priority sensor found, use first available
if temp == 0:
for name, entries in temps.items():
if entries:
temp = entries[0].current
break
except Exception as e:
print(f"Error reading temperature sensors: {e}")
temp = 0 # Use 0 to indicate no temperature available
# Uptime
boot_time = psutil.boot_time()
uptime_seconds = time.time() - boot_time
uptime_str = str(timedelta(seconds=int(uptime_seconds)))
# Load average
load_avg = os.getloadavg() if hasattr(os, 'getloadavg') else [0, 0, 0]
hostname = socket.gethostname()
node_id = f"pve-{hostname}"
proxmox_version = None
try:
result = subprocess.run(['pveversion'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
# Parse output like "pve-manager/9.0.6/..."
version_line = result.stdout.strip().split('\n')[0]
if '/' in version_line:
proxmox_version = version_line.split('/')[1]
except Exception as e:
print(f"Note: pveversion not available: {e}")
kernel_version = None
try:
result = subprocess.run(['uname', '-r'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
kernel_version = result.stdout.strip()
except Exception as e:
print(f"Note: uname not available: {e}")
cpu_cores = psutil.cpu_count(logical=False) # Physical cores only
available_updates = 0
try:
result = subprocess.run(['apt', 'list', '--upgradable'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
# Count lines minus header
lines = result.stdout.strip().split('\n')
available_updates = max(0, len(lines) - 1)
except Exception as e:
print(f"Note: apt list not available: {e}")
# Try to get Proxmox node info if available
try:
result = subprocess.run(['pvesh', 'get', '/nodes', '--output-format', 'json'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
nodes = json.loads(result.stdout)
if nodes and len(nodes) > 0:
node_id = nodes[0].get('node', node_id)
except Exception as e:
print(f"Note: pvesh not available or failed: {e}")
pass # Use default if pvesh not available
response = {
'cpu_usage': round(cpu_percent, 1),
'memory_usage': round(memory.percent, 1),
'memory_total': round(memory.total / (1024**3), 1), # GB
'memory_used': round(memory.used / (1024**3), 1), # GB
'temperature': temp,
'uptime': uptime_str,
'load_average': list(load_avg),
'hostname': hostname,
'node_id': node_id,
'timestamp': datetime.now().isoformat(),
'cpu_cores': cpu_cores
}
if proxmox_version:
response['proxmox_version'] = proxmox_version
if kernel_version:
response['kernel_version'] = kernel_version
if available_updates > 0:
response['available_updates'] = available_updates
return response
except Exception as e:
print(f"Critical error getting system info: {e}")
return {
'error': f'Unable to access system information: {str(e)}',
'timestamp': datetime.now().isoformat()
}
def get_storage_info():
"""Get storage and disk information"""
try:
storage_data = {
'total': 0,
'used': 0,
'available': 0,
'disks': [],
'zfs_pools': [],
'disk_count': 0,
'healthy_disks': 0,
'warning_disks': 0,
'critical_disks': 0
}
physical_disks = {}
total_disk_size_bytes = 0
try:
# List all block devices
result = subprocess.run(['lsblk', '-b', '-d', '-n', '-o', 'NAME,SIZE,TYPE'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
parts = line.split()
if len(parts) >= 3 and parts[2] == 'disk':
disk_name = parts[0]
if disk_name.startswith('zd'):
print(f"[v0] Skipping ZFS zvol device: {disk_name}")
continue
disk_size_bytes = int(parts[1])
disk_size_gb = disk_size_bytes / (1024**3)
disk_size_tb = disk_size_bytes / (1024**4)
total_disk_size_bytes += disk_size_bytes
# Get SMART data for this disk
print(f"[v0] Getting SMART data for {disk_name}...")
smart_data = get_smart_data(disk_name)
print(f"[v0] SMART data for {disk_name}: {smart_data}")
if disk_size_tb >= 1:
size_str = f"{disk_size_tb:.1f}T"
else:
size_str = f"{disk_size_gb:.1f}G"
physical_disks[disk_name] = {
'name': disk_name,
'size': size_str,
'size_bytes': disk_size_bytes,
'temperature': smart_data.get('temperature', 0),
'health': smart_data.get('health', 'unknown'),
'power_on_hours': smart_data.get('power_on_hours', 0),
'smart_status': smart_data.get('smart_status', 'unknown'),
'model': smart_data.get('model', 'Unknown'),
'serial': smart_data.get('serial', 'Unknown'),
'reallocated_sectors': smart_data.get('reallocated_sectors', 0),
'pending_sectors': smart_data.get('pending_sectors', 0),
'crc_errors': smart_data.get('crc_errors', 0),
'rotation_rate': smart_data.get('rotation_rate', 0), # Added
'power_cycles': smart_data.get('power_cycles', 0) # Added
}
storage_data['disk_count'] += 1
health = smart_data.get('health', 'unknown').lower()
if health == 'healthy':
storage_data['healthy_disks'] += 1
elif health == 'warning':
storage_data['warning_disks'] += 1
elif health in ['critical', 'failed']:
storage_data['critical_disks'] += 1
except Exception as e:
print(f"Error getting disk list: {e}")
storage_data['total'] = round(total_disk_size_bytes / (1024**4), 1)
# Get disk usage for mounted partitions
try:
disk_partitions = psutil.disk_partitions()
total_used = 0
total_available = 0
for partition in disk_partitions:
try:
# Skip special filesystems
if partition.fstype in ['tmpfs', 'devtmpfs', 'squashfs', 'overlay']:
continue
partition_usage = psutil.disk_usage(partition.mountpoint)
total_used += partition_usage.used
total_available += partition_usage.free
# Extract disk name from partition device
device_name = partition.device.replace('/dev/', '')
if device_name[-1].isdigit():
if 'nvme' in device_name or 'mmcblk' in device_name:
base_disk = device_name.rsplit('p', 1)[0]
else:
base_disk = device_name.rstrip('0123456789')
else:
base_disk = device_name
# Find corresponding physical disk
disk_info = physical_disks.get(base_disk)
if disk_info and 'mountpoint' not in disk_info:
disk_info['mountpoint'] = partition.mountpoint
disk_info['fstype'] = partition.fstype
disk_info['total'] = round(partition_usage.total / (1024**3), 1)
disk_info['used'] = round(partition_usage.used / (1024**3), 1)
disk_info['available'] = round(partition_usage.free / (1024**3), 1)
disk_info['usage_percent'] = round(partition_usage.percent, 1)
except PermissionError:
continue
except Exception as e:
print(f"Error accessing partition {partition.device}: {e}")
continue
storage_data['used'] = round(total_used / (1024**3), 1)
storage_data['available'] = round(total_available / (1024**3), 1)
except Exception as e:
print(f"Error getting partition info: {e}")
storage_data['disks'] = list(physical_disks.values())
try:
result = subprocess.run(['zpool', 'list', '-H', '-o', 'name,size,alloc,free,health'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if line:
parts = line.split('\t')
if len(parts) >= 5:
pool_info = {
'name': parts[0],
'size': parts[1],
'allocated': parts[2],
'free': parts[3],
'health': parts[4]
}
storage_data['zfs_pools'].append(pool_info)
except FileNotFoundError:
print("Note: ZFS not installed")
except Exception as e:
print(f"Note: ZFS not available or no pools: {e}")
return storage_data
except Exception as e:
print(f"Error getting storage info: {e}")
return {
'error': f'Unable to access storage information: {str(e)}',
'total': 0,
'used': 0,
'available': 0,
'disks': [],
'zfs_pools': [],
'disk_count': 0,
'healthy_disks': 0,
'warning_disks': 0,
'critical_disks': 0
}
def get_smart_data(disk_name):
"""Get SMART data for a specific disk - Enhanced with multiple device type attempts"""
smart_data = {
'temperature': 0,
'health': 'unknown',
'power_on_hours': 0,
'smart_status': 'unknown',
'model': 'Unknown',
'serial': 'Unknown',
'reallocated_sectors': 0,
'pending_sectors': 0,
'crc_errors': 0,
'rotation_rate': 0, # Added rotation rate (RPM)
'power_cycles': 0, # Added power cycle count
}
print(f"[v0] ===== Starting SMART data collection for /dev/{disk_name} =====")
try:
commands_to_try = [
['smartctl', '-a', '-j', f'/dev/{disk_name}'], # JSON output (preferred)
['smartctl', '-a', '-j', '-d', 'ata', f'/dev/{disk_name}'], # JSON with ATA device type
['smartctl', '-a', '-j', '-d', 'sat', f'/dev/{disk_name}'], # JSON with SAT device type
['smartctl', '-a', f'/dev/{disk_name}'], # Text output (fallback)
['smartctl', '-a', '-d', 'ata', f'/dev/{disk_name}'], # Text with ATA device type
['smartctl', '-a', '-d', 'sat', f'/dev/{disk_name}'], # Text with SAT device type
['smartctl', '-i', '-H', '-A', f'/dev/{disk_name}'], # Info + Health + Attributes
['smartctl', '-i', '-H', '-A', '-d', 'ata', f'/dev/{disk_name}'], # With ATA
['smartctl', '-i', '-H', '-A', '-d', 'sat', f'/dev/{disk_name}'], # With SAT
['smartctl', '-a', '-j', '-d', 'scsi', f'/dev/{disk_name}'], # JSON with SCSI device type
['smartctl', '-a', '-j', '-d', 'sat,12', f'/dev/{disk_name}'], # SAT with 12-byte commands
['smartctl', '-a', '-j', '-d', 'sat,16', f'/dev/{disk_name}'], # SAT with 16-byte commands
['smartctl', '-a', '-d', 'sat,12', f'/dev/{disk_name}'], # Text SAT with 12-byte commands
['smartctl', '-a', '-d', 'sat,16', f'/dev/{disk_name}'], # Text SAT with 16-byte commands
]
for cmd_index, cmd in enumerate(commands_to_try):
print(f"[v0] Attempt {cmd_index + 1}/{len(commands_to_try)}: Running command: {' '.join(cmd)}")
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
print(f"[v0] Command return code: {result.returncode}")
if result.stderr:
stderr_preview = result.stderr[:200].replace('\n', ' ')
print(f"[v0] stderr: {stderr_preview}")
has_output = result.stdout and len(result.stdout.strip()) > 50
if has_output:
print(f"[v0] Got output ({len(result.stdout)} bytes), attempting to parse...")
# Try JSON parsing first (if -j flag was used)
if '-j' in cmd:
try:
print(f"[v0] Attempting JSON parse...")
data = json.loads(result.stdout)
print(f"[v0] JSON parse successful!")
# Extract model
if 'model_name' in data:
smart_data['model'] = data['model_name']
print(f"[v0] Model: {smart_data['model']}")
elif 'model_family' in data:
smart_data['model'] = data['model_family']
print(f"[v0] Model family: {smart_data['model']}")
# Extract serial
if 'serial_number' in data:
smart_data['serial'] = data['serial_number']
print(f"[v0] Serial: {smart_data['serial']}")
if 'rotation_rate' in data:
smart_data['rotation_rate'] = data['rotation_rate']
print(f"[v0] Rotation Rate: {smart_data['rotation_rate']} RPM")
# Extract SMART status
if 'smart_status' in data and 'passed' in data['smart_status']:
smart_data['smart_status'] = 'passed' if data['smart_status']['passed'] else 'failed'
smart_data['health'] = 'healthy' if data['smart_status']['passed'] else 'critical'
print(f"[v0] SMART status: {smart_data['smart_status']}, health: {smart_data['health']}")
# Extract temperature
if 'temperature' in data and 'current' in data['temperature']:
smart_data['temperature'] = data['temperature']['current']
print(f"[v0] Temperature: {smart_data['temperature']}°C")
# Parse ATA SMART attributes
if 'ata_smart_attributes' in data and 'table' in data['ata_smart_attributes']:
print(f"[v0] Parsing ATA SMART attributes...")
for attr in data['ata_smart_attributes']['table']:
attr_id = attr.get('id')
raw_value = attr.get('raw', {}).get('value', 0)
if attr_id == 9: # Power_On_Hours
smart_data['power_on_hours'] = raw_value
print(f"[v0] Power On Hours (ID 9): {raw_value}")
elif attr_id == 12: # Power_Cycle_Count
smart_data['power_cycles'] = raw_value
print(f"[v0] Power Cycles (ID 12): {raw_value}")
elif attr_id == 194: # Temperature_Celsius
if smart_data['temperature'] == 0:
smart_data['temperature'] = raw_value
print(f"[v0] Temperature (ID 194): {raw_value}°C")
elif attr_id == 190: # Airflow_Temperature_Cel
if smart_data['temperature'] == 0:
smart_data['temperature'] = raw_value
print(f"[v0] Airflow Temperature (ID 190): {raw_value}°C")
elif attr_id == 5: # Reallocated_Sector_Ct
smart_data['reallocated_sectors'] = raw_value
print(f"[v0] Reallocated Sectors (ID 5): {raw_value}")
elif attr_id == 197: # Current_Pending_Sector
smart_data['pending_sectors'] = raw_value
print(f"[v0] Pending Sectors (ID 197): {raw_value}")
elif attr_id == 199: # UDMA_CRC_Error_Count
smart_data['crc_errors'] = raw_value
print(f"[v0] CRC Errors (ID 199): {raw_value}")
# Parse NVMe SMART data
if 'nvme_smart_health_information_log' in data:
print(f"[v0] Parsing NVMe SMART data...")
nvme_data = data['nvme_smart_health_information_log']
if 'temperature' in nvme_data:
smart_data['temperature'] = nvme_data['temperature']
print(f"[v0] NVMe Temperature: {smart_data['temperature']}°C")
if 'power_on_hours' in nvme_data:
smart_data['power_on_hours'] = nvme_data['power_on_hours']
print(f"[v0] NVMe Power On Hours: {smart_data['power_on_hours']}")
if 'power_cycles' in nvme_data:
smart_data['power_cycles'] = nvme_data['power_cycles']
print(f"[v0] NVMe Power Cycles: {smart_data['power_cycles']}")
# If we got good data, break out of the loop
if smart_data['model'] != 'Unknown' and smart_data['serial'] != 'Unknown':
print(f"[v0] Successfully extracted complete data from JSON (attempt {cmd_index + 1})")
break
except json.JSONDecodeError as e:
print(f"[v0] JSON parse failed: {e}, trying text parsing...")
if smart_data['model'] == 'Unknown' or smart_data['serial'] == 'Unknown' or smart_data['temperature'] == 0:
print(f"[v0] Parsing text output (model={smart_data['model']}, serial={smart_data['serial']}, temp={smart_data['temperature']})...")
output = result.stdout
# Get basic info
for line in output.split('\n'):
line = line.strip()
# Model detection
if (line.startswith('Device Model:') or line.startswith('Model Number:')) and smart_data['model'] == 'Unknown':
smart_data['model'] = line.split(':', 1)[1].strip()
print(f"[v0] Found model: {smart_data['model']}")
elif line.startswith('Model Family:') and smart_data['model'] == 'Unknown':
smart_data['model'] = line.split(':', 1)[1].strip()
print(f"[v0] Found model family: {smart_data['model']}")
# Serial detection
elif line.startswith('Serial Number:') and smart_data['serial'] == 'Unknown':
smart_data['serial'] = line.split(':', 1)[1].strip()
print(f"[v0] Found serial: {smart_data['serial']}")
elif line.startswith('Rotation Rate:') and smart_data['rotation_rate'] == 0:
rate_str = line.split(':', 1)[1].strip()
if 'rpm' in rate_str.lower():
try:
smart_data['rotation_rate'] = int(rate_str.split()[0])
print(f"[v0] Found rotation rate: {smart_data['rotation_rate']} RPM")
except (ValueError, IndexError):
pass
elif 'Solid State Device' in rate_str:
smart_data['rotation_rate'] = 0 # SSD
print(f"[v0] Found SSD (no rotation)")
# SMART status detection
elif 'SMART overall-health self-assessment test result:' in line:
if 'PASSED' in line:
smart_data['smart_status'] = 'passed'
smart_data['health'] = 'healthy'
print(f"[v0] SMART status: PASSED")
elif 'FAILED' in line:
smart_data['smart_status'] = 'failed'
smart_data['health'] = 'critical'
print(f"[v0] SMART status: FAILED")
# NVMe health
elif 'SMART Health Status:' in line:
if 'OK' in line:
smart_data['smart_status'] = 'passed'
smart_data['health'] = 'healthy'
print(f"[v0] NVMe Health: OK")
# Temperature detection (various formats)
elif 'Current Temperature:' in line and smart_data['temperature'] == 0:
try:
temp_str = line.split(':')[1].strip().split()[0]
smart_data['temperature'] = int(temp_str)
print(f"[v0] Found temperature: {smart_data['temperature']}°C")
except (ValueError, IndexError):
pass
# Parse SMART attributes table
in_attributes = False
for line in output.split('\n'):
line = line.strip()
if 'ID# ATTRIBUTE_NAME' in line or 'ID#' in line and 'ATTRIBUTE_NAME' in line:
in_attributes = True
print(f"[v0] Found SMART attributes table")
continue
if in_attributes:
# Stop at empty line or next section
if not line or line.startswith('SMART') or line.startswith('==='):
in_attributes = False
continue
parts = line.split()
if len(parts) >= 10:
try:
attr_id = parts[0]
# Raw value is typically the last column
raw_value = parts[-1]
# Parse based on attribute ID
if attr_id == '9': # Power On Hours
raw_clean = raw_value.split()[0].replace('h', '').replace(',', '')
smart_data['power_on_hours'] = int(raw_clean)
print(f"[v0] Power On Hours: {smart_data['power_on_hours']}")
elif attr_id == '12': # Power Cycle Count
raw_clean = raw_value.split()[0].replace(',', '')
smart_data['power_cycles'] = int(raw_clean)
print(f"[v0] Power Cycles: {smart_data['power_cycles']}")
elif attr_id == '194' and smart_data['temperature'] == 0: # Temperature
temp_str = raw_value.split()[0]
smart_data['temperature'] = int(temp_str)
print(f"[v0] Temperature (attr 194): {smart_data['temperature']}°C")
elif attr_id == '190' and smart_data['temperature'] == 0: # Airflow Temperature
temp_str = raw_value.split()[0]
smart_data['temperature'] = int(temp_str)
print(f"[v0] Airflow Temperature (attr 190): {smart_data['temperature']}°C")
elif attr_id == '5': # Reallocated Sectors
smart_data['reallocated_sectors'] = int(raw_value)
print(f"[v0] Reallocated Sectors: {smart_data['reallocated_sectors']}")
elif attr_id == '197': # Pending Sectors
smart_data['pending_sectors'] = int(raw_value)
print(f"[v0] Pending Sectors: {smart_data['pending_sectors']}")
elif attr_id == '199': # CRC Errors
smart_data['crc_errors'] = int(raw_value)
print(f"[v0] CRC Errors: {smart_data['crc_errors']}")
except (ValueError, IndexError) as e:
print(f"[v0] Error parsing attribute line '{line}': {e}")
continue
# If we got complete data, break
if smart_data['model'] != 'Unknown' and smart_data['serial'] != 'Unknown':
print(f"[v0] Successfully extracted complete data from text output (attempt {cmd_index + 1})")
break
elif smart_data['model'] != 'Unknown' or smart_data['serial'] != 'Unknown':
print(f"[v0] Extracted partial data from text output, continuing to next attempt...")
else:
print(f"[v0] No usable output (return code {result.returncode}), trying next command...")
except subprocess.TimeoutExpired:
print(f"[v0] Command timeout for attempt {cmd_index + 1}, trying next...")
continue
except Exception as e:
print(f"[v0] Error in attempt {cmd_index + 1}: {type(e).__name__}: {e}")
continue
if smart_data['reallocated_sectors'] > 0 or smart_data['pending_sectors'] > 0:
if smart_data['health'] == 'healthy':
smart_data['health'] = 'warning'
print(f"[v0] Health: WARNING (reallocated/pending sectors)")
if smart_data['reallocated_sectors'] > 10 or smart_data['pending_sectors'] > 10:
smart_data['health'] = 'critical'
print(f"[v0] Health: CRITICAL (high sector count)")
if smart_data['smart_status'] == 'failed':
smart_data['health'] = 'critical'
print(f"[v0] Health: CRITICAL (SMART failed)")
# Temperature-based health (only if we have a valid temperature)
if smart_data['health'] == 'healthy' and smart_data['temperature'] > 0:
if smart_data['temperature'] >= 70:
smart_data['health'] = 'critical'
print(f"[v0] Health: CRITICAL (temperature {smart_data['temperature']}°C)")
elif smart_data['temperature'] >= 60:
smart_data['health'] = 'warning'
print(f"[v0] Health: WARNING (temperature {smart_data['temperature']}°C)")
except FileNotFoundError:
print(f"[v0] ERROR: smartctl not found - install smartmontools")
except Exception as e:
print(f"[v0] ERROR: Unexpected exception for {disk_name}: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
print(f"[v0] ===== Final SMART data for /dev/{disk_name}: {smart_data} =====")
return smart_data
def get_proxmox_storage():
"""Get Proxmox storage information using pvesm status"""
try:
print("[v0] Getting Proxmox storage with pvesm status...")
result = subprocess.run(['pvesm', 'status'], capture_output=True, text=True, timeout=10)
if result.returncode != 0:
print(f"[v0] pvesm status failed with return code {result.returncode}")
print(f"[v0] stderr: {result.stderr}")
return {
'error': 'pvesm command not available or failed',
'storage': []
}
storage_list = []
lines = result.stdout.strip().split('\n')
# Skip header line
if len(lines) < 2:
print("[v0] No storage found in pvesm output")
return {'storage': []}
# Parse each storage line
for line in lines[1:]: # Skip header
parts = line.split()
if len(parts) >= 6:
name = parts[0]
storage_type = parts[1]
status = parts[2]
total = int(parts[3])
used = int(parts[4])
available = int(parts[5])
percent = float(parts[6].rstrip('%')) if len(parts) > 6 else 0.0
# Convert bytes to GB
total_gb = round(total / (1024**2), 2)
used_gb = round(used / (1024**2), 2)
available_gb = round(available / (1024**2), 2)
storage_info = {
'name': name,
'type': storage_type,
'status': status,
'total': total_gb,
'used': used_gb,
'available': available_gb,
'percent': round(percent, 2)
}
print(f"[v0] Found storage: {name} ({storage_type}) - {used_gb}/{total_gb} GB ({percent}%)")
storage_list.append(storage_info)
return {'storage': storage_list}
except FileNotFoundError:
print("[v0] pvesm command not found - Proxmox not installed or not in PATH")
return {
'error': 'pvesm command not found - Proxmox not installed',
'storage': []
}
except Exception as e:
print(f"[v0] Error getting Proxmox storage: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
return {
'error': f'Unable to get Proxmox storage: {str(e)}',
'storage': []
}
def get_interface_type(interface_name):
"""Detect the type of network interface"""
try:
# Skip loopback
if interface_name == 'lo':
return 'skip'
if interface_name.startswith(('veth', 'tap')):
return 'vm_lxc'
# Skip other virtual interfaces
if interface_name.startswith(('tun', 'vnet', 'docker', 'virbr')):
return 'skip'
# Check if it's a bond
if interface_name.startswith('bond'):
return 'bond'
# Check if it's a bridge (but not virbr which we skip above)
if interface_name.startswith(('vmbr', 'br')):
return 'bridge'
# Check if it's a VLAN (contains a dot)
if '.' in interface_name:
return 'vlan'
# Check if it's a physical interface
if interface_name.startswith(('enp', 'eth', 'wlan', 'wlp', 'eno', 'ens')):
return 'physical'
# Default to skip for unknown types
return 'skip'
except Exception as e:
print(f"[v0] Error detecting interface type for {interface_name}: {e}")
return 'skip'
def get_bond_info(bond_name):
"""Get detailed information about a bonding interface"""
bond_info = {
'mode': 'unknown',
'slaves': [],
'active_slave': None
}
try:
bond_file = f'/proc/net/bonding/{bond_name}'
if os.path.exists(bond_file):
with open(bond_file, 'r') as f:
content = f.read()
# Parse bonding mode
for line in content.split('\n'):
if 'Bonding Mode:' in line:
bond_info['mode'] = line.split(':', 1)[1].strip()
elif 'Slave Interface:' in line:
slave_name = line.split(':', 1)[1].strip()
bond_info['slaves'].append(slave_name)
elif 'Currently Active Slave:' in line:
bond_info['active_slave'] = line.split(':', 1)[1].strip()
print(f"[v0] Bond {bond_name} info: mode={bond_info['mode']}, slaves={bond_info['slaves']}")
except Exception as e:
print(f"[v0] Error reading bond info for {bond_name}: {e}")
return bond_info
def get_bridge_info(bridge_name):
"""Get detailed information about a bridge interface"""
bridge_info = {
'members': []
}
try:
# Try to read bridge members from /sys/class/net//brif/
brif_path = f'/sys/class/net/{bridge_name}/brif'
if os.path.exists(brif_path):
members = os.listdir(brif_path)
bridge_info['members'] = members
print(f"[v0] Bridge {bridge_name} members: {members}")
except Exception as e:
print(f"[v0] Error reading bridge info for {bridge_name}: {e}")
return bridge_info
def get_network_info():
"""Get network interface information - Enhanced with VM/LXC interface separation"""
try:
network_data = {
'interfaces': [],
'vm_lxc_interfaces': [], # Added separate list for VM/LXC interfaces
'traffic': {'bytes_sent': 0, 'bytes_recv': 0, 'packets_sent': 0, 'packets_recv': 0}
}
vm_lxc_map = get_vm_lxc_names()
# Get network interfaces
net_if_addrs = psutil.net_if_addrs()
net_if_stats = psutil.net_if_stats()
try:
net_io_per_nic = psutil.net_io_counters(pernic=True)
except Exception as e:
print(f"[v0] Error getting per-NIC stats: {e}")
net_io_per_nic = {}
active_count = 0
total_count = 0
vm_lxc_active_count = 0
vm_lxc_total_count = 0
for interface_name, interface_addresses in net_if_addrs.items():
interface_type = get_interface_type(interface_name)
if interface_type == 'skip':
print(f"[v0] Skipping interface: {interface_name} (type: {interface_type})")
continue
stats = net_if_stats.get(interface_name)
if not stats:
continue
if interface_type == 'vm_lxc':
vm_lxc_total_count += 1
if stats.isup:
vm_lxc_active_count += 1
else:
total_count += 1
if stats.isup:
active_count += 1
interface_info = {
'name': interface_name,
'type': interface_type,
'status': 'up' if stats.isup else 'down',
'speed': stats.speed if stats.speed > 0 else 0,
'duplex': 'full' if stats.duplex == 2 else 'half' if stats.duplex == 1 else 'unknown',
'mtu': stats.mtu,
'addresses': [],
'mac_address': None,
}
if interface_type == 'vm_lxc':
vmid, vm_type = extract_vmid_from_interface(interface_name)
if vmid and vmid in vm_lxc_map:
interface_info['vmid'] = vmid
interface_info['vm_name'] = vm_lxc_map[vmid]['name']
interface_info['vm_type'] = vm_lxc_map[vmid]['type']
interface_info['vm_status'] = vm_lxc_map[vmid]['status']
elif vmid:
interface_info['vmid'] = vmid
interface_info['vm_name'] = f'{"LXC" if vm_type == "lxc" else "VM"} {vmid}'
interface_info['vm_type'] = vm_type
interface_info['vm_status'] = 'unknown'
for address in interface_addresses:
if address.family == 2: # IPv4
interface_info['addresses'].append({
'ip': address.address,
'netmask': address.netmask
})
elif address.family == 17: # AF_PACKET (MAC address on Linux)
interface_info['mac_address'] = address.address
if interface_name in net_io_per_nic:
io_stats = net_io_per_nic[interface_name]
interface_info['bytes_sent'] = io_stats.bytes_sent
interface_info['bytes_recv'] = io_stats.bytes_recv
interface_info['packets_sent'] = io_stats.packets_sent
interface_info['packets_recv'] = io_stats.packets_recv
interface_info['errors_in'] = io_stats.errin
interface_info['errors_out'] = io_stats.errout
interface_info['drops_in'] = io_stats.dropin
interface_info['drops_out'] = io_stats.dropout
total_packets_in = io_stats.packets_recv + io_stats.dropin
total_packets_out = io_stats.packets_sent + io_stats.dropout
if total_packets_in > 0:
interface_info['packet_loss_in'] = round((io_stats.dropin / total_packets_in) * 100, 2)
else:
interface_info['packet_loss_in'] = 0
if total_packets_out > 0:
interface_info['packet_loss_out'] = round((io_stats.dropout / total_packets_out) * 100, 2)
else:
interface_info['packet_loss_out'] = 0
if interface_type == 'bond':
bond_info = get_bond_info(interface_name)
interface_info['bond_mode'] = bond_info['mode']
interface_info['bond_slaves'] = bond_info['slaves']
interface_info['bond_active_slave'] = bond_info['active_slave']
if interface_type == 'bridge':
bridge_info = get_bridge_info(interface_name)
interface_info['bridge_members'] = bridge_info['members']
if interface_type == 'vm_lxc':
network_data['vm_lxc_interfaces'].append(interface_info)
else:
network_data['interfaces'].append(interface_info)
network_data['active_count'] = active_count
network_data['total_count'] = total_count
network_data['vm_lxc_active_count'] = vm_lxc_active_count
network_data['vm_lxc_total_count'] = vm_lxc_total_count
print(f"[v0] Physical interfaces: {active_count} active out of {total_count} total")
print(f"[v0] VM/LXC interfaces: {vm_lxc_active_count} active out of {vm_lxc_total_count} total")
# Get network I/O statistics (global)
net_io = psutil.net_io_counters()
network_data['traffic'] = {
'bytes_sent': net_io.bytes_sent,
'bytes_recv': net_io.bytes_recv,
'packets_sent': net_io.packets_sent,
'packets_recv': net_io.packets_recv,
'errin': net_io.errin,
'errout': net_io.errout,
'dropin': net_io.dropin,
'dropout': net_io.dropout
}
total_packets_in = net_io.packets_recv + net_io.dropin
total_packets_out = net_io.packets_sent + net_io.dropout
if total_packets_in > 0:
network_data['traffic']['packet_loss_in'] = round((net_io.dropin / total_packets_in) * 100, 2)
else:
network_data['traffic']['packet_loss_in'] = 0
if total_packets_out > 0:
network_data['traffic']['packet_loss_out'] = round((net_io.dropout / total_packets_out) * 100, 2)
else:
network_data['traffic']['packet_loss_out'] = 0
return network_data
except Exception as e:
print(f"Error getting network info: {e}")
import traceback
traceback.print_exc()
return {
'error': f'Unable to access network information: {str(e)}',
'interfaces': [],
'vm_lxc_interfaces': [],
'traffic': {'bytes_sent': 0, 'bytes_recv': 0, 'packets_sent': 0, 'packets_recv': 0},
'active_count': 0,
'total_count': 0,
'vm_lxc_active_count': 0,
'vm_lxc_total_count': 0
}
def get_proxmox_vms():
"""Get Proxmox VM information (requires pvesh command)"""
try:
# Try to get VM list using pvesh command
result = subprocess.run(['pvesh', 'get', '/nodes/localhost/qemu', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
vms = json.loads(result.stdout)
return vms
else:
# Handle LXC containers as well
result_lxc = subprocess.run(['pvesh', 'get', '/nodes/localhost/lxc', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if result_lxc.returncode == 0:
lxc_vms = json.loads(result_lxc.stdout)
# Combine QEMU and LXC for a complete VM list
if 'vms' in locals(): # Check if vms were loaded from QEMU
vms.extend(lxc_vms)
else:
vms = lxc_vms
return vms
else:
return {
'error': 'pvesh command not available or failed - Proxmox API not accessible for QEMU and LXC',
'vms': []
}
except Exception as e:
print(f"Error getting VM info: {e}")
return {
'error': f'Unable to access VM information: {str(e)}',
'vms': []
}
@app.route('/api/system', methods=['GET'])
def api_system():
"""Get system information"""
return jsonify(get_system_info())
@app.route('/api/storage', methods=['GET'])
def api_storage():
"""Get storage information"""
return jsonify(get_storage_info())
@app.route('/api/proxmox-storage', methods=['GET'])
def api_proxmox_storage():
"""Get Proxmox storage information"""
return jsonify(get_proxmox_storage())
@app.route('/api/network', methods=['GET'])
def api_network():
"""Get network information"""
return jsonify(get_network_info())
@app.route('/api/vms', methods=['GET'])
def api_vms():
"""Get virtual machine information"""
return jsonify(get_proxmox_vms())
@app.route('/api/logs', methods=['GET'])
def api_logs():
"""Get system logs"""
try:
# Get recent system logs
result = subprocess.run(['journalctl', '-n', '100', '--output', 'json'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
logs = []
for line in result.stdout.strip().split('\n'):
if line:
try:
log_entry = json.loads(line)
logs.append({
'timestamp': log_entry.get('__REALTIME_TIMESTAMP', ''),
'level': log_entry.get('PRIORITY', '6'),
'service': log_entry.get('_SYSTEMD_UNIT', 'system'),
'message': log_entry.get('MESSAGE', ''),
'source': 'journalctl'
})
except json.JSONDecodeError:
continue
return jsonify(logs)
else:
return jsonify({
'error': 'journalctl not available or failed',
'logs': []
})
except Exception as e:
print(f"Error getting logs: {e}")
return jsonify({
'error': f'Unable to access system logs: {str(e)}',
'logs': []
})
@app.route('/api/health', methods=['GET'])
def api_health():
"""Health check endpoint"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0'
})
@app.route('/api/system-info', methods=['GET'])
def api_system_info():
"""Get system and node information for dashboard header"""
try:
hostname = socket.gethostname()
node_id = f"pve-{hostname}"
pve_version = None
# Try to get Proxmox version
try:
result = subprocess.run(['pveversion'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
pve_version = result.stdout.strip().split('\n')[0]
except:
pass
# Try to get node info from Proxmox API
try:
result = subprocess.run(['pvesh', 'get', '/nodes', '--output-format', 'json'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
nodes = json.loads(result.stdout)
if nodes and len(nodes) > 0:
node_info = nodes[0]
node_id = node_info.get('node', node_id)
hostname = node_info.get('node', hostname)
except:
pass
response = {
'hostname': hostname,
'node_id': node_id,
'status': 'online',
'timestamp': datetime.now().isoformat()
}
if pve_version:
response['pve_version'] = pve_version
else:
response['error'] = 'Proxmox version not available - pveversion command not found'
return jsonify(response)
except Exception as e:
print(f"Error getting system info: {e}")
return jsonify({
'error': f'Unable to access system information: {str(e)}',
'hostname': socket.gethostname(),
'status': 'error',
'timestamp': datetime.now().isoformat()
})
@app.route('/api/info', methods=['GET'])
def api_info():
"""Root endpoint with API information"""
return jsonify({
'name': 'ProxMenux Monitor API',
'version': '1.0.0',
'endpoints': [
'/api/system',
'/api/system-info',
'/api/storage',
'/api/proxmox-storage', # Added new endpoint
'/api/network',
'/api/vms',
'/api/logs',
'/api/health'
]
})
if __name__ == '__main__':
print("Starting ProxMenux Flask Server on port 8008...")
print("Server will be accessible on all network interfaces (0.0.0.0:8008)")
print("API endpoints available at: /api/system, /api/storage, /api/network, /api/vms, /api/logs, /api/health")
app.run(host='0.0.0.0', port=8008, debug=False)