Fix: Improved BLE Connection Logic on macOS

This commit is contained in:
Ventz Petkov
2025-08-05 15:52:44 -04:00
parent 999bf2ec8b
commit f4d3be1360
4 changed files with 141 additions and 41 deletions

View File

@@ -1,2 +1 @@
[pytest] [pytest]
asyncio_mode = auto

View File

@@ -1,57 +1,80 @@
""" """
mccli.py : CLI interface to MeschCore BLE companion app mccli.py : CLI interface to MeschCore BLE companion app
""" """
import asyncio import asyncio
import logging import logging
# Get logger
logger = logging.getLogger("meshcore")
from bleak import BleakClient, BleakScanner from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData from bleak.backends.scanner import AdvertisementData
from bleak.exc import BleakDeviceNotFoundError from bleak.exc import BleakDeviceNotFoundError
# Get logger
logger = logging.getLogger("meshcore")
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
class BLEConnection: class BLEConnection:
def __init__(self, address): def __init__(self, address=None, client=None):
""" Constructor : specify address """ """
Constructor: specify address or an existing BleakClient.
Args:
address (str, optional): The Bluetooth address of the device.
client (BleakClient, optional): An existing BleakClient instance.
"""
self.address = address self.address = address
self._user_provided_address = address self._user_provided_address = address
self.client = None self.client = client
self.rx_char = None self.rx_char = None
self._disconnect_callback = None self._disconnect_callback = None
async def connect(self): async def connect(self):
""" """
Connects to the device Connects to the device.
Returns : the address used for connection If a BleakClient was provided to the constructor, it uses that.
Otherwise, it will scan or connect based on the provided address.
Returns:
The address used for connection, or None on failure.
""" """
logger.debug(f"Connecting existing connection: {self.client} with address {self.address}") logger.debug(f"Connecting with client: {self.client}, address: {self.address}")
if self.client:
logger.debug("Using pre-configured BleakClient.")
# If a client is already provided, ensure its disconnect callback is set
self.client._disconnected_callback = self.handle_disconnect
self.address = self.client.address
else:
def match_meshcore_device(_: BLEDevice, adv: AdvertisementData): def match_meshcore_device(_: BLEDevice, adv: AdvertisementData):
""" Filter to mach MeshCore devices """ """Filter to match MeshCore devices."""
if not adv.local_name is None\ if adv.local_name and adv.local_name.startswith("MeshCore"):
and adv.local_name.startswith("MeshCore")\ if self.address is None or self.address in adv.local_name:
and (self.address is None or self.address in adv.local_name) :
return True return True
return False return False
if self.address is None or self.address == "" or len(self.address.split(":")) != 6: if self.address is None or ":" not in self.address:
scanner = BleakScanner() logger.info("Scanning for devices...")
logger.info("Scanning for devices") device = await BleakScanner.find_device_by_filter(match_meshcore_device)
device = await scanner.find_device_by_filter(match_meshcore_device)
if device is None: if device is None:
logger.warning("No MeshCore device found during scan.")
return None return None
logger.info(f"Found device : {device}") logger.info(f"Found device: {device}")
self.client = BleakClient(device, disconnected_callback=self.handle_disconnect) self.client = BleakClient(
device, disconnected_callback=self.handle_disconnect
)
self.address = self.client.address self.address = self.client.address
else: else:
self.client = BleakClient(self.address, disconnected_callback=self.handle_disconnect) self.client = BleakClient(
self.address, disconnected_callback=self.handle_disconnect
)
try: try:
await self.client.connect() await self.client.connect()
@@ -72,8 +95,10 @@ class BLEConnection:
return self.address return self.address
def handle_disconnect(self, client: BleakClient): def handle_disconnect(self, client: BleakClient):
""" Callback to handle disconnection """ """Callback to handle disconnection"""
logger.debug(f"BLE device disconnected: {client.address} (is_connected: {client.is_connected})") logger.debug(
f"BLE device disconnected: {client.address} (is_connected: {client.is_connected})"
)
# Reset the address we found to what user specified # Reset the address we found to what user specified
# this allows to reconnect to the same device # this allows to reconnect to the same device
self.address = self._user_provided_address self.address = self._user_provided_address
@@ -85,11 +110,11 @@ class BLEConnection:
"""Set callback to handle disconnections.""" """Set callback to handle disconnections."""
self._disconnect_callback = callback self._disconnect_callback = callback
def set_reader(self, reader) : def set_reader(self, reader):
self.reader = reader self.reader = reader
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray): def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
if not self.reader is None: if self.reader is not None:
asyncio.create_task(self.reader.handle_rx(data)) asyncio.create_task(self.reader.handle_rx(data))
async def send(self, data): async def send(self, data):
@@ -99,7 +124,7 @@ class BLEConnection:
if not self.rx_char: if not self.rx_char:
logger.error("RX characteristic not found") logger.error("RX characteristic not found")
return False return False
await self.client.write_gatt_char(self.rx_char, bytes(data), response=False) await self.client.write_gatt_char(self.rx_char, bytes(data), response=True)
async def disconnect(self): async def disconnect(self):
"""Disconnect from the BLE device.""" """Disconnect from the BLE device."""

