Files
ProxMenux/AppImage/scripts/flask_terminal_routes.py
2025-12-06 12:06:12 +01:00

424 lines
14 KiB
Python

#!/usr/bin/env python3
"""
ProxMenux Terminal WebSocket Routes
Provides a WebSocket endpoint for interactive terminal sessions
"""
from flask import Blueprint, jsonify, request
from flask_sock import Sock
import subprocess
import os
import pty
import select
import struct
import fcntl
import termios
import threading
import time
import requests
import json
terminal_bp = Blueprint('terminal', __name__)
sock = Sock()
# Active terminal sessions
active_sessions = {}
@terminal_bp.route('/api/terminal/health', methods=['GET'])
def terminal_health():
"""Health check for terminal service"""
return {'success': True, 'active_sessions': len(active_sessions)}
@terminal_bp.route('/api/terminal/search-command', methods=['GET'])
def search_command():
"""Proxy endpoint for cheat.sh API to avoid CORS issues"""
query = request.args.get('q', '')
if not query or len(query) < 2:
return jsonify({'error': 'Query too short'}), 400
try:
url = f'https://cht.sh/{query.replace(" ", "+")}?QT'
headers = {
'User-Agent': 'curl/7.68.0'
}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
content = response.text
examples = []
current_description = []
for line in content.split('\n'):
stripped = line.strip()
# Ignorar líneas vacías
if not stripped:
continue
# Si es un comentario
if stripped.startswith('#'):
# Acumular descripciones
current_description.append(stripped[1:].strip())
# Si no es comentario, es un comando
elif stripped and not stripped.startswith('http'):
# Unir las descripciones acumuladas
description = ' '.join(current_description) if current_description else ''
examples.append({
'description': description,
'command': stripped
})
# Resetear descripciones para el siguiente comando
current_description = []
return jsonify({
'success': True,
'examples': examples
})
else:
return jsonify({
'success': False,
'error': f'API returned status {response.status_code}'
}), response.status_code
except requests.Timeout:
return jsonify({
'success': False,
'error': 'Request timeout'
}), 504
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
def set_winsize(fd, rows, cols):
"""Set terminal window size"""
try:
winsize = struct.pack('HHHH', rows, cols, 0, 0)
fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
except Exception as e:
print(f"Error setting window size: {e}")
def read_and_forward_output(master_fd, ws):
"""Read from PTY and send to WebSocket"""
while True:
try:
# Use select with timeout to check if data is available
r, _, _ = select.select([master_fd], [], [], 0.01)
if master_fd in r:
try:
data = os.read(master_fd, 4096)
if data:
ws.send(data.decode('utf-8', errors='ignore'))
else:
break
except OSError:
break
except Exception as e:
print(f"Error reading from PTY: {e}")
break
@sock.route('/ws/terminal')
def terminal_websocket(ws):
"""WebSocket endpoint for terminal sessions"""
# Create pseudo-terminal
master_fd, slave_fd = pty.openpty()
# Start bash process
shell_process = subprocess.Popen(
['/bin/bash', '-i'],
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
preexec_fn=os.setsid,
cwd='/',
env=dict(os.environ, TERM='xterm-256color', PS1='\\u@\\h:\\w\\$ ')
)
session_id = id(ws)
active_sessions[session_id] = {
'process': shell_process,
'master_fd': master_fd
}
# Set non-blocking mode for master_fd
flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Set initial terminal size
set_winsize(master_fd, 30, 120)
# Start thread to read PTY output and forward to WebSocket
output_thread = threading.Thread(
target=read_and_forward_output,
args=(master_fd, ws),
daemon=True
)
output_thread.start()
try:
while True:
# Receive data from WebSocket (blocking)
data = ws.receive(timeout=None)
if data is None:
# Client closed connection
break
handled = False
# Try to handle JSON control messages (e.g. resize)
if isinstance(data, str):
try:
msg = json.loads(data)
except Exception:
msg = None
if isinstance(msg, dict) and msg.get('type') == 'resize':
cols = int(msg.get('cols', 120))
rows = int(msg.get('rows', 30))
set_winsize(master_fd, rows, cols)
handled = True
if handled:
# Control message processed, do not send to bash
continue
# Optional: legacy resize escape sequence support
if isinstance(data, str) and data.startswith('\x1b[8;'):
try:
parts = data[4:-1].split(';')
rows, cols = int(parts[0]), int(parts[1])
set_winsize(master_fd, rows, cols)
continue
except Exception:
pass
# Send input to bash
try:
os.write(master_fd, data.encode('utf-8'))
except OSError as e:
print(f"Error writing to PTY: {e}")
break
# Check if process is still alive
if shell_process.poll() is not None:
break
except Exception as e:
print(f"Terminal session error: {e}")
finally:
# Cleanup
try:
shell_process.terminate()
shell_process.wait(timeout=1)
except:
try:
shell_process.kill()
except:
pass
try:
os.close(master_fd)
except:
pass
try:
os.close(slave_fd)
except:
pass
if session_id in active_sessions:
del active_sessions[session_id]
@sock.route('/ws/script/<session_id>')
def script_websocket(ws, session_id):
"""WebSocket endpoint for executing scripts with hybrid web mode"""
print(f"[DEBUG] Script WebSocket connected for session: {session_id}")
try:
print(f"[DEBUG] Waiting for init data...")
init_data = ws.receive(timeout=10)
print(f"[DEBUG] Received init data: {init_data}")
if not init_data:
error_msg = '{"type": "error", "message": "No script data received"}\r\n'
print(f"[DEBUG] Error: No init data received")
ws.send(error_msg)
return
script_data = json.loads(init_data)
print(f"[DEBUG] Parsed script data: {script_data}")
script_path = script_data.get('script_path')
params = script_data.get('params', {})
print(f"[DEBUG] Script path: {script_path}")
print(f"[DEBUG] Params: {params}")
if not script_path:
error_msg = '{"type": "error", "message": "No script_path provided"}\r\n'
print(f"[DEBUG] Error: No script_path in data")
ws.send(error_msg)
return
if not os.path.exists(script_path):
error_msg = f'{{"type": "error", "message": "Script not found: {script_path}"}}\r\n'
print(f"[DEBUG] Error: Script file not found: {script_path}")
ws.send(error_msg)
return
print(f"[DEBUG] Script file exists, starting execution...")
except Exception as e:
error_msg = f'{{"type": "error", "message": "Invalid init data: {str(e)}"}}\r\n'
print(f"[DEBUG] Exception parsing init data: {e}")
ws.send(error_msg)
return
# Create pseudo-terminal for script execution
master_fd, slave_fd = pty.openpty()
print(f"[DEBUG] Created PTY: master_fd={master_fd}, slave_fd={slave_fd}")
env = os.environ.copy()
for key, value in params.items():
env[key] = str(value)
# Force unbuffered output
env['PYTHONUNBUFFERED'] = '1'
env['TERM'] = 'xterm-256color'
# Add stdbuf to force unbuffered output for the script
print(f"[DEBUG] Starting script process with unbuffered output: {script_path}")
script_process = subprocess.Popen(
['script', '-qefc', f'/bin/bash {script_path}', '/dev/null'],
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
preexec_fn=os.setsid,
env=env
)
print(f"[DEBUG] Script process started with PID: {script_process.pid}")
# Set non-blocking mode for master_fd
flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Set terminal size
set_winsize(master_fd, 30, 120)
# Thread to read script output and forward to WebSocket
def read_script_output():
print(f"[DEBUG] Output reader thread started")
while True:
try:
r, _, _ = select.select([master_fd], [], [], 0.01)
if master_fd in r:
try:
data = os.read(master_fd, 4096)
if not data:
print(f"[DEBUG] No more data from script")
break
text = data.decode('utf-8', errors='ignore')
print(f"[DEBUG] Read {len(data)} bytes ({len(text)} chars) from script: {text[:100]}")
# Send raw text to terminal (TerminalPanel expects plain text)
try:
ws.send(text)
print(f"[DEBUG] Sent {len(text)} chars to WebSocket")
except Exception as e:
print(f"[DEBUG] Error sending to WebSocket: {e}")
break
except OSError as e:
print(f"[DEBUG] OSError reading from PTY: {e}")
break
except Exception as e:
print(f"[DEBUG] Error reading script output: {e}")
break
script_process.wait()
exit_code = script_process.returncode if script_process.returncode is not None else 0
print(f"[DEBUG] Script exited with code: {exit_code}")
# Only send exit message if WebSocket is still open
try:
ws.send(f'\r\n[Script exited with code {exit_code}]\r\n')
except Exception as e:
print(f"[DEBUG] Could not send exit message: {e}")
output_thread = threading.Thread(target=read_script_output, daemon=True)
output_thread.start()
print(f"[DEBUG] Output thread started")
try:
while True:
# Receive user input or interaction responses
data = ws.receive(timeout=None)
if data is None:
print(f"[DEBUG] WebSocket closed by client")
break
print(f"[DEBUG] Received from client: {data[:100] if len(data) > 100 else data}")
try:
msg = json.loads(data)
# Handle resize
if msg.get('type') == 'resize':
cols = int(msg.get('cols', 120))
rows = int(msg.get('rows', 30))
set_winsize(master_fd, rows, cols)
print(f"[DEBUG] Resized terminal to {cols}x{rows}")
continue
except json.JSONDecodeError:
# Raw text input, send to script
try:
os.write(master_fd, data.encode('utf-8'))
print(f"[DEBUG] Sent input to script")
except OSError as e:
print(f"[DEBUG] Error writing to script: {e}")
break
# Check if process is still alive
if script_process.poll() is not None:
print(f"[DEBUG] Script process terminated")
break
except Exception as e:
print(f"[DEBUG] Script session error: {e}")
finally:
print(f"[DEBUG] Cleaning up script session")
# Cleanup
try:
script_process.terminate()
script_process.wait(timeout=1)
except:
try:
script_process.kill()
except:
pass
try:
os.close(master_fd)
except:
pass
try:
os.close(slave_fd)
except:
pass
def init_terminal_routes(app):
"""Initialize terminal routes with Flask app"""
sock.init_app(app)
app.register_blueprint(terminal_bp)