Merge pull request #10 from fdlamotte/awolden/connection-manager

Add better connection management
This commit is contained in:
fdlamotte
2025-07-01 09:15:58 +02:00
committed by GitHub
9 changed files with 370 additions and 31 deletions

View File

@@ -111,6 +111,43 @@ meshcore = await MeshCore.create_ble("12:34:56:78:90:AB")
meshcore = await MeshCore.create_tcp("192.168.1.100", 4000) meshcore = await MeshCore.create_tcp("192.168.1.100", 4000)
``` ```
#### Auto-Reconnect and Connection Events
Enable automatic reconnection when connections are lost:
```python
# Enable auto-reconnect with custom retry limits
meshcore = await MeshCore.create_tcp(
"192.168.1.100", 4000,
auto_reconnect=True,
max_reconnect_attempts=5
)
# Subscribe to connection events
async def on_connected(event):
print(f"Connected: {event.payload}")
if event.payload.get('reconnected'):
print("Successfully reconnected!")
async def on_disconnected(event):
print(f"Disconnected: {event.payload['reason']}")
if event.payload.get('max_attempts_exceeded'):
print("Max reconnection attempts exceeded")
meshcore.subscribe(EventType.CONNECTED, on_connected)
meshcore.subscribe(EventType.DISCONNECTED, on_disconnected)
# Check connection status
if meshcore.is_connected:
print("Device is currently connected")
```
**Auto-reconnect features:**
- Exponential backoff (1s, 2s, 4s, 8s max delay)
- Configurable retry limits (default: 3 attempts)
- Automatic disconnect detection (especially useful for TCP connections)
- Connection events with detailed information
### Using Commands (Synchronous Style) ### Using Commands (Synchronous Style)
Send commands and wait for responses: Send commands and wait for responses:

View File

