update storage-overview.tsx

This commit is contained in:
MacRimi
2026-04-13 14:49:48 +02:00
parent 44aefb5d3b
commit 2344935357
10 changed files with 4798 additions and 884 deletions

View File

@@ -6353,14 +6353,103 @@ def api_proxmox_storage():
# ─── SMART Disk Testing API ───────────────────────────────────────────────────
SMART_DIR = '/usr/local/share/proxmenux/smart'
SMART_CONFIG_DIR = '/usr/local/share/proxmenux/smart/config'
DEFAULT_SMART_RETENTION = 10 # Keep last 10 JSON files per disk by default
def _is_nvme(disk_name):
"""Check if disk is NVMe (supports names like nvme0n1, nvme0n1p1)."""
return 'nvme' in disk_name
def _get_smart_json_path(disk_name):
"""Get path to SMART JSON file for a disk."""
return os.path.join(SMART_DIR, f"{disk_name}.json")
def _get_smart_disk_dir(disk_name):
"""Get directory path for a disk's SMART JSON files."""
return os.path.join(SMART_DIR, disk_name)
def _get_smart_json_path(disk_name, test_type='short'):
"""Get path to a new SMART JSON file for a disk with timestamp."""
disk_dir = _get_smart_disk_dir(disk_name)
os.makedirs(disk_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y-%m-%dT%H-%M-%S')
return os.path.join(disk_dir, f"{timestamp}_{test_type}.json")
def _get_latest_smart_json(disk_name):
"""Get the most recent SMART JSON file for a disk."""
disk_dir = _get_smart_disk_dir(disk_name)
if not os.path.exists(disk_dir):
return None
json_files = sorted(
[f for f in os.listdir(disk_dir) if f.endswith('.json')],
reverse=True # Most recent first (timestamp-based naming)
)
if json_files:
return os.path.join(disk_dir, json_files[0])
return None
def _get_smart_history(disk_name, limit=None):
"""Get list of all SMART JSON files for a disk, sorted by date (newest first)."""
disk_dir = _get_smart_disk_dir(disk_name)
if not os.path.exists(disk_dir):
return []
json_files = sorted(
[f for f in os.listdir(disk_dir) if f.endswith('.json')],
reverse=True
)
if limit:
json_files = json_files[:limit]
result = []
for filename in json_files:
# Parse timestamp and test type from filename: 2026-04-13T10-30-00_short.json
parts = filename.replace('.json', '').split('_')
if len(parts) >= 2:
timestamp_str = parts[0]
test_type = parts[1]
try:
# Convert back to readable format
dt = datetime.strptime(timestamp_str, '%Y-%m-%dT%H-%M-%S')
result.append({
'filename': filename,
'path': os.path.join(disk_dir, filename),
'timestamp': dt.isoformat(),
'test_type': test_type,
'date_readable': dt.strftime('%Y-%m-%d %H:%M:%S')
})
except ValueError:
# Filename doesn't match expected format, skip
pass
return result
def _cleanup_old_smart_jsons(disk_name, retention=None):
"""Remove old SMART JSON files, keeping only the most recent ones."""
if retention is None:
retention = DEFAULT_SMART_RETENTION
if retention <= 0: # 0 or negative means keep all
return 0
disk_dir = _get_smart_disk_dir(disk_name)
if not os.path.exists(disk_dir):
return 0
json_files = sorted(
[f for f in os.listdir(disk_dir) if f.endswith('.json')],
reverse=True # Most recent first
)
removed = 0
# Keep first 'retention' files, delete the rest
for old_file in json_files[retention:]:
try:
os.remove(os.path.join(disk_dir, old_file))
removed += 1
except Exception:
pass
return removed
def _ensure_smart_tools(install_if_missing=False):
"""Check if SMART tools are installed and optionally install them."""
@@ -6483,14 +6572,17 @@ def api_smart_status(disk_name):
result['error'] = 'smartmontools not installed'
return jsonify(result)
# Check for existing JSON file (from previous test)
json_path = _get_smart_json_path(disk_name)
if os.path.exists(json_path):
# Check for existing JSON file (from previous test) - get most recent
json_path = _get_latest_smart_json(disk_name)
if json_path and os.path.exists(json_path):
try:
with open(json_path, 'r') as f:
saved_data = json.load(f)
result['saved_data'] = saved_data
result['saved_timestamp'] = os.path.getmtime(json_path)
result['saved_path'] = json_path
# Get test history
result['test_history'] = _get_smart_history(disk_name, limit=10)
except (json.JSONDecodeError, IOError):
pass
@@ -6712,6 +6804,74 @@ def api_smart_status(disk_name):
return jsonify({'error': str(e)}), 500
@app.route('/api/storage/smart/<disk_name>/history', methods=['GET'])
@require_auth
def api_smart_history(disk_name):
"""Get SMART test history for a disk."""
try:
# Validate disk name (security)
if not re.match(r'^[a-zA-Z0-9]+$', disk_name):
return jsonify({'error': 'Invalid disk name'}), 400
limit = request.args.get('limit', 20, type=int)
history = _get_smart_history(disk_name, limit=limit)
return jsonify({
'disk': disk_name,
'history': history,
'total': len(history)
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/storage/smart/<disk_name>/latest', methods=['GET'])
@require_auth
def api_smart_latest(disk_name):
"""Get the most recent SMART JSON data for a disk."""
try:
# Validate disk name (security)
if not re.match(r'^[a-zA-Z0-9]+$', disk_name):
return jsonify({'error': 'Invalid disk name'}), 400
json_path = _get_latest_smart_json(disk_name)
if not json_path or not os.path.exists(json_path):
return jsonify({
'disk': disk_name,
'has_data': False,
'message': 'No SMART test data available. Run a SMART test first.'
})
with open(json_path, 'r') as f:
smart_data = json.load(f)
# Extract timestamp from filename
filename = os.path.basename(json_path)
parts = filename.replace('.json', '').split('_')
timestamp = None
test_type = 'unknown'
if len(parts) >= 2:
try:
dt = datetime.strptime(parts[0], '%Y-%m-%dT%H-%M-%S')
timestamp = dt.isoformat()
test_type = parts[1]
except ValueError:
pass
return jsonify({
'disk': disk_name,
'has_data': True,
'data': smart_data,
'timestamp': timestamp,
'test_type': test_type,
'path': json_path
})
except json.JSONDecodeError:
return jsonify({'error': 'Invalid JSON data in saved file'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/storage/smart/<disk_name>/test', methods=['POST'])
@require_auth
def api_smart_run_test(disk_name):
@@ -6736,9 +6896,12 @@ def api_smart_run_test(disk_name):
# Check tools and auto-install if missing
tools = _ensure_smart_tools(install_if_missing=True)
# Ensure SMART directory exists
# Ensure SMART directory exists and get path for new JSON file
os.makedirs(SMART_DIR, exist_ok=True)
json_path = _get_smart_json_path(disk_name)
json_path = _get_smart_json_path(disk_name, test_type)
# Cleanup old JSON files based on retention policy
_cleanup_old_smart_jsons(disk_name)
if is_nvme:
if not tools['nvme']:
@@ -6832,7 +6995,7 @@ def api_smart_run_test(disk_name):
while smartctl -c {device} 2>/dev/null | grep -qiE 'Self-test routine in progress|[1-9][0-9]?% of test remaining'; do
sleep {sleep_interval}
done
smartctl --json=c {device} > {json_path} 2>/dev/null
smartctl -a --json=c {device} > {json_path} 2>/dev/null
''',
shell=True, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
@@ -6850,6 +7013,189 @@ def api_smart_run_test(disk_name):
return jsonify({'error': str(e)}), 500
# ─── SMART Schedule API ───────────────────────────────────────────────────────
SMART_SCHEDULE_FILE = os.path.join(SMART_CONFIG_DIR, 'smart-schedule.json')
SMART_CRON_FILE = '/etc/cron.d/proxmenux-smart'
def _load_smart_schedules():
"""Load SMART test schedules from config file."""
os.makedirs(SMART_CONFIG_DIR, exist_ok=True)
if os.path.exists(SMART_SCHEDULE_FILE):
try:
with open(SMART_SCHEDULE_FILE, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
pass
return {'enabled': True, 'schedules': []}
def _save_smart_schedules(config):
"""Save SMART test schedules to config file."""
os.makedirs(SMART_CONFIG_DIR, exist_ok=True)
with open(SMART_SCHEDULE_FILE, 'w') as f:
json.dump(config, f, indent=2)
def _update_smart_cron():
"""Update cron file based on current schedules."""
config = _load_smart_schedules()
if not config.get('enabled') or not config.get('schedules'):
# Remove cron file if disabled or no schedules
if os.path.exists(SMART_CRON_FILE):
os.remove(SMART_CRON_FILE)
return
cron_lines = [
'# ProxMenux SMART Scheduled Tests',
'# Auto-generated - do not edit manually',
'SHELL=/bin/bash',
'PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin',
''
]
for schedule in config['schedules']:
if not schedule.get('active', True):
continue
schedule_id = schedule.get('id', 'unknown')
hour = schedule.get('hour', 3)
minute = schedule.get('minute', 0)
frequency = schedule.get('frequency', 'weekly')
# Build cron time specification
if frequency == 'daily':
cron_time = f'{minute} {hour} * * *'
elif frequency == 'weekly':
dow = schedule.get('day_of_week', 0) # 0=Sunday
cron_time = f'{minute} {hour} * * {dow}'
elif frequency == 'monthly':
dom = schedule.get('day_of_month', 1)
cron_time = f'{minute} {hour} {dom} * *'
else:
continue
# Build command
disks = schedule.get('disks', ['all'])
test_type = schedule.get('test_type', 'short')
retention = schedule.get('retention', 10)
cmd = f'/usr/local/share/proxmenux/scripts/smart-scheduled-test.sh --schedule-id {schedule_id} --test-type {test_type} --retention {retention}'
if disks != ['all']:
cmd += f" --disks '{','.join(disks)}'"
cron_lines.append(f'{cron_time} root {cmd} >> /var/log/proxmenux/smart-schedule.log 2>&1')
cron_lines.append('') # Empty line at end
with open(SMART_CRON_FILE, 'w') as f:
f.write('\n'.join(cron_lines))
# Set proper permissions
os.chmod(SMART_CRON_FILE, 0o644)
@app.route('/api/storage/smart/schedules', methods=['GET'])
@require_auth
def api_smart_schedules_list():
"""Get all SMART test schedules."""
config = _load_smart_schedules()
return jsonify(config)
@app.route('/api/storage/smart/schedules', methods=['POST'])
@require_auth
def api_smart_schedules_create():
"""Create or update a SMART test schedule."""
try:
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
config = _load_smart_schedules()
# Generate ID if not provided
schedule_id = data.get('id') or f"schedule-{datetime.now().strftime('%Y%m%d%H%M%S')}"
data['id'] = schedule_id
# Set defaults
data.setdefault('active', True)
data.setdefault('test_type', 'short')
data.setdefault('frequency', 'weekly')
data.setdefault('hour', 3)
data.setdefault('minute', 0)
data.setdefault('day_of_week', 0)
data.setdefault('day_of_month', 1)
data.setdefault('disks', ['all'])
data.setdefault('retention', 10)
data.setdefault('notify_on_complete', True)
data.setdefault('notify_only_on_failure', False)
# Update existing or add new
existing_idx = next((i for i, s in enumerate(config['schedules']) if s['id'] == schedule_id), None)
if existing_idx is not None:
config['schedules'][existing_idx] = data
else:
config['schedules'].append(data)
_save_smart_schedules(config)
_update_smart_cron()
return jsonify({
'success': True,
'schedule': data,
'message': 'Schedule saved successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/storage/smart/schedules/<schedule_id>', methods=['DELETE'])
@require_auth
def api_smart_schedules_delete(schedule_id):
"""Delete a SMART test schedule."""
try:
config = _load_smart_schedules()
original_len = len(config['schedules'])
config['schedules'] = [s for s in config['schedules'] if s['id'] != schedule_id]
if len(config['schedules']) == original_len:
return jsonify({'error': 'Schedule not found'}), 404
_save_smart_schedules(config)
_update_smart_cron()
return jsonify({
'success': True,
'message': 'Schedule deleted successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/storage/smart/schedules/toggle', methods=['POST'])
@require_auth
def api_smart_schedules_toggle():
"""Enable or disable all SMART test schedules."""
try:
data = request.get_json() or {}
enabled = data.get('enabled', True)
config = _load_smart_schedules()
config['enabled'] = enabled
_save_smart_schedules(config)
_update_smart_cron()
return jsonify({
'success': True,
'enabled': enabled,
'message': f'SMART schedules {"enabled" if enabled else "disabled"}'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/storage/smart/tools', methods=['GET'])
@require_auth
def api_smart_tools_status():