G5: F08 — defer asyncio.Queue and asyncio.Lock construction

Why: On Python 3.9/3.10, asyncio.Queue() and asyncio.Lock() bind to
the running event loop at construction time. If the SDK is instantiated
from a synchronous factory before an event loop exists, both primitives
raise "RuntimeError: ... is bound to a different event loop" on first
use. Fix: EventDispatcher defers Queue creation to start(), with a
guard in dispatch() that raises RuntimeError if called before start().
CommandHandlerBase defers Lock creation via a lazy @property accessor.
Both document the contract change in class docstrings.

Refs: Forensics report finding F08
This commit is contained in:
Matthew Wolter
2026-04-12 03:57:06 -07:00
parent d4581a8e13
commit b4cd5840ab
2 changed files with 34 additions and 3 deletions

View File

@@ -58,17 +58,32 @@ def _validate_destination(dst: DestinationType, prefix_length: int = 6) -> bytes
class CommandHandlerBase: class CommandHandlerBase:
"""Base class for command handlers.
.. note::
The internal ``asyncio.Lock`` is created lazily on first access
so that it binds to the correct running event loop (required for
Python 3.9/3.10 compatibility).
"""
DEFAULT_TIMEOUT = 5.0 DEFAULT_TIMEOUT = 5.0
def __init__(self, default_timeout: Optional[float] = None): def __init__(self, default_timeout: Optional[float] = None):
self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None
self._reader: Optional[MessageReader] = None self._reader: Optional[MessageReader] = None
self.dispatcher: Optional[EventDispatcher] = None self.dispatcher: Optional[EventDispatcher] = None
self._mesh_request_lock = asyncio.Lock() self.__mesh_request_lock: Optional[asyncio.Lock] = None
self.default_timeout = ( self.default_timeout = (
default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT
) )
@property
def _mesh_request_lock(self) -> asyncio.Lock:
"""Lazy-init lock so it binds to the running loop, not import-time."""
if self.__mesh_request_lock is None:
self.__mesh_request_lock = asyncio.Lock()
return self.__mesh_request_lock
def set_connection(self, connection: Any) -> None: def set_connection(self, connection: Any) -> None:
async def sender(data: bytes) -> None: async def sender(data: bytes) -> None:
await connection.send(data) await connection.send(data)

View File

@@ -129,8 +129,17 @@ class Subscription:
class EventDispatcher: class EventDispatcher:
"""Event dispatch engine.
.. note::
``start()`` must be called before dispatching or processing events.
The internal ``asyncio.Queue`` is created lazily inside ``start()``
so that it binds to the correct running event loop (required for
Python 3.9/3.10 compatibility).
"""
def __init__(self): def __init__(self):
self.queue: asyncio.Queue[Event] = asyncio.Queue() self.queue: Optional[asyncio.Queue[Event]] = None
self.subscriptions: List[Subscription] = [] self.subscriptions: List[Subscription] = []
self.running = False self.running = False
self._task = None self._task = None
@@ -174,6 +183,10 @@ class EventDispatcher:
self.subscriptions.remove(subscription) self.subscriptions.remove(subscription)
async def dispatch(self, event: Event): async def dispatch(self, event: Event):
if self.queue is None:
raise RuntimeError(
"EventDispatcher.start() must be called before dispatching events"
)
await self.queue.put(event) await self.queue.put(event)
async def _process_events(self): async def _process_events(self):
@@ -228,6 +241,8 @@ class EventDispatcher:
async def start(self): async def start(self):
if not self.running: if not self.running:
if self.queue is None:
self.queue = asyncio.Queue()
self.running = True self.running = True
self._task = asyncio.create_task(self._process_events()) self._task = asyncio.create_task(self._process_events())
@@ -235,6 +250,7 @@ class EventDispatcher:
if self.running: if self.running:
self.running = False self.running = False
if self._task: if self._task:
if self.queue is not None:
await self.queue.join() await self.queue.join()
# Wait for any in-flight async callbacks to complete before # Wait for any in-flight async callbacks to complete before
# tearing down (F07: task_done fires before callbacks finish). # tearing down (F07: task_done fires before callbacks finish).