@@ -0,0 +1,87 @@
"""
Example demonstrating connection events and auto-reconnect functionality.
"""
import asyncio
import logging
import sys
from meshcore import MeshCore
from meshcore.events import EventType
logging.basicConfig(level=logging.DEBUG)
async def main():
mc = None
# Example with auto-reconnect enabled
try:
# mc = await MeshCore.create_serial(
# port="/dev/cu.usbmodem1101",
# auto_reconnect=True,
# max_reconnect_attempts=3,
# debug=True
# )
# mc = await MeshCore.create_tcp(
# host="192.168.1.22",
# port=5000,
# auto_reconnect=True,
# max_reconnect_attempts=sys.maxsize,
# debug=True
# )
mc = await MeshCore.create_ble(
address="92849669",
auto_reconnect=True,
max_reconnect_attempts=3,
debug=True
)
# Subscribe to connection events
async def on_connected(event):
print(f"✅ Connected! Info: {event.payload}")
if event.payload.get('reconnected'):
print("🔄 This was a reconnection!")
async def on_disconnected(event):
print(f"❌ Disconnected! Reason: {event.payload.get('reason')}")
if event.payload.get('max_attempts_exceeded'):
print("⚠️ Max reconnection attempts exceeded")
mc.subscribe(EventType.CONNECTED, on_connected)
mc.subscribe(EventType.DISCONNECTED, on_disconnected)
# Check connection status
print("\n📱 Disconnect your device now to test auto-reconnect...")
print("Press Ctrl+C to exit")
# Keep running and periodically test the connection
while True:
await asyncio.sleep(2)
print(f"Connected: {mc.is_connected}")
if mc.is_connected:
try:
print("🔄 Testing connection by getting battery...")
result = await mc.commands.get_bat()
if result.type == EventType.ERROR:
print(f"❌ Error getting battery: {result.payload}")
else:
print("✅ Connection test successfeul")
except Exception as e:
print(f"❌ Connection test failed: {e}")
# This should trigger the disconnect detection
else:
print("⏳ Waiting for reconnection...")
except KeyboardInterrupt:
print("\n🛑 Exiting...")
except ConnectionError as e:
print(f"❌ Failed to connect: {e}")
finally:
if mc is not None:
await mc.disconnect()
print(f"Connected after disconnect: {mc.is_connected}")
print(f"Connected after disconnect: {mc.is_connected}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -5,6 +5,7 @@ logging.basicConfig(level=logging.INFO)
from meshcore.events import EventType from meshcore.events import EventType
from meshcore.meshcore import MeshCore, logger from meshcore.meshcore import MeshCore, logger
from meshcore.connection_manager import ConnectionManager
from meshcore.tcp_cx import TCPConnection from meshcore.tcp_cx import TCPConnection
from meshcore.ble_cx import BLEConnection from meshcore.ble_cx import BLEConnection
from meshcore.serial_cx import SerialConnection from meshcore.serial_cx import SerialConnection

View File

@@ -21,9 +21,10 @@ class BLEConnection:
def __init__(self, address): def __init__(self, address):
""" Constructor : specify address """ """ Constructor : specify address """
self.address = address self.address = address
self._user_provided_address = address
self.client = None self.client = None
self.rx_char = None self.rx_char = None
self.mc = None self._disconnect_callback = None
async def connect(self): async def connect(self):
""" """
@@ -31,6 +32,7 @@ class BLEConnection:
Returns : the address used for connection Returns : the address used for connection
""" """
logger.debug(f"Connecting existing connection: {self.client} with address {self.address}")
def match_meshcore_device(_: BLEDevice, adv: AdvertisementData): def match_meshcore_device(_: BLEDevice, adv: AdvertisementData):
""" Filter to mach MeshCore devices """ """ Filter to mach MeshCore devices """
if not adv.local_name is None\ if not adv.local_name is None\
@@ -39,20 +41,20 @@ class BLEConnection:
return True return True
return False return False
if self.address is None or self.address == "" or len(self.address.split(":")) != 6 : if self.address is None or self.address == "" or len(self.address.split(":")) != 6:
scanner = BleakScanner() scanner = BleakScanner()
logger.info("Scanning for devices") logger.info("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device) device = await scanner.find_device_by_filter(match_meshcore_device)
if device is None : if device is None:
return None return None
logger.info(f"Found device : {device}") logger.info(f"Found device : {device}")
self.client = BleakClient(device) self.client = BleakClient(device, disconnected_callback=self.handle_disconnect)
self.address = self.client.address self.address = self.client.address
else: else:
self.client = BleakClient(self.address) self.client = BleakClient(self.address, disconnected_callback=self.handle_disconnect)
try: try:
await self.client.connect(disconnected_callback=self.handle_disconnect) await self.client.connect()
except BleakDeviceNotFoundError: except BleakDeviceNotFoundError:
return None return None
except TimeoutError: except TimeoutError:
@@ -69,12 +71,19 @@ class BLEConnection:
logger.info("BLE Connection started") logger.info("BLE Connection started")
return self.address return self.address
def handle_disconnect(self, _: BleakClient): def handle_disconnect(self, client: BleakClient):
""" Callback to handle disconnection """ """ Callback to handle disconnection """
logger.info("Device was disconnected, goodbye.") logger.debug(f"BLE device disconnected: {client.address} (is_connected: {client.is_connected})")
# cancelling all tasks effectively ends the program # Reset the address we found to what user specified
for task in asyncio.all_tasks(): # this allows to reconnect to the same device
task.cancel() self.address = self._user_provided_address
if self._disconnect_callback:
asyncio.create_task(self._disconnect_callback("ble_disconnect"))
def set_disconnect_callback(self, callback):
"""Set callback to handle disconnections."""
self._disconnect_callback = callback
def set_reader(self, reader) : def set_reader(self, reader) :
self.reader = reader self.reader = reader

View File

@@ -0,0 +1,149 @@
"""
Connection manager that orchestrates reconnection logic for any connection type.
"""
import asyncio
import logging
from typing import Optional, Any, Callable, Protocol
from .events import Event, EventType
logger = logging.getLogger("meshcore")
class ConnectionProtocol(Protocol):
"""Protocol defining the interface that connection classes must implement."""
async def connect(self) -> Optional[Any]:
"""Connect and return connection info, or None if failed."""
...
async def disconnect(self):
"""Disconnect from the device/server."""
...
async def send(self, data):
"""Send data through the connection."""
...
def set_reader(self, reader):
"""Set the message reader."""
...
class ConnectionManager:
"""Manages connection lifecycle with auto-reconnect and event emission."""
def __init__(self, connection: ConnectionProtocol, event_dispatcher=None,
auto_reconnect: bool = False, max_reconnect_attempts: int = 3):
self.connection = connection
self.event_dispatcher = event_dispatcher
self.auto_reconnect = auto_reconnect
self.max_reconnect_attempts = max_reconnect_attempts
self._reconnect_attempts = 0
self._is_connected = False
self._reconnect_task = None
self._disconnect_callback: Optional[Callable] = None
def set_disconnect_callback(self, callback: Callable):
"""Set a callback to be called when disconnection is detected."""
self._disconnect_callback = callback
async def connect(self) -> Optional[Any]:
"""Connect with event handling and state management."""
result = await self.connection.connect()
if result is not None:
self._is_connected = True
self._reconnect_attempts = 0
await self._emit_event(EventType.CONNECTED, {"connection_info": result})
logger.debug(f"Connected successfully: {result}")
else:
logger.debug("Connection failed")
return result
async def disconnect(self):
"""Disconnect with proper cleanup."""
if self._reconnect_task:
self._reconnect_task.cancel()
try:
await self._reconnect_task
except asyncio.CancelledError:
pass
self._reconnect_task = None
if self._is_connected:
await self.connection.disconnect()
self._is_connected = False
await self._emit_event(EventType.DISCONNECTED, {"reason": "manual_disconnect"})
async def handle_disconnect(self, reason: str = "unknown"):
"""Handle unexpected disconnections with optional auto-reconnect."""
if not self._is_connected:
return
self._is_connected = False
logger.debug(f"Connection lost: {reason}")
if self.auto_reconnect and self._reconnect_attempts < self.max_reconnect_attempts:
self._reconnect_task = asyncio.create_task(self._attempt_reconnect())
else:
await self._emit_event(EventType.DISCONNECTED, {
"reason": reason,
"reconnect_failed": self._reconnect_attempts >= self.max_reconnect_attempts
})
async def _attempt_reconnect(self):
"""Attempt to reconnect with flat delay."""
logger.debug(f"Attempting reconnection ({self._reconnect_attempts + 1}/{self.max_reconnect_attempts})")
self._reconnect_attempts += 1
# Flat 1 second delay for all attempts
await asyncio.sleep(1)
try:
result = await self.connection.connect()
if result is not None:
self._is_connected = True
self._reconnect_attempts = 0
await self._emit_event(EventType.CONNECTED, {
"connection_info": result,
"reconnected": True
})
logger.debug(f"Reconnected successfully")
else:
# Reconnection failed, try again if we haven't exceeded max attempts
if self._reconnect_attempts < self.max_reconnect_attempts:
self._reconnect_task = asyncio.create_task(self._attempt_reconnect())
else:
await self._emit_event(EventType.DISCONNECTED, {
"reason": "reconnect_failed",
"max_attempts_exceeded": True
})
except Exception as e:
logger.debug(f"Reconnection attempt failed: {e}")
if self._reconnect_attempts < self.max_reconnect_attempts:
self._reconnect_task = asyncio.create_task(self._attempt_reconnect())
else:
await self._emit_event(EventType.DISCONNECTED, {
"reason": f"reconnect_error: {e}",
"max_attempts_exceeded": True
})
async def _emit_event(self, event_type: EventType, payload: dict):
"""Emit connection events if dispatcher is available."""
if self.event_dispatcher:
event = Event(event_type, payload)
await self.event_dispatcher.dispatch(event)
@property
def is_connected(self) -> bool:
"""Check if the connection is active."""
return self._is_connected
async def send(self, data):
"""Send data through the managed connection."""
return await self.connection.send(data)
def set_reader(self, reader):
"""Set the message reader on the underlying connection."""
self.connection.set_reader(reader)

View File

@@ -41,6 +41,10 @@ class EventType(Enum):
OK = "command_ok" OK = "command_ok"
ERROR = "command_error" ERROR = "command_error"
# Connection events
CONNECTED = "connected"
DISCONNECTED = "disconnected"
@dataclass @dataclass
class Event: class Event:

View File

@@ -5,6 +5,7 @@ from typing import Optional, Dict, Any, Union
from .events import EventDispatcher, EventType from .events import EventDispatcher, EventType
from .reader import MessageReader from .reader import MessageReader
from .commands import CommandHandler from .commands import CommandHandler
from .connection_manager import ConnectionManager
from .ble_cx import BLEConnection from .ble_cx import BLEConnection
from .tcp_cx import TCPConnection from .tcp_cx import TCPConnection
from .serial_cx import SerialConnection from .serial_cx import SerialConnection
@@ -16,9 +17,14 @@ class MeshCore:
""" """
Interface to a MeshCore device Interface to a MeshCore device
""" """
def __init__(self, cx, debug=False, default_timeout=None): def __init__(self, cx, debug=False, default_timeout=None, auto_reconnect=False, max_reconnect_attempts=3):
self.cx = cx # Wrap connection with ConnectionManager
self.dispatcher = EventDispatcher() self.dispatcher = EventDispatcher()
self.connection_manager = ConnectionManager(
cx, self.dispatcher, auto_reconnect, max_reconnect_attempts
)
self.cx = self.connection_manager # For backward compatibility
self._reader = MessageReader(self.dispatcher) self._reader = MessageReader(self.dispatcher)
self.commands = CommandHandler(default_timeout=default_timeout) self.commands = CommandHandler(default_timeout=default_timeout)
@@ -29,7 +35,7 @@ class MeshCore:
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
# Set up connections # Set up connections
self.commands.set_connection(cx) self.commands.set_connection(self.connection_manager)
# Set the dispatcher in the command handler # Set the dispatcher in the command handler
self.commands.set_dispatcher(self.dispatcher) self.commands.set_dispatcher(self.dispatcher)
@@ -43,47 +49,54 @@ class MeshCore:
# Set up event subscriptions to track data # Set up event subscriptions to track data
self._setup_data_tracking() self._setup_data_tracking()
cx.set_reader(self._reader) self.connection_manager.set_reader(self._reader)
# Set up disconnect callback
cx.set_disconnect_callback(self.connection_manager.handle_disconnect)
@classmethod @classmethod
async def create_tcp(cls, host: str, port: int, debug: bool = False, default_timeout=None) -> 'MeshCore': async def create_tcp(cls, host: str, port: int, debug: bool = False, default_timeout=None,
auto_reconnect: bool = False, max_reconnect_attempts: int = 3) -> 'MeshCore':
"""Create and connect a MeshCore instance using TCP connection""" """Create and connect a MeshCore instance using TCP connection"""
connection = TCPConnection(host, port) connection = TCPConnection(host, port)
await connection.connect()
mc = cls(connection, debug=debug, default_timeout=default_timeout) mc = cls(connection, debug=debug, default_timeout=default_timeout,
auto_reconnect=auto_reconnect, max_reconnect_attempts=max_reconnect_attempts)
await mc.connect() await mc.connect()
return mc return mc
@classmethod @classmethod
async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False, default_timeout=None) -> 'MeshCore': async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False, default_timeout=None,
auto_reconnect: bool = False, max_reconnect_attempts: int = 3) -> 'MeshCore':
"""Create and connect a MeshCore instance using serial connection""" """Create and connect a MeshCore instance using serial connection"""
connection = SerialConnection(port, baudrate) connection = SerialConnection(port, baudrate)
await connection.connect()
await asyncio.sleep(0.1) # Time for transport to establish await asyncio.sleep(0.1) # Time for transport to establish
mc = cls(connection, debug=debug, default_timeout=default_timeout) mc = cls(connection, debug=debug, default_timeout=default_timeout,
auto_reconnect=auto_reconnect, max_reconnect_attempts=max_reconnect_attempts)
await mc.connect() await mc.connect()
return mc return mc
@classmethod @classmethod
async def create_ble(cls, address: Optional[str] = None, debug: bool = False, default_timeout=None) -> 'MeshCore': async def create_ble(cls, address: Optional[str] = None, debug: bool = False, default_timeout=None,
auto_reconnect: bool = False, max_reconnect_attempts: int = 3) -> 'MeshCore':
"""Create and connect a MeshCore instance using BLE connection """Create and connect a MeshCore instance using BLE connection
If address is None, it will scan for and connect to the first available MeshCore device. If address is None, it will scan for and connect to the first available MeshCore device.
""" """
connection = BLEConnection(address) connection = BLEConnection(address)
result = await connection.connect()
if result is None:
raise ConnectionError("Failed to connect to BLE device")
mc = cls(connection, debug=debug, default_timeout=default_timeout) mc = cls(connection, debug=debug, default_timeout=default_timeout,
auto_reconnect=auto_reconnect, max_reconnect_attempts=max_reconnect_attempts)
await mc.connect() await mc.connect()
return mc return mc
async def connect(self): async def connect(self):
await self.dispatcher.start() await self.dispatcher.start()
result = await self.connection_manager.connect()
if result is None:
raise ConnectionError("Failed to connect to device")
return await self.commands.send_appstart() return await self.commands.send_appstart()
async def disconnect(self): async def disconnect(self):
@@ -96,8 +109,7 @@ class MeshCore:
await self.stop_auto_message_fetching() await self.stop_auto_message_fetching()
# Disconnect the connection object # Disconnect the connection object
if self.cx: await self.connection_manager.disconnect()
await self.cx.disconnect()
def stop(self): def stop(self):
"""Synchronously stop the event dispatcher task""" """Synchronously stop the event dispatcher task"""
@@ -195,6 +207,11 @@ class MeshCore:
"""Get the current device time""" """Get the current device time"""
return self._time return self._time
@property
def is_connected(self):
"""Check if the connection is active"""
return self.connection_manager.is_connected
@property @property
def default_timeout(self): def default_timeout(self):
"""Get the default timeout for commands""" """Get the default timeout for commands"""

