Refactor command system to be queue based

This commit is contained in:
Alex Wolden
2025-08-21 19:08:57 -07:00
parent e92eb273d8
commit b0dd9d1123
7 changed files with 203 additions and 31 deletions

View File

@@ -9,20 +9,22 @@ from meshcore.events import EventType
async def main(): async def main():
# Parse command line arguments # Parse command line arguments
parser = argparse.ArgumentParser(description='Get status from a repeater via serial connection') parser = argparse.ArgumentParser(description='Get status from a repeater via serial connection')
parser.add_argument('-p', '--port', required=True, help='Serial port') # parser.add_argument('-p', '--port', required=True, help='Serial port')
parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate') # parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate')
parser.add_argument('-r', '--repeater', required=True, help='Repeater name') parser.add_argument('-r', '--repeater', required=True, help='Repeater name')
parser.add_argument('-pw', '--password', required=True, help='Password for login') parser.add_argument('-pw', '--password', required=True, help='Password for login')
parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds') # parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds')
args = parser.parse_args() args = parser.parse_args()
# Connect to the device # Connect to the device
mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True) mc = await MeshCore.create_ble("lora-py-tester")
534463
try: try:
# Get contacts # Get contacts
await mc.ensure_contacts() result = await mc.commands.get_contacts()
repeater = mc.get_contact_by_name(args.repeater) print(result)
print(mc._contacts)
repeater = mc.get_contact_by_key_prefix(args.repeater)
if repeater is None: if repeater is None:
print(f"Repeater '{args.repeater}' not found in contacts.") print(f"Repeater '{args.repeater}' not found in contacts.")
@@ -35,14 +37,25 @@ async def main():
if login_event.type != EventType.ERROR: if login_event.type != EventType.ERROR:
print("Login successful") print("Login successful")
# Send status request # Continuously poll for telemetry every 60 seconds
print("Sending status request...") print("Starting continuous telemetry polling every 60 seconds...")
await mc.commands.send_telemetry_req(repeater) while True:
try:
# Send status request
print("Sending status request...")
await mc.commands.send_telemetry_req(repeater)
# Wait for status response # Wait for status response
telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=args.timeout) telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=10)
print(telemetry_event.payload["lpp"]) print(telemetry_event)
# Wait 60 seconds before next poll
await asyncio.sleep(60)
except Exception as e:
print(f"Error during telemetry poll: {e}")
# Wait before retrying
await asyncio.sleep(60)
else: else:
print("Login failed or timed out") print("Login failed or timed out")

View File

@@ -52,7 +52,7 @@ class BLEConnection:
if self.client: if self.client:
logger.debug("Using pre-configured BleakClient.") logger.debug("Using pre-configured BleakClient.")
assert isinstance(self.client, BleakClient) assert isinstance(self.client, BleakClient)
if client.is_connected : if self.client.is_connected :
logger.error("Client is already connected !!! weird") logger.error("Client is already connected !!! weird")
self.address = self.client.address self.address = self.client.address
return self.address return self.address

View File

