mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-06-11 11:56:18 +00:00
G5: F05 — track fire-and-forget asyncio.create_task references
Why: Python's asyncio holds only weak references to tasks created via create_task(). Under GC pressure (especially Python < 3.11), unretained tasks can be silently cancelled mid-execution, and any exceptions are swallowed as "Task exception was never retrieved." Seven call sites across TCPConnection, BLEConnection, SerialConnection, and EventDispatcher used fire-and-forget create_task with no stored reference. Fix: introduce _background_tasks set and _spawn_background() helper on each class, following the standard pattern from the asyncio docs (task.add_done_callback(set.discard)). Refs: Forensics report finding F05
This commit is contained in:
@@ -51,6 +51,14 @@ class BLEConnection:
|
|||||||
self.pin = pin
|
self.pin = pin
|
||||||
self.rx_char = None
|
self.rx_char = None
|
||||||
self._disconnect_callback = None
|
self._disconnect_callback = None
|
||||||
|
self._background_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
def _spawn_background(self, coro) -> asyncio.Task:
|
||||||
|
"""Create a tracked background task (prevents GC of fire-and-forget tasks)."""
|
||||||
|
task = asyncio.create_task(coro)
|
||||||
|
self._background_tasks.add(task)
|
||||||
|
task.add_done_callback(self._background_tasks.discard)
|
||||||
|
return task
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
"""
|
"""
|
||||||
@@ -155,7 +163,7 @@ class BLEConnection:
|
|||||||
self.device = self._user_provided_device
|
self.device = self._user_provided_device
|
||||||
|
|
||||||
if self._disconnect_callback:
|
if self._disconnect_callback:
|
||||||
asyncio.create_task(self._disconnect_callback("ble_disconnect"))
|
self._spawn_background(self._disconnect_callback("ble_disconnect"))
|
||||||
|
|
||||||
def set_disconnect_callback(self, callback):
|
def set_disconnect_callback(self, callback):
|
||||||
"""Set callback to handle disconnections."""
|
"""Set callback to handle disconnections."""
|
||||||
@@ -166,7 +174,7 @@ class BLEConnection:
|
|||||||
|
|
||||||
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
|
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
|
||||||
if self.reader is not None:
|
if self.reader is not None:
|
||||||
asyncio.create_task(self.reader.handle_rx(data))
|
self._spawn_background(self.reader.handle_rx(data))
|
||||||
|
|
||||||
async def send(self, data):
|
async def send(self, data):
|
||||||
if not self.client:
|
if not self.client:
|
||||||
|
|||||||
@@ -134,6 +134,14 @@ class EventDispatcher:
|
|||||||
self.subscriptions: List[Subscription] = []
|
self.subscriptions: List[Subscription] = []
|
||||||
self.running = False
|
self.running = False
|
||||||
self._task = None
|
self._task = None
|
||||||
|
self._background_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
def _spawn_background(self, coro) -> asyncio.Task:
|
||||||
|
"""Create a tracked background task (prevents GC of fire-and-forget tasks)."""
|
||||||
|
task = asyncio.create_task(coro)
|
||||||
|
self._background_tasks.add(task)
|
||||||
|
task.add_done_callback(self._background_tasks.discard)
|
||||||
|
return task
|
||||||
|
|
||||||
def subscribe(
|
def subscribe(
|
||||||
self,
|
self,
|
||||||
@@ -197,7 +205,7 @@ class EventDispatcher:
|
|||||||
# returns - avoids the race where create_task schedules the callback after
|
# returns - avoids the race where create_task schedules the callback after
|
||||||
# the waiter has already timed out with done=set().
|
# the waiter has already timed out with done=set().
|
||||||
if asyncio.iscoroutinefunction(subscription.callback):
|
if asyncio.iscoroutinefunction(subscription.callback):
|
||||||
asyncio.create_task(self._execute_callback(subscription.callback, event.clone()))
|
self._spawn_background(self._execute_callback(subscription.callback, event.clone()))
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
subscription.callback(event.clone())
|
subscription.callback(event.clone())
|
||||||
|
|||||||
@@ -20,11 +20,19 @@ class SerialConnection:
|
|||||||
self._disconnect_callback = None
|
self._disconnect_callback = None
|
||||||
self.cx_dly = cx_dly
|
self.cx_dly = cx_dly
|
||||||
self._connected_event = asyncio.Event()
|
self._connected_event = asyncio.Event()
|
||||||
|
self._background_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
self.frame_expected_size = 0
|
self.frame_expected_size = 0
|
||||||
self.inframe = b""
|
self.inframe = b""
|
||||||
self.header = b""
|
self.header = b""
|
||||||
|
|
||||||
|
def _spawn_background(self, coro) -> asyncio.Task:
|
||||||
|
"""Create a tracked background task (prevents GC of fire-and-forget tasks)."""
|
||||||
|
task = asyncio.create_task(coro)
|
||||||
|
self._background_tasks.add(task)
|
||||||
|
task.add_done_callback(self._background_tasks.discard)
|
||||||
|
return task
|
||||||
|
|
||||||
class MCSerialClientProtocol(asyncio.Protocol):
|
class MCSerialClientProtocol(asyncio.Protocol):
|
||||||
def __init__(self, cx):
|
def __init__(self, cx):
|
||||||
self.cx = cx
|
self.cx = cx
|
||||||
@@ -44,7 +52,7 @@ class SerialConnection:
|
|||||||
self.cx._connected_event.clear()
|
self.cx._connected_event.clear()
|
||||||
|
|
||||||
if self.cx._disconnect_callback:
|
if self.cx._disconnect_callback:
|
||||||
asyncio.create_task(self.cx._disconnect_callback("serial_disconnect"))
|
self.cx._spawn_background(self.cx._disconnect_callback("serial_disconnect"))
|
||||||
|
|
||||||
def pause_writing(self):
|
def pause_writing(self):
|
||||||
logger.debug("pause writing")
|
logger.debug("pause writing")
|
||||||
@@ -114,7 +122,7 @@ class SerialConnection:
|
|||||||
data = data[upbound:]
|
data = data[upbound:]
|
||||||
if self.reader is not None:
|
if self.reader is not None:
|
||||||
# feed meshcore reader
|
# feed meshcore reader
|
||||||
asyncio.create_task(self.reader.handle_rx(self.inframe))
|
self._spawn_background(self.reader.handle_rx(self.inframe))
|
||||||
# reset inframe
|
# reset inframe
|
||||||
self.inframe = b""
|
self.inframe = b""
|
||||||
self.header = b""
|
self.header = b""
|
||||||
|
|||||||
@@ -24,6 +24,14 @@ class TCPConnection:
|
|||||||
self.frame_expected_size = 0
|
self.frame_expected_size = 0
|
||||||
self.header = b""
|
self.header = b""
|
||||||
self.inframe = b""
|
self.inframe = b""
|
||||||
|
self._background_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
def _spawn_background(self, coro) -> asyncio.Task:
|
||||||
|
"""Create a tracked background task (prevents GC of fire-and-forget tasks)."""
|
||||||
|
task = asyncio.create_task(coro)
|
||||||
|
self._background_tasks.add(task)
|
||||||
|
task.add_done_callback(self._background_tasks.discard)
|
||||||
|
return task
|
||||||
|
|
||||||
class MCClientProtocol(asyncio.Protocol):
|
class MCClientProtocol(asyncio.Protocol):
|
||||||
def __init__(self, cx):
|
def __init__(self, cx):
|
||||||
@@ -47,7 +55,7 @@ class TCPConnection:
|
|||||||
def connection_lost(self, exc):
|
def connection_lost(self, exc):
|
||||||
logger.debug("TCP server closed the connection")
|
logger.debug("TCP server closed the connection")
|
||||||
if self.cx._disconnect_callback:
|
if self.cx._disconnect_callback:
|
||||||
asyncio.create_task(self.cx._disconnect_callback("tcp_disconnect"))
|
self.cx._spawn_background(self.cx._disconnect_callback("tcp_disconnect"))
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
"""
|
"""
|
||||||
@@ -108,7 +116,7 @@ class TCPConnection:
|
|||||||
data = data[upbound:]
|
data = data[upbound:]
|
||||||
if self.reader is not None:
|
if self.reader is not None:
|
||||||
# feed meshcore reader
|
# feed meshcore reader
|
||||||
asyncio.create_task(self.reader.handle_rx(self.inframe))
|
self._spawn_background(self.reader.handle_rx(self.inframe))
|
||||||
# reset inframe
|
# reset inframe
|
||||||
self.inframe = b""
|
self.inframe = b""
|
||||||
self.header = b""
|
self.header = b""
|
||||||
|
|||||||
Reference in New Issue
Block a user