mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-06-13 20:56:53 +00:00
Revert "Refactor command system to be queue based"
This reverts commit 28957a4b60.
This commit is contained in:
@@ -1,7 +1,10 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
from typing import Any, Callable, Coroutine, Dict, List, Optional, Union
|
||||
|
||||
from meshcore.packets import BinaryReqType
|
||||
|
||||
from ..events import Event, EventDispatcher, EventType
|
||||
from ..reader import MessageReader
|
||||
|
||||
@@ -52,21 +55,14 @@ def _validate_destination(dst: DestinationType, prefix_length: int = 6) -> bytes
|
||||
|
||||
class CommandHandlerBase:
|
||||
DEFAULT_TIMEOUT = 5.0
|
||||
MAX_QUEUE_SIZE = 100
|
||||
|
||||
def __init__(self, default_timeout: Optional[float] = None, max_queue_size: Optional[int] = None):
|
||||
def __init__(self, default_timeout: Optional[float] = None):
|
||||
self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None
|
||||
self._reader: Optional[MessageReader] = None
|
||||
self.dispatcher: Optional[EventDispatcher] = None
|
||||
self.default_timeout = (
|
||||
default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT
|
||||
)
|
||||
|
||||
max_size = max_queue_size if max_queue_size is not None else self.MAX_QUEUE_SIZE
|
||||
self._command_queue = asyncio.Queue(maxsize=max_size)
|
||||
self._start_lock = asyncio.Lock() # Only for start/stop operations
|
||||
self._queue_processor_task: Optional[asyncio.Task] = None
|
||||
self._is_running = False
|
||||
|
||||
def set_connection(self, connection: Any) -> None:
|
||||
async def sender(data: bytes) -> None:
|
||||
@@ -87,48 +83,7 @@ class CommandHandlerBase:
|
||||
timeout: Optional[float] = None,
|
||||
) -> Event:
|
||||
"""
|
||||
Queue a command for execution and wait for the response.
|
||||
|
||||
Args:
|
||||
data: The data to send
|
||||
expected_events: EventType or list of EventTypes to wait for
|
||||
timeout: Timeout in seconds, or None to use default_timeout
|
||||
|
||||
Returns:
|
||||
Event: The full event object that was received in response to the command
|
||||
|
||||
Raises:
|
||||
RuntimeError: If the command queue is full
|
||||
"""
|
||||
async with self._start_lock:
|
||||
if not self._is_running:
|
||||
await self._start_queue_processor()
|
||||
|
||||
future = asyncio.Future()
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self._command_queue.put((data, expected_events, timeout, future)),
|
||||
timeout=1.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
future.set_exception(RuntimeError(
|
||||
f"Command queue is full ({self._command_queue.maxsize} commands pending)"
|
||||
))
|
||||
except Exception as e:
|
||||
future.set_exception(e)
|
||||
|
||||
return await future
|
||||
|
||||
async def _send_internal(
|
||||
self,
|
||||
data: bytes,
|
||||
expected_events: Optional[Union[EventType, List[EventType]]] = None,
|
||||
timeout: Optional[float] = None,
|
||||
) -> Event:
|
||||
"""
|
||||
Internal method that does the actual sending and waiting for events.
|
||||
This runs inside the queue processor with lock protection.
|
||||
Send a command and wait for expected event responses.
|
||||
|
||||
Args:
|
||||
data: The data to send
|
||||
@@ -141,6 +96,7 @@ class CommandHandlerBase:
|
||||
if not self.dispatcher:
|
||||
raise RuntimeError("Dispatcher not set, cannot send commands")
|
||||
|
||||
# Use the provided timeout or fall back to default_timeout
|
||||
timeout = timeout if timeout is not None else self.default_timeout
|
||||
|
||||
if self._sender_func:
|
||||
@@ -151,11 +107,13 @@ class CommandHandlerBase:
|
||||
|
||||
if expected_events:
|
||||
try:
|
||||
# Convert single event to list if needed
|
||||
if not isinstance(expected_events, list):
|
||||
expected_events = [expected_events]
|
||||
|
||||
logger.debug(f"Waiting for events {expected_events}, timeout={timeout}")
|
||||
|
||||
# Create futures for all expected events
|
||||
futures = []
|
||||
for event_type in expected_events:
|
||||
future = asyncio.create_task(
|
||||
@@ -163,18 +121,22 @@ class CommandHandlerBase:
|
||||
)
|
||||
futures.append(future)
|
||||
|
||||
# Wait for the first event to complete or all to timeout
|
||||
done, pending = await asyncio.wait(
|
||||
futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
|
||||
# Cancel all pending futures
|
||||
for future in pending:
|
||||
future.cancel()
|
||||
|
||||
# Check if any future completed successfully
|
||||
for future in done:
|
||||
event = await future
|
||||
if event:
|
||||
return event
|
||||
|
||||
# Create an error event when no event is received
|
||||
return Event(EventType.ERROR, {"reason": "no_event_received"})
|
||||
except asyncio.TimeoutError:
|
||||
logger.debug(f"Command timed out {data}")
|
||||
@@ -182,107 +144,26 @@ class CommandHandlerBase:
|
||||
except Exception as e:
|
||||
logger.debug(f"Command error: {e}")
|
||||
return Event(EventType.ERROR, {"error": str(e)})
|
||||
# For commands that don't expect events, return a success event
|
||||
return Event(EventType.OK, {})
|
||||
|
||||
async def start_queue_processor(self):
|
||||
"""
|
||||
Start the command queue processor.
|
||||
This should be called once when the connection is established.
|
||||
"""
|
||||
async with self._start_lock:
|
||||
if not self._is_running:
|
||||
await self._start_queue_processor()
|
||||
|
||||
async def _start_queue_processor(self):
|
||||
"""Internal method to start the background queue processor."""
|
||||
if not self._queue_processor_task or self._queue_processor_task.done():
|
||||
self._is_running = True
|
||||
self._queue_processor_task = asyncio.create_task(self._process_queue())
|
||||
logger.debug("Started command queue processor")
|
||||
# attached at base because its a common method
|
||||
async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, timeout=None) -> Event:
|
||||
dst_bytes = _validate_destination(dst, prefix_length=32)
|
||||
pubkey_prefix = _validate_destination(dst, prefix_length=6)
|
||||
logger.debug(f"Binary request to {dst_bytes.hex()}")
|
||||
data = b"\x32" + dst_bytes + request_type.value.to_bytes(1, "little", signed=False) + (data if data else b"")
|
||||
|
||||
async def _process_queue(self):
|
||||
"""Process commands from the queue sequentially."""
|
||||
logger.debug("Command queue processor started")
|
||||
while self._is_running:
|
||||
try:
|
||||
item = await self._command_queue.get()
|
||||
|
||||
# kill queue signal
|
||||
if item is None:
|
||||
logger.debug("Received shutdown sentinel")
|
||||
break
|
||||
|
||||
data, expected_events, timeout, future = item
|
||||
|
||||
if future.cancelled():
|
||||
continue
|
||||
|
||||
try:
|
||||
logger.debug(f"Processing queued command: {data.hex() if isinstance(data, bytes) else data}")
|
||||
result = await self._send_internal(data, expected_events, timeout)
|
||||
|
||||
if not future.cancelled():
|
||||
future.set_result(result)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing command: {e}")
|
||||
if not future.cancelled():
|
||||
future.set_exception(e)
|
||||
|
||||
# Small delay between commands to avoid overwhelming the device
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Queue processor cancelled")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Queue processor error: {e}")
|
||||
# Continue processing even if there was an error
|
||||
result = await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
|
||||
|
||||
logger.debug("Command queue processor stopped")
|
||||
|
||||
async def stop_queue_processor(self):
|
||||
"""Stop the queue processor gracefully."""
|
||||
logger.debug("Stopping command queue processor")
|
||||
|
||||
if not self._is_running:
|
||||
return
|
||||
# Register the request with the reader if we have both reader and request_type
|
||||
if (result.type == EventType.MSG_SENT and
|
||||
self._reader is not None and
|
||||
request_type is not None):
|
||||
|
||||
self._is_running = False
|
||||
|
||||
try:
|
||||
# send kill signal and wait for it to be processed
|
||||
await asyncio.wait_for(self._command_queue.put(None), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("Could not send shutdown sentinel (queue may be full)")
|
||||
|
||||
if self._queue_processor_task:
|
||||
try:
|
||||
await asyncio.wait_for(self._queue_processor_task, timeout=2.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("Queue processor did not stop gracefully, cancelling")
|
||||
self._queue_processor_task.cancel()
|
||||
try:
|
||||
await self._queue_processor_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._queue_processor_task = None
|
||||
|
||||
cancelled_count = 0
|
||||
while not self._command_queue.empty():
|
||||
try:
|
||||
item = self._command_queue.get_nowait()
|
||||
if item is None:
|
||||
continue
|
||||
if isinstance(item, tuple) and len(item) == 4:
|
||||
_, _, _, future = item
|
||||
if not future.cancelled():
|
||||
future.cancel()
|
||||
cancelled_count += 1
|
||||
except Exception as e:
|
||||
logger.debug(f"Error during cleanup: {e}")
|
||||
break
|
||||
|
||||
if cancelled_count > 0:
|
||||
logger.debug(f"Cancelled {cancelled_count} pending commands")
|
||||
|
||||
logger.debug("Command queue processor stopped")
|
||||
exp_tag = result.payload["expected_ack"].hex()
|
||||
# Use provided timeout or fallback to suggested timeout (with 5s default)
|
||||
actual_timeout = timeout if timeout is not None and timeout > 0 else result.payload.get("suggested_timeout", 4000) / 800.0
|
||||
self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout)
|
||||
|
||||
return result
|
||||
@@ -1,20 +1,21 @@
|
||||
import logging
|
||||
from mailbox import Message
|
||||
|
||||
from meshcore.commands.messaging import MessagingCommands
|
||||
from .base import CommandHandlerBase
|
||||
from ..events import EventType
|
||||
from ..binary_parsing import BinaryReqType, lpp_parse, lpp_parse_mma, parse_acl
|
||||
from ..packets import BinaryReqType
|
||||
|
||||
logger = logging.getLogger("meshcore")
|
||||
|
||||
|
||||
class BinaryCommandHandler(MessagingCommands):
|
||||
class BinaryCommandHandler(CommandHandlerBase):
|
||||
"""Helper functions to handle binary requests through binary commands"""
|
||||
|
||||
|
||||
async def req_status(self, contact, timeout=0):
|
||||
res = await self.send_binary_req(contact, BinaryReqType.STATUS.value.to_bytes(1, "little"))
|
||||
res = await self.send_binary_req(
|
||||
contact,
|
||||
BinaryReqType.STATUS,
|
||||
timeout=timeout
|
||||
)
|
||||
if res.type == EventType.ERROR:
|
||||
return None
|
||||
|
||||
@@ -24,31 +25,32 @@ class BinaryCommandHandler(MessagingCommands):
|
||||
if self.dispatcher is None:
|
||||
return None
|
||||
|
||||
# Listen for STATUS_RESPONSE event with matching pubkey
|
||||
contact_pubkey_prefix = contact["public_key"][0:12]
|
||||
status_event = await self.dispatcher.wait_for_event(
|
||||
EventType.STATUS_RESPONSE,
|
||||
attribute_filters={"pubkey_prefix": contact_pubkey_prefix},
|
||||
attribute_filters={"tag": exp_tag},
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
return status_event.payload if status_event else None
|
||||
|
||||
async def req_telemetry(self, contact, timeout=0):
|
||||
res = await self.send_binary_req(contact, BinaryReqType.TELEMETRY.value.to_bytes(1, "little"))
|
||||
res = await self.send_binary_req(
|
||||
contact,
|
||||
BinaryReqType.TELEMETRY,
|
||||
timeout=timeout
|
||||
)
|
||||
if res.type == EventType.ERROR:
|
||||
return None
|
||||
|
||||
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
|
||||
|
||||
|
||||
if self.dispatcher is None:
|
||||
return None
|
||||
|
||||
# Listen for TELEMETRY_RESPONSE event with matching pubkey
|
||||
contact_pubkey_prefix = contact["public_key"][0:12]
|
||||
# Listen for TELEMETRY_RESPONSE event
|
||||
telem_event = await self.dispatcher.wait_for_event(
|
||||
EventType.TELEMETRY_RESPONSE,
|
||||
attribute_filters={"pubkey_prefix": contact_pubkey_prefix},
|
||||
attribute_filters={"tag": res.payload["expected_ack"].hex()},
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
@@ -56,12 +58,16 @@ class BinaryCommandHandler(MessagingCommands):
|
||||
|
||||
async def req_mma(self, contact, start, end, timeout=0):
|
||||
req = (
|
||||
BinaryReqType.MMA.value.to_bytes(1, "little", signed=False)
|
||||
+ start.to_bytes(4, "little", signed=False)
|
||||
start.to_bytes(4, "little", signed=False)
|
||||
+ end.to_bytes(4, "little", signed=False)
|
||||
+ b"\0\0"
|
||||
)
|
||||
res = await self.send_binary_req(contact, req)
|
||||
res = await self.send_binary_req(
|
||||
contact,
|
||||
BinaryReqType.MMA,
|
||||
data=req,
|
||||
timeout=timeout
|
||||
)
|
||||
if res.type == EventType.ERROR:
|
||||
return None
|
||||
|
||||
@@ -70,19 +76,23 @@ class BinaryCommandHandler(MessagingCommands):
|
||||
if self.dispatcher is None:
|
||||
return None
|
||||
|
||||
# Listen for MMA_RESPONSE event with matching pubkey
|
||||
contact_pubkey_prefix = contact["public_key"][0:12]
|
||||
# Listen for MMA_RESPONSE
|
||||
mma_event = await self.dispatcher.wait_for_event(
|
||||
EventType.MMA_RESPONSE,
|
||||
attribute_filters={"pubkey_prefix": contact_pubkey_prefix},
|
||||
attribute_filters={"tag": res.payload["expected_ack"].hex()},
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
return mma_event.payload["mma_data"] if mma_event else None
|
||||
|
||||
async def req_acl(self, contact, timeout=0):
|
||||
req = BinaryReqType.ACL.value.to_bytes(1, "little", signed=False) + b"\0\0"
|
||||
res = await self.send_binary_req(contact, req)
|
||||
req = b"\0\0"
|
||||
res = await self.send_binary_req(
|
||||
contact,
|
||||
BinaryReqType.ACL,
|
||||
data=req,
|
||||
timeout=timeout
|
||||
)
|
||||
if res.type == EventType.ERROR:
|
||||
return None
|
||||
|
||||
|
||||
@@ -97,12 +97,6 @@ class MessagingCommands(CommandHandlerBase):
|
||||
data = b"\x27\x00\x00\x00" + dst_bytes
|
||||
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
|
||||
|
||||
async def send_binary_req(self, dst: DestinationType, bin_data) -> Event:
|
||||
dst_bytes = _validate_destination(dst, prefix_length=32)
|
||||
logger.debug(f"Binary request to {dst_bytes.hex()}")
|
||||
data = b"\x32" + dst_bytes + bin_data
|
||||
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
|
||||
|
||||
async def send_path_discovery(self, dst: DestinationType) -> Event:
|
||||
dst_bytes = _validate_destination(dst, prefix_length=32)
|
||||
logger.debug(f"Path discovery request for {dst_bytes.hex()}")
|
||||
|
||||
Reference in New Issue
Block a user