@@ -1,6 +1,5 @@
import asyncio import asyncio
import logging import logging
import random
from typing import Any, Callable, Coroutine, Dict, List, Optional, Union from typing import Any, Callable, Coroutine, Dict, List, Optional, Union
from ..events import Event, EventDispatcher, EventType from ..events import Event, EventDispatcher, EventType
@@ -53,8 +52,9 @@ def _validate_destination(dst: DestinationType, prefix_length: int = 6) -> bytes
class CommandHandlerBase: class CommandHandlerBase:
DEFAULT_TIMEOUT = 5.0 DEFAULT_TIMEOUT = 5.0
MAX_QUEUE_SIZE = 100
def __init__(self, default_timeout: Optional[float] = None): def __init__(self, default_timeout: Optional[float] = None, max_queue_size: Optional[int] = None):
self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None
self._reader: Optional[MessageReader] = None self._reader: Optional[MessageReader] = None
self.dispatcher: Optional[EventDispatcher] = None self.dispatcher: Optional[EventDispatcher] = None
@@ -62,6 +62,12 @@ class CommandHandlerBase:
default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT
) )
max_size = max_queue_size if max_queue_size is not None else self.MAX_QUEUE_SIZE
self._command_queue = asyncio.Queue(maxsize=max_size)
self._start_lock = asyncio.Lock() # Only for start/stop operations
self._queue_processor_task: Optional[asyncio.Task] = None
self._is_running = False
def set_connection(self, connection: Any) -> None: def set_connection(self, connection: Any) -> None:
async def sender(data: bytes) -> None: async def sender(data: bytes) -> None:
await connection.send(data) await connection.send(data)
@@ -81,7 +87,48 @@ class CommandHandlerBase:
timeout: Optional[float] = None, timeout: Optional[float] = None,
) -> Event: ) -> Event:
""" """
Send a command and wait for expected event responses. Queue a command for execution and wait for the response.
Args:
data: The data to send
expected_events: EventType or list of EventTypes to wait for
timeout: Timeout in seconds, or None to use default_timeout
Returns:
Event: The full event object that was received in response to the command
Raises:
RuntimeError: If the command queue is full
"""
async with self._start_lock:
if not self._is_running:
await self._start_queue_processor()
future = asyncio.Future()
try:
await asyncio.wait_for(
self._command_queue.put((data, expected_events, timeout, future)),
timeout=1.0
)
except asyncio.TimeoutError:
future.set_exception(RuntimeError(
f"Command queue is full ({self._command_queue.maxsize} commands pending)"
))
except Exception as e:
future.set_exception(e)
return await future
async def _send_internal(
self,
data: bytes,
expected_events: Optional[Union[EventType, List[EventType]]] = None,
timeout: Optional[float] = None,
) -> Event:
"""
Internal method that does the actual sending and waiting for events.
This runs inside the queue processor with lock protection.
Args: Args:
data: The data to send data: The data to send
@@ -94,7 +141,6 @@ class CommandHandlerBase:
if not self.dispatcher: if not self.dispatcher:
raise RuntimeError("Dispatcher not set, cannot send commands") raise RuntimeError("Dispatcher not set, cannot send commands")
# Use the provided timeout or fall back to default_timeout
timeout = timeout if timeout is not None else self.default_timeout timeout = timeout if timeout is not None else self.default_timeout
if self._sender_func: if self._sender_func:
@@ -105,13 +151,11 @@ class CommandHandlerBase:
if expected_events: if expected_events:
try: try:
# Convert single event to list if needed
if not isinstance(expected_events, list): if not isinstance(expected_events, list):
expected_events = [expected_events] expected_events = [expected_events]
logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") logger.debug(f"Waiting for events {expected_events}, timeout={timeout}")
# Create futures for all expected events
futures = [] futures = []
for event_type in expected_events: for event_type in expected_events:
future = asyncio.create_task( future = asyncio.create_task(
@@ -119,22 +163,18 @@ class CommandHandlerBase:
) )
futures.append(future) futures.append(future)
# Wait for the first event to complete or all to timeout
done, pending = await asyncio.wait( done, pending = await asyncio.wait(
futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
) )
# Cancel all pending futures
for future in pending: for future in pending:
future.cancel() future.cancel()
# Check if any future completed successfully
for future in done: for future in done:
event = await future event = await future
if event: if event:
return event return event
# Create an error event when no event is received
return Event(EventType.ERROR, {"reason": "no_event_received"}) return Event(EventType.ERROR, {"reason": "no_event_received"})
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.debug(f"Command timed out {data}") logger.debug(f"Command timed out {data}")
@@ -142,5 +182,107 @@ class CommandHandlerBase:
except Exception as e: except Exception as e:
logger.debug(f"Command error: {e}") logger.debug(f"Command error: {e}")
return Event(EventType.ERROR, {"error": str(e)}) return Event(EventType.ERROR, {"error": str(e)})
# For commands that don't expect events, return a success event
return Event(EventType.OK, {}) return Event(EventType.OK, {})
async def start_queue_processor(self):
"""
Start the command queue processor.
This should be called once when the connection is established.
"""
async with self._start_lock:
if not self._is_running:
await self._start_queue_processor()
async def _start_queue_processor(self):
"""Internal method to start the background queue processor."""
if not self._queue_processor_task or self._queue_processor_task.done():
self._is_running = True
self._queue_processor_task = asyncio.create_task(self._process_queue())
logger.debug("Started command queue processor")
async def _process_queue(self):
"""Process commands from the queue sequentially."""
logger.debug("Command queue processor started")
while self._is_running:
try:
item = await self._command_queue.get()
# kill queue signal
if item is None:
logger.debug("Received shutdown sentinel")
break
data, expected_events, timeout, future = item
if future.cancelled():
continue
try:
logger.debug(f"Processing queued command: {data.hex() if isinstance(data, bytes) else data}")
result = await self._send_internal(data, expected_events, timeout)
if not future.cancelled():
future.set_result(result)
except Exception as e:
logger.error(f"Error processing command: {e}")
if not future.cancelled():
future.set_exception(e)
# Small delay between commands to avoid overwhelming the device
await asyncio.sleep(0.01)
except asyncio.CancelledError:
logger.debug("Queue processor cancelled")
break
except Exception as e:
logger.error(f"Queue processor error: {e}")
# Continue processing even if there was an error
logger.debug("Command queue processor stopped")
async def stop_queue_processor(self):
"""Stop the queue processor gracefully."""
logger.debug("Stopping command queue processor")
if not self._is_running:
return
self._is_running = False
try:
# send kill signal and wait for it to be processed
await asyncio.wait_for(self._command_queue.put(None), timeout=1.0)
except asyncio.TimeoutError:
logger.warning("Could not send shutdown sentinel (queue may be full)")
if self._queue_processor_task:
try:
await asyncio.wait_for(self._queue_processor_task, timeout=2.0)
except asyncio.TimeoutError:
logger.warning("Queue processor did not stop gracefully, cancelling")
self._queue_processor_task.cancel()
try:
await self._queue_processor_task
except asyncio.CancelledError:
pass
self._queue_processor_task = None
cancelled_count = 0
while not self._command_queue.empty():
try:
item = self._command_queue.get_nowait()
if item is None:
continue
if isinstance(item, tuple) and len(item) == 4:
_, _, _, future = item
if not future.cancelled():
future.cancel()
cancelled_count += 1
except Exception as e:
logger.debug(f"Error during cleanup: {e}")
break
if cancelled_count > 0:
logger.debug(f"Cancelled {cancelled_count} pending commands")
logger.debug("Command queue processor stopped")

