From ba84c644dfbd4a7841dca9503c73f51a06960975 Mon Sep 17 00:00:00 2001
From: MacRimi
Date: Fri, 7 Nov 2025 14:33:27 +0100
Subject: [PATCH] Update flask_server.py

---
 AppImage/scripts/flask_server.py | 678 +------------------------------
 1 file changed, 1 insertion(+), 677 deletions(-)

diff --git a/AppImage/scripts/flask_server.py b/AppImage/scripts/flask_server.py
index c246408..654ed92 100644
--- a/AppImage/scripts/flask_server.py
+++ b/AppImage/scripts/flask_server.py
@@ -41,8 +41,7 @@ CORS(app) # Enable CORS for Next.js frontend
 app.register_blueprint(auth_bp)
 app.register_blueprint(health_bp)
 
-# PCIe info is static and doesn't change while the system is running
-PCIE_INFO_CACHE = {}
+
 
 def identify_gpu_type(name, vendor=None, bus=None, driver=None):
     """
@@ -937,9 +936,6 @@ def get_disk_hardware_info(disk_name):
 
 def get_pcie_link_speed(disk_name):
     """Get PCIe link speed information for NVMe drives"""
-    if disk_name in PCIE_INFO_CACHE:
-        return PCIE_INFO_CACHE[disk_name]
-
     pcie_info = {
         'pcie_gen': None,
         'pcie_width': None,
@@ -955,7 +951,6 @@
     match = re.match(r'(nvme\d+)n\d+', disk_name)
     if not match:
         print(f"[v0] Could not extract controller from {disk_name}")
-        PCIE_INFO_CACHE[disk_name] = pcie_info
         return pcie_info
 
     controller = match.group(1) # nvme0n1 -> nvme0
@@ -1112,7 +1107,6 @@ get_pcie_link_speed(disk_name):
         traceback.print_exc()
 
     print(f"[v0] Final PCIe info for {disk_name}: {pcie_info}")
-    PCIE_INFO_CACHE[disk_name] = pcie_info
     return pcie_info
 
 # get_pcie_link_speed function definition ends here
@@ -5300,676 +5294,6 @@ def api_events():
         'total': 0
     })
 
-@app.route('/api/notifications', methods=['GET'])
-def api_notifications():
-    """Get Proxmox notification history"""
-    try:
-        notifications = []
-
-        # 1. Get notifications from journalctl (Proxmox notification service)
-        try:
-            cmd = [
-                'journalctl',
-                '-u', 'pve-ha-lrm',
-                '-u', 'pve-ha-crm',
-                '-u', 'pvedaemon',
-                '-u', 'pveproxy',
-                '-u', 'pvestatd',
-                '--grep', 'notification|email|webhook|alert|notify',
-                '-n', '100',
-                '--output', 'json',
-                '--no-pager'
-            ]
-            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                for line in result.stdout.strip().split('\n'):
-                    if line:
-                        try:
-                            log_entry = json.loads(line)
-                            timestamp_us = int(log_entry.get('__REALTIME_TIMESTAMP', '0'))
-                            timestamp = datetime.fromtimestamp(timestamp_us / 1000000).strftime('%Y-%m-%d %H:%M:%S')
-
-                            message = log_entry.get('MESSAGE', '')
-
-                            # Determine notification type from message
-                            notif_type = 'info'
-                            if 'email' in message.lower():
-                                notif_type = 'email'
-                            elif 'webhook' in message.lower():
-                                notif_type = 'webhook'
-                            elif 'alert' in message.lower() or 'warning' in message.lower():
-                                notif_type = 'alert'
-                            elif 'error' in message.lower() or 'fail' in message.lower():
-                                notif_type = 'error'
-
-                            notifications.append({
-                                'timestamp': timestamp,
-                                'type': notif_type,
-                                'service': log_entry.get('_SYSTEMD_UNIT', 'proxmox'),
-                                'message': message,
-                                'source': 'journal'
-                            })
-                        except (json.JSONDecodeError, ValueError):
-                            continue
-        except Exception as e:
-            # print(f"Error reading notification logs: {e}")
-            pass
-
-        # 2. Try to read Proxmox notification configuration
-        try:
-            notif_config_path = '/etc/pve/notifications.cfg'
-            if os.path.exists(notif_config_path):
-                with open(notif_config_path, 'r') as f:
-                    config_content = f.read()
-                    # Parse notification targets (emails, webhooks, etc.)
-                    for line in config_content.split('\n'):
-                        if line.strip() and not line.startswith('#'):
-                            notifications.append({
-                                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
-                                'type': 'config',
-                                'service': 'notification-config',
-                                'message': f'Notification target configured: {line.strip()}',
-                                'source': 'config'
-                            })
-        except Exception as e:
-            # print(f"Error reading notification config: {e}")
-            pass
-
-        # 3. Get backup notifications from task log
-        try:
-            cmd = ['pvesh', 'get', '/cluster/tasks', '--output-format', 'json']
-            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                tasks = json.loads(result.stdout)
-                for task in tasks:
-                    if task.get('type') in ['vzdump', 'backup']:
-                        status = task.get('status', 'unknown')
-                        notif_type = 'success' if status == 'OK' else 'error' if status == 'stopped' else 'info'
-
-                        notifications.append({
-                            'timestamp': datetime.fromtimestamp(task.get('starttime', 0)).strftime('%Y-%m-%d %H:%M:%S'),
-                            'type': notif_type,
-                            'service': 'backup',
-                            'message': f"Backup task {task.get('upid', 'unknown')}: {status}",
-                            'source': 'task-log'
-                        })
-        except Exception as e:
-            # print(f"Error reading task notifications: {e}")
-            pass
-
-        # Sort by timestamp (newest first)
-        notifications.sort(key=lambda x: x['timestamp'], reverse=True)
-
-        return jsonify({
-            'notifications': notifications[:100], # Limit to 100 most recent
-            'total': len(notifications)
-        })
-
-    except Exception as e:
-        # print(f"Error getting notifications: {e}")
-        pass
-        return jsonify({
-            'error': str(e),
-            'notifications': [],
-            'total': 0
-        })
-
-@app.route('/api/notifications/download', methods=['GET'])
-def api_notifications_download():
-    """Download complete log for a specific notification"""
-    try:
-        timestamp = request.args.get('timestamp', '')
-
-        if not timestamp:
-            return jsonify({'error': 'Timestamp parameter required'}), 400
-
-        from datetime import datetime, timedelta
-
-        try:
-            # Parse timestamp format: "2025-10-11 14:27:35"
-            dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
-            # Use a very small time window (2 minutes) to get just this notification
-            since_time = (dt - timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
-            until_time = (dt + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
-        except ValueError:
-            # If parsing fails, use a default range
-            since_time = "2 minutes ago"
-            until_time = "now"
-
-        # Get logs around the specific timestamp
-        cmd = [
-            'journalctl',
-            '--since', since_time,
-            '--until', until_time,
-            '-n', '50', # Limit to 50 lines around the notification
-            '--no-pager'
-        ]
-
-        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
-
-        if result.returncode == 0:
-            import tempfile
-            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log') as f:
-                f.write(f"ProxMenux Log ({log_type}, since {since_days if since_days else f'{hours}h'}) - Generated: {datetime.now().isoformat()}\n")
-                f.write("=" * 80 + "\n\n")
-                f.write(result.stdout)
-                temp_path = f.name
-
-            return send_file(
-                temp_path,
-                mimetype='text/plain',
-                as_attachment=True,
-                download_name=f'notification_{timestamp.replace(":", "_").replace(" ", "_")}.log'
-            )
-        else:
-            return jsonify({'error': 'Failed to generate log file'}), 500
-
-    except Exception as e:
-        # print(f"Error downloading logs: {e}")
-        pass
-        return jsonify({'error': str(e)}), 500
-
-@app.route('/api/backups', methods=['GET'])
-def api_backups():
-    """Get list of all backup files from Proxmox storage"""
-    try:
-        backups = []
-
-        # Get list of storage locations
-        try:
-            result = subprocess.run(['pvesh', 'get', '/storage', '--output-format', 'json'],
-                                  capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                storages = json.loads(result.stdout)
-
-                # For each storage, get backup files
-                for storage in storages:
-                    storage_id = storage.get('storage')
-                    storage_type = storage.get('type')
-
-                    # Only check storages that can contain backups
-                    if storage_type in ['dir', 'nfs', 'cifs', 'pbs']:
-                        try:
-                            # Get content of storage
-                            content_result = subprocess.run(
-                                ['pvesh', 'get', f'/nodes/localhost/storage/{storage_id}/content', '--output-format', 'json'],
-                                capture_output=True, text=True, timeout=10)
-
-                            if content_result.returncode == 0:
-                                contents = json.loads(content_result.stdout)
-
-                                for item in contents:
-                                    if item.get('content') == 'backup':
-                                        # Parse backup information
-                                        volid = item.get('volid', '')
-                                        size = item.get('size', 0)
-                                        ctime = item.get('ctime', 0)
-
-                                        # Extract VMID from volid (format: storage:backup/vzdump-qemu-100-...)
-                                        vmid = None
-                                        backup_type = None
-                                        if 'vzdump-qemu-' in volid:
-                                            backup_type = 'qemu'
-                                            try:
-                                                vmid = volid.split('vzdump-qemu-')[1].split('-')[0]
-                                            except:
-                                                pass
-                                        elif 'vzdump-lxc-' in volid:
-                                            backup_type = 'lxc'
-                                            try:
-                                                vmid = volid.split('vzdump-lxc-')[1].split('-')[0]
-                                            except:
-                                                pass
-
-                                        backups.append({
-                                            'volid': volid,
-                                            'storage': storage_id,
-                                            'vmid': vmid,
-                                            'type': backup_type,
-                                            'size': size,
-                                            'size_human': format_bytes(size),
-                                            'created': datetime.fromtimestamp(ctime).strftime('%Y-%m-%d %H:%M:%S'),
-                                            'timestamp': ctime
-                                        })
-                        except Exception as e:
-                            # print(f"Error getting content for storage {storage_id}: {e}")
-                            pass
-                            continue
-        except Exception as e:
-            # print(f"Error getting storage list: {e}")
-            pass
-
-        # Sort by creation time (newest first)
-        backups.sort(key=lambda x: x['timestamp'], reverse=True)
-
-        return jsonify({
-            'backups': backups,
-            'total': len(backups)
-        })
-
-    except Exception as e:
-        # print(f"Error getting backups: {e}")
-        pass
-        return jsonify({
-            'error': str(e),
-            'backups': [],
-            'total': 0
-        })
-
-@app.route('/api/events', methods=['GET'])
-def api_events():
-    """Get Proxmox events and tasks"""
-    try:
-        limit = request.args.get('limit', '50')
-        events = []
-
-        try:
-            result = subprocess.run(['pvesh', 'get', '/cluster/tasks', '--output-format', 'json'],
-                                  capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                tasks = json.loads(result.stdout)
-
-                for task in tasks[:int(limit)]:
-                    upid = task.get('upid', '')
-                    task_type = task.get('type', 'unknown')
-                    status = task.get('status', 'unknown')
-                    node = task.get('node', 'unknown')
-                    user = task.get('user', 'unknown')
-                    vmid = task.get('id', '')
-                    starttime = task.get('starttime', 0)
-                    endtime = task.get('endtime', 0)
-
-                    # Calculate duration
-                    duration = ''
-                    if endtime and starttime:
-                        duration_sec = endtime - starttime
-                        if duration_sec < 60:
-                            duration = f"{duration_sec}s"
-                        elif duration_sec < 3600:
-                            duration = f"{duration_sec // 60}m {duration_sec % 60}s"
-                        else:
-                            hours = duration_sec // 3600
-                            minutes = (duration_sec % 3600) // 60
-                            duration = f"{hours}h {minutes}m"
-
-                    # Determine level based on status
-                    level = 'info'
-                    if status == 'OK':
-                        level = 'info'
-                    elif status in ['stopped', 'error']:
-                        level = 'error'
-                    elif status == 'running':
-                        level = 'warning'
-
-                    events.append({
-                        'upid': upid,
-                        'type': task_type,
-                        'status': status,
-                        'level': level,
-                        'node': node,
-                        'user': user,
-                        'vmid': str(vmid) if vmid else '',
-                        'starttime': datetime.fromtimestamp(starttime).strftime('%Y-%m-%d %H:%M:%S') if starttime else '',
-                        'endtime': datetime.fromtimestamp(endtime).strftime('%Y-%m-%d %H:%M:%S') if endtime else 'Running',
-                        'duration': duration
-                    })
-        except Exception as e:
-            # print(f"Error getting events: {e}")
-            pass
-
-        return jsonify({
-            'events': events,
-            'total': len(events)
-        })
-
-    except Exception as e:
-        # print(f"Error getting events: {e}")
-        pass
-        return jsonify({
-            'error': str(e),
-            'events': [],
-            'total': 0
-        })
-
-@app.route('/api/notifications', methods=['GET'])
-def api_notifications():
-    """Get Proxmox notification history"""
-    try:
-        notifications = []
-
-        # 1. Get notifications from journalctl (Proxmox notification service)
-        try:
-            cmd = [
-                'journalctl',
-                '-u', 'pve-ha-lrm',
-                '-u', 'pve-ha-crm',
-                '-u', 'pvedaemon',
-                '-u', 'pveproxy',
-                '-u', 'pvestatd',
-                '--grep', 'notification|email|webhook|alert|notify',
-                '-n', '100',
-                '--output', 'json',
-                '--no-pager'
-            ]
-            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                for line in result.stdout.strip().split('\n'):
-                    if line:
-                        try:
-                            log_entry = json.loads(line)
-                            timestamp_us = int(log_entry.get('__REALTIME_TIMESTAMP', '0'))
-                            timestamp = datetime.fromtimestamp(timestamp_us / 1000000).strftime('%Y-%m-%d %H:%M:%S')
-
-                            message = log_entry.get('MESSAGE', '')
-
-                            # Determine notification type from message
-                            notif_type = 'info'
-                            if 'email' in message.lower():
-                                notif_type = 'email'
-                            elif 'webhook' in message.lower():
-                                notif_type = 'webhook'
-                            elif 'alert' in message.lower() or 'warning' in message.lower():
-                                notif_type = 'alert'
-                            elif 'error' in message.lower() or 'fail' in message.lower():
-                                notif_type = 'error'
-
-                            notifications.append({
-                                'timestamp': timestamp,
-                                'type': notif_type,
-                                'service': log_entry.get('_SYSTEMD_UNIT', 'proxmox'),
-                                'message': message,
-                                'source': 'journal'
-                            })
-                        except (json.JSONDecodeError, ValueError):
-                            continue
-        except Exception as e:
-            # print(f"Error reading notification logs: {e}")
-            pass
-
-        # 2. Try to read Proxmox notification configuration
-        try:
-            notif_config_path = '/etc/pve/notifications.cfg'
-            if os.path.exists(notif_config_path):
-                with open(notif_config_path, 'r') as f:
-                    config_content = f.read()
-                    # Parse notification targets (emails, webhooks, etc.)
-                    for line in config_content.split('\n'):
-                        if line.strip() and not line.startswith('#'):
-                            notifications.append({
-                                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
-                                'type': 'config',
-                                'service': 'notification-config',
-                                'message': f'Notification target configured: {line.strip()}',
-                                'source': 'config'
-                            })
-        except Exception as e:
-            # print(f"Error reading notification config: {e}")
-            pass
-
-        # 3. Get backup notifications from task log
-        try:
-            cmd = ['pvesh', 'get', '/cluster/tasks', '--output-format', 'json']
-            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                tasks = json.loads(result.stdout)
-                for task in tasks:
-                    if task.get('type') in ['vzdump', 'backup']:
-                        status = task.get('status', 'unknown')
-                        notif_type = 'success' if status == 'OK' else 'error' if status == 'stopped' else 'info'
-
-                        notifications.append({
-                            'timestamp': datetime.fromtimestamp(task.get('starttime', 0)).strftime('%Y-%m-%d %H:%M:%S'),
-                            'type': notif_type,
-                            'service': 'backup',
-                            'message': f"Backup task {task.get('upid', 'unknown')}: {status}",
-                            'source': 'task-log'
-                        })
-        except Exception as e:
-            # print(f"Error reading task notifications: {e}")
-            pass
-
-        # Sort by timestamp (newest first)
-        notifications.sort(key=lambda x: x['timestamp'], reverse=True)
-
-        return jsonify({
-            'notifications': notifications[:100], # Limit to 100 most recent
-            'total': len(notifications)
-        })
-
-    except Exception as e:
-        # print(f"Error getting notifications: {e}")
-        pass
-        return jsonify({
-            'error': str(e),
-            'notifications': [],
-            'total': 0
-        })
-
-@app.route('/api/notifications/download', methods=['GET'])
-def api_notifications_download():
-    """Download complete log for a specific notification"""
-    try:
-        timestamp = request.args.get('timestamp', '')
-
-        if not timestamp:
-            return jsonify({'error': 'Timestamp parameter required'}), 400
-
-        from datetime import datetime, timedelta
-
-        try:
-            # Parse timestamp format: "2025-10-11 14:27:35"
-            dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
-            # Use a very small time window (2 minutes) to get just this notification
-            since_time = (dt - timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
-            until_time = (dt + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")
-        except ValueError:
-            # If parsing fails, use a default range
-            since_time = "2 minutes ago"
-            until_time = "now"
-
-        # Get logs around the specific timestamp
-        cmd = [
-            'journalctl',
-            '--since', since_time,
-            '--until', until_time,
-            '-n', '50', # Limit to 50 lines around the notification
-            '--no-pager'
-        ]
-
-        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
-
-        if result.returncode == 0:
-            import tempfile
-            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log') as f:
-                f.write(f"ProxMenux Log ({log_type}, since {since_days if since_days else f'{hours}h'}) - Generated: {datetime.now().isoformat()}\n")
-                f.write("=" * 80 + "\n\n")
-                f.write(result.stdout)
-                temp_path = f.name
-
-            return send_file(
-                temp_path,
-                mimetype='text/plain',
-                as_attachment=True,
-                download_name=f'notification_{timestamp.replace(":", "_").replace(" ", "_")}.log'
-            )
-        else:
-            return jsonify({'error': 'Failed to generate log file'}), 500
-
-    except Exception as e:
-        # print(f"Error downloading logs: {e}")
-        pass
-        return jsonify({'error': str(e)}), 500
-
-@app.route('/api/backups', methods=['GET'])
-def api_backups():
-    """Get list of all backup files from Proxmox storage"""
-    try:
-        backups = []
-
-        # Get list of storage locations
-        try:
-            result = subprocess.run(['pvesh', 'get', '/storage', '--output-format', 'json'],
-                                  capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                storages = json.loads(result.stdout)
-
-                # For each storage, get backup files
-                for storage in storages:
-                    storage_id = storage.get('storage')
-                    storage_type = storage.get('type')
-
-                    # Only check storages that can contain backups
-                    if storage_type in ['dir', 'nfs', 'cifs', 'pbs']:
-                        try:
-                            # Get content of storage
-                            content_result = subprocess.run(
-                                ['pvesh', 'get', f'/nodes/localhost/storage/{storage_id}/content', '--output-format', 'json'],
-                                capture_output=True, text=True, timeout=10)
-
-                            if content_result.returncode == 0:
-                                contents = json.loads(content_result.stdout)
-
-                                for item in contents:
-                                    if item.get('content') == 'backup':
-                                        # Parse backup information
-                                        volid = item.get('volid', '')
-                                        size = item.get('size', 0)
-                                        ctime = item.get('ctime', 0)
-
-                                        # Extract VMID from volid (format: storage:backup/vzdump-qemu-100-...)
-                                        vmid = None
-                                        backup_type = None
-                                        if 'vzdump-qemu-' in volid:
-                                            backup_type = 'qemu'
-                                            try:
-                                                vmid = volid.split('vzdump-qemu-')[1].split('-')[0]
-                                            except:
-                                                pass
-                                        elif 'vzdump-lxc-' in volid:
-                                            backup_type = 'lxc'
-                                            try:
-                                                vmid = volid.split('vzdump-lxc-')[1].split('-')[0]
-                                            except:
-                                                pass
-
-                                        backups.append({
-                                            'volid': volid,
-                                            'storage': storage_id,
-                                            'vmid': vmid,
-                                            'type': backup_type,
-                                            'size': size,
-                                            'size_human': format_bytes(size),
-                                            'created': datetime.fromtimestamp(ctime).strftime('%Y-%m-%d %H:%M:%S'),
-                                            'timestamp': ctime
-                                        })
-                        except Exception as e:
-                            # print(f"Error getting content for storage {storage_id}: {e}")
-                            pass
-                            continue
-        except Exception as e:
-            # print(f"Error getting storage list: {e}")
-            pass
-
-        # Sort by creation time (newest first)
-        backups.sort(key=lambda x: x['timestamp'], reverse=True)
-
-        return jsonify({
-            'backups': backups,
-            'total': len(backups)
-        })
-
-    except Exception as e:
-        # print(f"Error getting backups: {e}")
-        pass
-        return jsonify({
-            'error': str(e),
-            'backups': [],
-            'total': 0
-        })
-
-@app.route('/api/events', methods=['GET'])
-def api_events():
-    """Get recent Proxmox events and tasks"""
-    try:
-        limit = request.args.get('limit', '50')
-        events = []
-
-        try:
-            result = subprocess.run(['pvesh', 'get', '/cluster/tasks', '--output-format', 'json'],
-                                  capture_output=True, text=True, timeout=10)
-
-            if result.returncode == 0:
-                tasks = json.loads(result.stdout)
-
-                for task in tasks[:int(limit)]:
-                    upid = task.get('upid', '')
-                    task_type = task.get('type', 'unknown')
-                    status = task.get('status', 'unknown')
-                    node = task.get('node', 'unknown')
-                    user = task.get('user', 'unknown')
-                    vmid = task.get('id', '')
-                    starttime = task.get('starttime', 0)
-                    endtime = task.get('endtime', 0)
-
-                    # Calculate duration
-                    duration = ''
-                    if endtime and starttime:
-                        duration_sec = endtime - starttime
-                        if duration_sec < 60:
-                            duration = f"{duration_sec}s"
-                        elif duration_sec < 3600:
-                            duration = f"{duration_sec // 60}m {duration_sec % 60}s"
-                        else:
-                            hours = duration_sec // 3600
-                            minutes = (duration_sec % 3600) // 60
-                            duration = f"{hours}h {minutes}m"
-
-                    # Determine level based on status
-                    level = 'info'
-                    if status == 'OK':
-                        level = 'info'
-                    elif status in ['stopped', 'error']:
-                        level = 'error'
-                    elif status == 'running':
-                        level = 'warning'
-
-                    events.append({
-                        'upid': upid,
-                        'type': task_type,
-                        'status': status,
-                        'level': level,
-                        'node': node,
-                        'user': user,
-                        'vmid': str(vmid) if vmid else '',
-                        'starttime': datetime.fromtimestamp(starttime).strftime('%Y-%m-%d %H:%M:%S') if starttime else '',
-                        'endtime': datetime.fromtimestamp(endtime).strftime('%Y-%m-%d %H:%M:%S') if endtime else 'Running',
-                        'duration': duration
-                    })
-        except Exception as e:
-            # print(f"Error getting events: {e}")
-            pass
-
-        return jsonify({
-            'events': events,
-            'total': len(events)
-        })
-
-    except Exception as e:
-        # print(f"Error getting events: {e}")
-        pass
-        return jsonify({
-            'error': str(e),
-            'events': [],
-            'total': 0
-        })
-
 @app.route('/api/task-log/<upid>')
 def get_task_log(upid):
     """Get complete task log from Proxmox using UPID"""