View File

@@ -17,6 +17,7 @@ class SerialConnection:
self.transport = None self.transport = None
self.header = b"" self.header = b""
self.inframe = b"" self.inframe = b""
self._disconnect_callback = None
class MCSerialClientProtocol(asyncio.Protocol): class MCSerialClientProtocol(asyncio.Protocol):
def __init__(self, cx): def __init__(self, cx):
@@ -32,7 +33,9 @@ class SerialConnection:
self.cx.handle_rx(data) self.cx.handle_rx(data)
def connection_lost(self, exc): def connection_lost(self, exc):
logger.info('port closed') logger.debug('Serial port closed')
if self.cx._disconnect_callback:
asyncio.create_task(self.cx._disconnect_callback("serial_disconnect"))
def pause_writing(self): def pause_writing(self):
logger.debug('pause writing') logger.debug('pause writing')
@@ -94,3 +97,7 @@ class SerialConnection:
self.transport.close() self.transport.close()
self.transport = None self.transport = None
logger.debug("Serial Connection closed") logger.debug("Serial Connection closed")
def set_disconnect_callback(self, callback):
"""Set callback to handle disconnections."""
self._disconnect_callback = callback

View File

@@ -7,6 +7,9 @@ import logging
# Get logger # Get logger
logger = logging.getLogger("meshcore") logger = logging.getLogger("meshcore")
# TCP disconnect detection threshold
TCP_DISCONNECT_THRESHOLD = 5
class TCPConnection: class TCPConnection:
def __init__(self, host, port): def __init__(self, host, port):
self.host = host self.host = host
@@ -16,6 +19,9 @@ class TCPConnection:
self.frame_size = 0 self.frame_size = 0
self.header = b"" self.header = b""
self.inframe = b"" self.inframe = b""
self._disconnect_callback = None
self._send_count = 0
self._receive_count = 0
class MCClientProtocol(asyncio.Protocol): class MCClientProtocol(asyncio.Protocol):
def __init__(self, cx): def __init__(self, cx):
@@ -23,17 +29,23 @@ class TCPConnection:
def connection_made(self, transport): def connection_made(self, transport):
self.cx.transport = transport self.cx.transport = transport
# Reset counters on new connection
self.cx._send_count = 0
self.cx._receive_count = 0
logger.debug('connection established') logger.debug('connection established')
def data_received(self, data): def data_received(self, data):
logger.debug('data received') logger.debug('data received')
self.cx._receive_count += 1
self.cx.handle_rx(data) self.cx.handle_rx(data)
def error_received(self, exc): def error_received(self, exc):
logger.error(f'Error received: {exc}') logger.error(f'Error received: {exc}')
def connection_lost(self, exc): def connection_lost(self, exc):
logger.info('The server closed the connection') logger.debug('TCP server closed the connection')
if self.cx._disconnect_callback:
asyncio.create_task(self.cx._disconnect_callback("tcp_disconnect"))
async def connect(self): async def connect(self):
""" """
@@ -80,7 +92,19 @@ class TCPConnection:
async def send(self, data): async def send(self, data):
if not self.transport: if not self.transport:
logger.error("Transport not connected, cannot send data") logger.error("Transport not connected, cannot send data")
if self._disconnect_callback:
await self._disconnect_callback("tcp_transport_lost")
return return
self._send_count += 1
# Check if we've sent packets without any responses
if self._send_count - self._receive_count >= TCP_DISCONNECT_THRESHOLD:
logger.debug(f"TCP disconnect detected: sent {self._send_count}, received {self._receive_count}")
if self._disconnect_callback:
await self._disconnect_callback("tcp_no_response")
return
size = len(data) size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
logger.debug(f"sending pkt : {pkt}") logger.debug(f"sending pkt : {pkt}")
@@ -92,3 +116,7 @@ class TCPConnection:
self.transport.close() self.transport.close()
self.transport = None self.transport = None
logger.debug("TCP Connection closed") logger.debug("TCP Connection closed")
def set_disconnect_callback(self, callback):
"""Set callback to handle disconnections."""
self._disconnect_callback = callback