Update AppImage

This commit is contained in:
MacRimi
2025-11-07 14:14:43 +01:00
parent 41dab03a5f
commit 37217b4219
2 changed files with 678 additions and 2 deletions

View File

@@ -41,7 +41,8 @@ CORS(app) # Enable CORS for Next.js frontend
app.register_blueprint(auth_bp)
app.register_blueprint(health_bp)
# PCIe info is static and doesn't change while the system is running
PCIE_INFO_CACHE = {}
def identify_gpu_type(name, vendor=None, bus=None, driver=None):
"""
@@ -936,6 +937,9 @@ def get_disk_hardware_info(disk_name):
def get_pcie_link_speed(disk_name):
"""Get PCIe link speed information for NVMe drives"""
if disk_name in PCIE_INFO_CACHE:
return PCIE_INFO_CACHE[disk_name]
pcie_info = {
'pcie_gen': None,
'pcie_width': None,
@@ -951,6 +955,7 @@ def get_pcie_link_speed(disk_name):
match = re.match(r'(nvme\d+)n\d+', disk_name)
if not match:
print(f"[v0] Could not extract controller from {disk_name}")
PCIE_INFO_CACHE[disk_name] = pcie_info
return pcie_info
controller = match.group(1) # nvme0n1 -> nvme0
@@ -1107,6 +1112,7 @@ def get_pcie_link_speed(disk_name):
traceback.print_exc()
print(f"[v0] Final PCIe info for {disk_name}: {pcie_info}")
PCIE_INFO_CACHE[disk_name] = pcie_info
return pcie_info
# get_pcie_link_speed function definition ends here
@@ -5294,6 +5300,676 @@ def api_events():
'total': 0
})
@app.route('/api/notifications', methods=['GET'])
def api_notifications():
"""Get Proxmox notification history"""
try:
notifications = []
# 1. Get notifications from journalctl (Proxmox notification service)
try:
cmd = [
'journalctl',
'-u', 'pve-ha-lrm',
'-u', 'pve-ha-crm',
'-u', 'pvedaemon',
'-u', 'pveproxy',
'-u', 'pvestatd',
'--grep', 'notification|email|webhook|alert|notify',
'-n', '100',
'--output', 'json',
'--no-pager'
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if line:
try:
log_entry = json.loads(line)
timestamp_us = int(log_entry.get('__REALTIME_TIMESTAMP', '0'))
timestamp = datetime.fromtimestamp(timestamp_us / 1000000).strftime('%Y-%m-%d %H:%M:%S')
message = log_entry.get('MESSAGE', '')
# Determine notification type from message
notif_type = 'info'
if 'email' in message.lower():
notif_type = 'email'
elif 'webhook' in message.lower():
notif_type = 'webhook'
elif 'alert' in message.lower() or 'warning' in message.lower():
notif_type = 'alert'
elif 'error' in message.lower() or 'fail' in message.lower():
notif_type = 'error'
notifications.append({
'timestamp': timestamp,
'type': notif_type,
'service': log_entry.get('_SYSTEMD_UNIT', 'proxmox'),
'message': message,
'source': 'journal'
})
except (json.JSONDecodeError, ValueError):
continue
except Exception as e:
# print(f"Error reading notification logs: {e}")
pass
# 2. Try to read Proxmox notification configuration
try:
notif_config_path = '/etc/pve/notifications.cfg'
if os.path.exists(notif_config_path):
with open(notif_config_path, 'r') as f:
config_content = f.read()
# Parse notification targets (emails, webhooks, etc.)
for line in config_content.split('\n'):
if line.strip() and not line.startswith('#'):
notifications.append({
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'type': 'config',
'service': 'notification-config',
'message': f'Notification target configured: {line.strip()}',
'source': 'config'
})
except Exception as e:
# print(f"Error reading notification config: {e}")
pass
# 3. Get backup notifications from task log
try:
cmd = ['pvesh', 'get', '/cluster/tasks', '--output-format', 'json']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
tasks = json.loads(result.stdout)
for task in tasks:
if task.get('type') in ['vzdump', 'backup']:
status = task.get('status', 'unknown')
notif_type = 'success' if status == 'OK' else 'error' if status == 'stopped' else 'info'
notifications.append({
'timestamp': datetime.fromtimestamp(task.get('starttime', 0)).strftime('%Y-%m-%d %H:%M:%S'),
'type': notif_type,
'service': 'backup',
'message': f"Backup task {task.get('upid', 'unknown')}: {status}",
'source': 'task-log'
})
except Exception as e:
# print(f"Error reading task notifications: {e}")
pass
# Sort by timestamp (newest first)
notifications.sort(key=lambda x: x['timestamp'], reverse=True)
return jsonify({
'notifications': notifications[:100], # Limit to 100 most recent
'total': len(notifications)
})
except Exception as e:
# print(f"Error getting notifications: {e}")
pass
return jsonify({
'error': str(e),
'notifications': [],
'total': 0
})
@app.route('/api/notifications/download', methods=['GET'])
def api_notifications_download():
"""Download complete log for a specific notification"""
try:
timestamp = request.args.get('timestamp', '')
if not timestamp:
return jsonify({'error': 'Timestamp parameter required'}), 400
from datetime import datetime, timedelta
try:
# Parse timestamp format: "2025-10-11 14:27:35"
dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
# Use a very small time window (2 minutes) to get just this notification
since_time = (dt - timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
until_time = (dt + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
# If parsing fails, use a default range
since_time = "2 minutes ago"
until_time = "now"
# Get logs around the specific timestamp
cmd = [
'journalctl',
'--since', since_time,
'--until', until_time,
'-n', '50', # Limit to 50 lines around the notification
'--no-pager'
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log') as f:
f.write(f"ProxMenux Log ({log_type}, since {since_days if since_days else f'{hours}h'}) - Generated: {datetime.now().isoformat()}\n")
f.write("=" * 80 + "\n\n")
f.write(result.stdout)
temp_path = f.name
return send_file(
temp_path,
mimetype='text/plain',
as_attachment=True,
download_name=f'notification_{timestamp.replace(":", "_").replace(" ", "_")}.log'
)
else:
return jsonify({'error': 'Failed to generate log file'}), 500
except Exception as e:
# print(f"Error downloading logs: {e}")
pass
return jsonify({'error': str(e)}), 500
@app.route('/api/backups', methods=['GET'])
def api_backups():
"""Get list of all backup files from Proxmox storage"""
try:
backups = []
# Get list of storage locations
try:
result = subprocess.run(['pvesh', 'get', '/storage', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
storages = json.loads(result.stdout)
# For each storage, get backup files
for storage in storages:
storage_id = storage.get('storage')
storage_type = storage.get('type')
# Only check storages that can contain backups
if storage_type in ['dir', 'nfs', 'cifs', 'pbs']:
try:
# Get content of storage
content_result = subprocess.run(
['pvesh', 'get', f'/nodes/localhost/storage/{storage_id}/content', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if content_result.returncode == 0:
contents = json.loads(content_result.stdout)
for item in contents:
if item.get('content') == 'backup':
# Parse backup information
volid = item.get('volid', '')
size = item.get('size', 0)
ctime = item.get('ctime', 0)
# Extract VMID from volid (format: storage:backup/vzdump-qemu-100-...)
vmid = None
backup_type = None
if 'vzdump-qemu-' in volid:
backup_type = 'qemu'
try:
vmid = volid.split('vzdump-qemu-')[1].split('-')[0]
except:
pass
elif 'vzdump-lxc-' in volid:
backup_type = 'lxc'
try:
vmid = volid.split('vzdump-lxc-')[1].split('-')[0]
except:
pass
backups.append({
'volid': volid,
'storage': storage_id,
'vmid': vmid,
'type': backup_type,
'size': size,
'size_human': format_bytes(size),
'created': datetime.fromtimestamp(ctime).strftime('%Y-%m-%d %H:%M:%S'),
'timestamp': ctime
})
except Exception as e:
# print(f"Error getting content for storage {storage_id}: {e}")
pass
continue
except Exception as e:
# print(f"Error getting storage list: {e}")
pass
# Sort by creation time (newest first)
backups.sort(key=lambda x: x['timestamp'], reverse=True)
return jsonify({
'backups': backups,
'total': len(backups)
})
except Exception as e:
# print(f"Error getting backups: {e}")
pass
return jsonify({
'error': str(e),
'backups': [],
'total': 0
})
@app.route('/api/events', methods=['GET'])
def api_events():
"""Get Proxmox events and tasks"""
try:
limit = request.args.get('limit', '50')
events = []
try:
result = subprocess.run(['pvesh', 'get', '/cluster/tasks', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
tasks = json.loads(result.stdout)
for task in tasks[:int(limit)]:
upid = task.get('upid', '')
task_type = task.get('type', 'unknown')
status = task.get('status', 'unknown')
node = task.get('node', 'unknown')
user = task.get('user', 'unknown')
vmid = task.get('id', '')
starttime = task.get('starttime', 0)
endtime = task.get('endtime', 0)
# Calculate duration
duration = ''
if endtime and starttime:
duration_sec = endtime - starttime
if duration_sec < 60:
duration = f"{duration_sec}s"
elif duration_sec < 3600:
duration = f"{duration_sec // 60}m {duration_sec % 60}s"
else:
hours = duration_sec // 3600
minutes = (duration_sec % 3600) // 60
duration = f"{hours}h {minutes}m"
# Determine level based on status
level = 'info'
if status == 'OK':
level = 'info'
elif status in ['stopped', 'error']:
level = 'error'
elif status == 'running':
level = 'warning'
events.append({
'upid': upid,
'type': task_type,
'status': status,
'level': level,
'node': node,
'user': user,
'vmid': str(vmid) if vmid else '',
'starttime': datetime.fromtimestamp(starttime).strftime('%Y-%m-%d %H:%M:%S') if starttime else '',
'endtime': datetime.fromtimestamp(endtime).strftime('%Y-%m-%d %H:%M:%S') if endtime else 'Running',
'duration': duration
})
except Exception as e:
# print(f"Error getting events: {e}")
pass
return jsonify({
'events': events,
'total': len(events)
})
except Exception as e:
# print(f"Error getting events: {e}")
pass
return jsonify({
'error': str(e),
'events': [],
'total': 0
})
@app.route('/api/notifications', methods=['GET'])
def api_notifications():
"""Get Proxmox notification history"""
try:
notifications = []
# 1. Get notifications from journalctl (Proxmox notification service)
try:
cmd = [
'journalctl',
'-u', 'pve-ha-lrm',
'-u', 'pve-ha-crm',
'-u', 'pvedaemon',
'-u', 'pveproxy',
'-u', 'pvestatd',
'--grep', 'notification|email|webhook|alert|notify',
'-n', '100',
'--output', 'json',
'--no-pager'
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if line:
try:
log_entry = json.loads(line)
timestamp_us = int(log_entry.get('__REALTIME_TIMESTAMP', '0'))
timestamp = datetime.fromtimestamp(timestamp_us / 1000000).strftime('%Y-%m-%d %H:%M:%S')
message = log_entry.get('MESSAGE', '')
# Determine notification type from message
notif_type = 'info'
if 'email' in message.lower():
notif_type = 'email'
elif 'webhook' in message.lower():
notif_type = 'webhook'
elif 'alert' in message.lower() or 'warning' in message.lower():
notif_type = 'alert'
elif 'error' in message.lower() or 'fail' in message.lower():
notif_type = 'error'
notifications.append({
'timestamp': timestamp,
'type': notif_type,
'service': log_entry.get('_SYSTEMD_UNIT', 'proxmox'),
'message': message,
'source': 'journal'
})
except (json.JSONDecodeError, ValueError):
continue
except Exception as e:
# print(f"Error reading notification logs: {e}")
pass
# 2. Try to read Proxmox notification configuration
try:
notif_config_path = '/etc/pve/notifications.cfg'
if os.path.exists(notif_config_path):
with open(notif_config_path, 'r') as f:
config_content = f.read()
# Parse notification targets (emails, webhooks, etc.)
for line in config_content.split('\n'):
if line.strip() and not line.startswith('#'):
notifications.append({
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'type': 'config',
'service': 'notification-config',
'message': f'Notification target configured: {line.strip()}',
'source': 'config'
})
except Exception as e:
# print(f"Error reading notification config: {e}")
pass
# 3. Get backup notifications from task log
try:
cmd = ['pvesh', 'get', '/cluster/tasks', '--output-format', 'json']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
tasks = json.loads(result.stdout)
for task in tasks:
if task.get('type') in ['vzdump', 'backup']:
status = task.get('status', 'unknown')
notif_type = 'success' if status == 'OK' else 'error' if status == 'stopped' else 'info'
notifications.append({
'timestamp': datetime.fromtimestamp(task.get('starttime', 0)).strftime('%Y-%m-%d %H:%M:%S'),
'type': notif_type,
'service': 'backup',
'message': f"Backup task {task.get('upid', 'unknown')}: {status}",
'source': 'task-log'
})
except Exception as e:
# print(f"Error reading task notifications: {e}")
pass
# Sort by timestamp (newest first)
notifications.sort(key=lambda x: x['timestamp'], reverse=True)
return jsonify({
'notifications': notifications[:100], # Limit to 100 most recent
'total': len(notifications)
})
except Exception as e:
# print(f"Error getting notifications: {e}")
pass
return jsonify({
'error': str(e),
'notifications': [],
'total': 0
})
@app.route('/api/notifications/download', methods=['GET'])
def api_notifications_download():
"""Download complete log for a specific notification"""
try:
timestamp = request.args.get('timestamp', '')
if not timestamp:
return jsonify({'error': 'Timestamp parameter required'}), 400
from datetime import datetime, timedelta
try:
# Parse timestamp format: "2025-10-11 14:27:35"
dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
# Use a very small time window (2 minutes) to get just this notification
since_time = (dt - timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
until_time = (dt + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
# If parsing fails, use a default range
since_time = "2 minutes ago"
until_time = "now"
# Get logs around the specific timestamp
cmd = [
'journalctl',
'--since', since_time,
'--until', until_time,
'-n', '50', # Limit to 50 lines around the notification
'--no-pager'
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log') as f:
f.write(f"ProxMenux Log ({log_type}, since {since_days if since_days else f'{hours}h'}) - Generated: {datetime.now().isoformat()}\n")
f.write("=" * 80 + "\n\n")
f.write(result.stdout)
temp_path = f.name
return send_file(
temp_path,
mimetype='text/plain',
as_attachment=True,
download_name=f'notification_{timestamp.replace(":", "_").replace(" ", "_")}.log'
)
else:
return jsonify({'error': 'Failed to generate log file'}), 500
except Exception as e:
# print(f"Error downloading logs: {e}")
pass
return jsonify({'error': str(e)}), 500
@app.route('/api/backups', methods=['GET'])
def api_backups():
"""Get list of all backup files from Proxmox storage"""
try:
backups = []
# Get list of storage locations
try:
result = subprocess.run(['pvesh', 'get', '/storage', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
storages = json.loads(result.stdout)
# For each storage, get backup files
for storage in storages:
storage_id = storage.get('storage')
storage_type = storage.get('type')
# Only check storages that can contain backups
if storage_type in ['dir', 'nfs', 'cifs', 'pbs']:
try:
# Get content of storage
content_result = subprocess.run(
['pvesh', 'get', f'/nodes/localhost/storage/{storage_id}/content', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if content_result.returncode == 0:
contents = json.loads(content_result.stdout)
for item in contents:
if item.get('content') == 'backup':
# Parse backup information
volid = item.get('volid', '')
size = item.get('size', 0)
ctime = item.get('ctime', 0)
# Extract VMID from volid (format: storage:backup/vzdump-qemu-100-...)
vmid = None
backup_type = None
if 'vzdump-qemu-' in volid:
backup_type = 'qemu'
try:
vmid = volid.split('vzdump-qemu-')[1].split('-')[0]
except:
pass
elif 'vzdump-lxc-' in volid:
backup_type = 'lxc'
try:
vmid = volid.split('vzdump-lxc-')[1].split('-')[0]
except:
pass
backups.append({
'volid': volid,
'storage': storage_id,
'vmid': vmid,
'type': backup_type,
'size': size,
'size_human': format_bytes(size),
'created': datetime.fromtimestamp(ctime).strftime('%Y-%m-%d %H:%M:%S'),
'timestamp': ctime
})
except Exception as e:
# print(f"Error getting content for storage {storage_id}: {e}")
pass
continue
except Exception as e:
# print(f"Error getting storage list: {e}")
pass
# Sort by creation time (newest first)
backups.sort(key=lambda x: x['timestamp'], reverse=True)
return jsonify({
'backups': backups,
'total': len(backups)
})
except Exception as e:
# print(f"Error getting backups: {e}")
pass
return jsonify({
'error': str(e),
'backups': [],
'total': 0
})
@app.route('/api/events', methods=['GET'])
def api_events():
"""Get recent Proxmox events and tasks"""
try:
limit = request.args.get('limit', '50')
events = []
try:
result = subprocess.run(['pvesh', 'get', '/cluster/tasks', '--output-format', 'json'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
tasks = json.loads(result.stdout)
for task in tasks[:int(limit)]:
upid = task.get('upid', '')
task_type = task.get('type', 'unknown')
status = task.get('status', 'unknown')
node = task.get('node', 'unknown')
user = task.get('user', 'unknown')
vmid = task.get('id', '')
starttime = task.get('starttime', 0)
endtime = task.get('endtime', 0)
# Calculate duration
duration = ''
if endtime and starttime:
duration_sec = endtime - starttime
if duration_sec < 60:
duration = f"{duration_sec}s"
elif duration_sec < 3600:
duration = f"{duration_sec // 60}m {duration_sec % 60}s"
else:
hours = duration_sec // 3600
minutes = (duration_sec % 3600) // 60
duration = f"{hours}h {minutes}m"
# Determine level based on status
level = 'info'
if status == 'OK':
level = 'info'
elif status in ['stopped', 'error']:
level = 'error'
elif status == 'running':
level = 'warning'
events.append({
'upid': upid,
'type': task_type,
'status': status,
'level': level,
'node': node,
'user': user,
'vmid': str(vmid) if vmid else '',
'starttime': datetime.fromtimestamp(starttime).strftime('%Y-%m-%d %H:%M:%S') if starttime else '',
'endtime': datetime.fromtimestamp(endtime).strftime('%Y-%m-%d %H:%M:%S') if endtime else 'Running',
'duration': duration
})
except Exception as e:
# print(f"Error getting events: {e}")
pass
return jsonify({
'events': events,
'total': len(events)
})
except Exception as e:
# print(f"Error getting events: {e}")
pass
return jsonify({
'error': str(e),
'events': [],
'total': 0
})
@app.route('/api/task-log/<path:upid>')
def get_task_log(upid):
"""Get complete task log from Proxmox using UPID"""