View File

@@ -1,6 +1,9 @@
import logging import logging
from enum import Enum from enum import Enum
import json import json
from mailbox import Message
from meshcore.commands.messaging import MessagingCommands
from .base import CommandHandlerBase from .base import CommandHandlerBase
from ..events import EventType from ..events import EventType
from cayennelpp import LppFrame, LppData from cayennelpp import LppFrame, LppData
@@ -36,6 +39,9 @@ def lpp_parse_mma(buf):
i = i + 1 i = i + 1
type = buf[i] type = buf[i]
lpp_type = LppType.get_lpp_type(type) lpp_type = LppType.get_lpp_type(type)
if lpp_type is None:
logger.error(f"Unknown LPP type: {type}")
return None
size = lpp_type.size size = lpp_type.size
i = i + 1 i = i + 1
min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
@@ -68,7 +74,7 @@ def parse_acl(buf):
return res return res
class BinaryCommandHandler(CommandHandlerBase): class BinaryCommandHandler(MessagingCommands):
"""Helper functions to handle binary requests through binary commands""" """Helper functions to handle binary requests through binary commands"""
async def req_binary(self, contact, request, timeout=0): async def req_binary(self, contact, request, timeout=0):
@@ -82,6 +88,9 @@ class BinaryCommandHandler(CommandHandlerBase):
timeout = ( timeout = (
res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
) )
if self.dispatcher is None:
logger.error("No dispatcher set, cannot wait for response")
return None
res2 = await self.dispatcher.wait_for_event( res2 = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE, EventType.BINARY_RESPONSE,
attribute_filters={"tag": exp_tag}, attribute_filters={"tag": exp_tag},

View File

@@ -21,7 +21,7 @@ class ConnectionProtocol(Protocol):
"""Disconnect from the device/server.""" """Disconnect from the device/server."""
... ...
async def send(self, data): async def send(self, data) -> Any:
"""Send data through the connection.""" """Send data through the connection."""
... ...

View File

@@ -1,3 +1,4 @@
from collections.abc import Coroutine
from enum import Enum from enum import Enum
import inspect import inspect
import logging import logging
@@ -113,7 +114,7 @@ class EventDispatcher:
def subscribe( def subscribe(
self, self,
event_type: Union[EventType, None], event_type: Union[EventType, None],
callback: Callable[[Event], Union[None, asyncio.Future]], callback: Callable[[Event], Coroutine[Any, Any, None]],
attribute_filters: Optional[Dict[str, Any]] = None, attribute_filters: Optional[Dict[str, Any]] = None,
) -> Subscription: ) -> Subscription:
""" """
@@ -226,7 +227,7 @@ class EventDispatcher:
""" """
future = asyncio.Future() future = asyncio.Future()
def event_handler(event: Event): async def event_handler(event: Event):
if not future.done(): if not future.done():
future.set_result(event) future.set_result(event)

View File

@@ -162,6 +162,10 @@ class MeshCore:
result = await self.connection_manager.connect() result = await self.connection_manager.connect()
if result is None: if result is None:
raise ConnectionError("Failed to connect to device") raise ConnectionError("Failed to connect to device")
# Start the command queue processor after successful connection
await self.commands.start_queue_processor()
return await self.commands.send_appstart() return await self.commands.send_appstart()
async def disconnect(self): async def disconnect(self):
@@ -173,6 +177,9 @@ class MeshCore:
if hasattr(self, "_auto_fetch_subscription") and self._auto_fetch_subscription: if hasattr(self, "_auto_fetch_subscription") and self._auto_fetch_subscription:
await self.stop_auto_message_fetching() await self.stop_auto_message_fetching()
# Stop the command queue processor
await self.commands.stop_queue_processor()
# Disconnect the connection object # Disconnect the connection object
await self.connection_manager.disconnect() await self.connection_manager.disconnect()