View File

@@ -83,14 +83,19 @@ class MeshCore:
return mc return mc
@classmethod @classmethod
async def create_ble(cls, address: Optional[str] = None, debug: bool = False, only_error:bool=False, default_timeout=None, async def create_ble(cls, address: Optional[str] = None, client=None, debug: bool = False, only_error:bool=False, default_timeout=None,
auto_reconnect: bool = False, max_reconnect_attempts: int = 3) -> 'MeshCore': auto_reconnect: bool = False, max_reconnect_attempts: int = 3) -> 'MeshCore':
"""Create and connect a MeshCore instance using BLE connection """
Create and connect a MeshCore instance using BLE connection.
If address is None, it will scan for and connect to the first available MeshCore device. Args:
address (str, optional): The Bluetooth address of the device.
client (BleakClient, optional): An existing BleakClient instance to use.
If provided, 'address' is ignored for connection
but can be used for identification.
""" """
connection = BLEConnection(address) connection = BLEConnection(address=address, client=client)
mc = cls(connection, debug=debug, only_error=only_error, default_timeout=default_timeout, mc = cls(connection, debug=debug, only_error=only_error, default_timeout=default_timeout,
auto_reconnect=auto_reconnect, max_reconnect_attempts=max_reconnect_attempts) auto_reconnect=auto_reconnect, max_reconnect_attempts=max_reconnect_attempts)

View File

@@ -0,0 +1,71 @@
import asyncio
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
from meshcore.ble_cx import BLEConnection, UART_SERVICE_UUID, UART_TX_CHAR_UUID, UART_RX_CHAR_UUID
class TestBLEConnection(unittest.TestCase):
@patch('meshcore.ble_cx.BleakClient')
def test_ble_connection_and_disconnection(self, mock_bleak_client):
"""
Tests the BLEConnection class for connecting and disconnecting from a BLE device.
"""
# Arrange
mock_client_instance = self._get_mock_bleak_client()
mock_bleak_client.return_value = mock_client_instance
address = "00:11:22:33:44:55"
ble_conn = BLEConnection(address=address)
# Act
asyncio.run(ble_conn.connect())
asyncio.run(ble_conn.disconnect())
# Assert
mock_client_instance.connect.assert_called_once()
mock_client_instance.start_notify.assert_called_once_with(UART_TX_CHAR_UUID, ble_conn.handle_rx)
mock_client_instance.disconnect.assert_called_once()
@patch('meshcore.ble_cx.BleakClient')
def test_send_data(self, mock_bleak_client):
"""
Tests the send method of the BLEConnection class.
"""
# Arrange
mock_client_instance = self._get_mock_bleak_client()
mock_bleak_client.return_value = mock_client_instance
address = "00:11:22:33:44:55"
ble_conn = BLEConnection(address=address)
asyncio.run(ble_conn.connect())
# Act
data_to_send = b"Hello, BLE"
asyncio.run(ble_conn.send(data_to_send))
# Assert
ble_conn.rx_char.write_gatt_char.assert_called_once_with(ble_conn.rx_char, data_to_send, response=False)
def _get_mock_bleak_client(self):
"""
Creates a mock BleakClient instance with all the necessary async methods and attributes.
"""
mock_client = MagicMock()
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.start_notify = AsyncMock()
mock_client.write_gatt_char = AsyncMock()
mock_client.is_connected = True
mock_service = MagicMock()
mock_char = MagicMock()
mock_char.uuid = UART_RX_CHAR_UUID
mock_char.write_gatt_char = mock_client.write_gatt_char
mock_service.get_characteristic.return_value = mock_char
mock_client.services.get_service.return_value = mock_service
return mock_client
if __name__ == '__main__':
unittest.main()