mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-06-11 11:56:18 +00:00
G4: add verification tests for F04, NEW-A, F18, M06, F16, F17, N04
10 new tests in tests/unit/test_g4_transport_symmetry.py covering all G4 findings: - test_g4_tcp_send_write_error_fires_disconnect (F04): TCP write OSError fires _disconnect_callback. - test_g4_serial_send_no_transport_fires_disconnect (NEW-A): serial send on None transport fires _disconnect_callback. - test_g4_serial_send_write_error_fires_disconnect (F04): serial write OSError fires _disconnect_callback. - test_g4_ble_send_no_client_fires_disconnect (F04): BLE send with no client fires _disconnect_callback. - test_g4_serial_connect_timeout (F18): connect raises TimeoutError when connection_made never fires. - test_g4_tcp_oversize_frame_empty_data_returns (M06): oversize header with empty trailing data returns without dispatch. - test_g4_serial_oversize_frame_empty_data_returns (M06): same for serial transport. - test_g4_tcp_receive_count_per_frame_not_per_segment (N04): 3 TCP segments carrying 1 frame yield _receive_count == 1. - test_g4_tcp_multiple_frames_count_correctly (N04): 2 complete frames yield _receive_count == 2. F16 and F17 are covered by the updated pre-existing test in tests/test_ble_pin_pairing.py (committed with F17). Refs: Forensics report findings F04, NEW-A, F18, M06, N04
This commit is contained in:
238
tests/unit/test_g4_transport_symmetry.py
Normal file
238
tests/unit/test_g4_transport_symmetry.py
Normal file
@@ -0,0 +1,238 @@
|
|||||||
|
"""
|
||||||
|
Verification tests for G4 — Transport symmetry fixes.
|
||||||
|
|
||||||
|
Covers: F04 (send symmetry across transports), NEW-A (serial disconnect
|
||||||
|
callback on transport-lost), F18 (serial connect timeout), M06 (oversize-frame
|
||||||
|
return), F16 (BLE disconnect-callback re-registration), F17 (BLE pairing
|
||||||
|
failure re-raise), N04 (TCP counter per frame not per segment).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from meshcore.tcp_cx import TCPConnection
|
||||||
|
from meshcore.serial_cx import SerialConnection
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class RecordingReader:
|
||||||
|
"""Minimal reader mock that records dispatched frames."""
|
||||||
|
def __init__(self):
|
||||||
|
self.frames = []
|
||||||
|
|
||||||
|
async def handle_rx(self, data):
|
||||||
|
self.frames.append(bytes(data))
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# F04 — TCP send() wraps transport.write in try/except
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_tcp_send_write_error_fires_disconnect():
|
||||||
|
"""TCP: OSError during transport.write fires _disconnect_callback."""
|
||||||
|
cx = TCPConnection("127.0.0.1", 5000)
|
||||||
|
cb = AsyncMock()
|
||||||
|
cx.set_disconnect_callback(cb)
|
||||||
|
|
||||||
|
mock_transport = MagicMock()
|
||||||
|
mock_transport.write.side_effect = OSError("Broken pipe")
|
||||||
|
cx.transport = mock_transport
|
||||||
|
cx._send_count = 0
|
||||||
|
cx._receive_count = 0
|
||||||
|
|
||||||
|
await cx.send(b"\x01\x02\x03")
|
||||||
|
|
||||||
|
cb.assert_awaited_once()
|
||||||
|
reason = cb.call_args[0][0]
|
||||||
|
assert "tcp_write_failed" in reason
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# F04 + NEW-A — Serial send() fires disconnect on transport-lost and write error
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_serial_send_no_transport_fires_disconnect():
|
||||||
|
"""Serial: send() on None transport fires _disconnect_callback (NEW-A)."""
|
||||||
|
cx = SerialConnection("/dev/null", 115200)
|
||||||
|
cb = AsyncMock()
|
||||||
|
cx.set_disconnect_callback(cb)
|
||||||
|
cx.transport = None
|
||||||
|
|
||||||
|
await cx.send(b"\x01")
|
||||||
|
|
||||||
|
cb.assert_awaited_once()
|
||||||
|
reason = cb.call_args[0][0]
|
||||||
|
assert reason == "serial_transport_lost"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_serial_send_write_error_fires_disconnect():
|
||||||
|
"""Serial: OSError during transport.write fires _disconnect_callback."""
|
||||||
|
cx = SerialConnection("/dev/null", 115200)
|
||||||
|
cb = AsyncMock()
|
||||||
|
cx.set_disconnect_callback(cb)
|
||||||
|
|
||||||
|
mock_transport = MagicMock()
|
||||||
|
mock_transport.write.side_effect = OSError("Device not configured")
|
||||||
|
cx.transport = mock_transport
|
||||||
|
|
||||||
|
await cx.send(b"\x01")
|
||||||
|
|
||||||
|
cb.assert_awaited_once()
|
||||||
|
reason = cb.call_args[0][0]
|
||||||
|
assert "serial_write_failed" in reason
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# F04 — BLE send() fires disconnect on transport-lost and write error
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_ble_send_no_client_fires_disconnect():
|
||||||
|
"""BLE: send() with no client fires _disconnect_callback."""
|
||||||
|
# Can't import BLEConnection directly if bleak isn't installed,
|
||||||
|
# so test via dynamic import with a guard.
|
||||||
|
try:
|
||||||
|
from meshcore.ble_cx import BLEConnection
|
||||||
|
except ImportError:
|
||||||
|
pytest.skip("bleak not installed")
|
||||||
|
|
||||||
|
# BLEConnection.__init__ checks BLEAK_AVAILABLE; patch it
|
||||||
|
with patch("meshcore.ble_cx.BLEAK_AVAILABLE", True), \
|
||||||
|
patch("meshcore.ble_cx.BleakClient", MagicMock()):
|
||||||
|
cx = BLEConnection.__new__(BLEConnection)
|
||||||
|
cx.client = None
|
||||||
|
cx._user_provided_client = None
|
||||||
|
cx._user_provided_address = None
|
||||||
|
cx._user_provided_device = None
|
||||||
|
cx.address = None
|
||||||
|
cx.device = None
|
||||||
|
cx.pin = None
|
||||||
|
cx.rx_char = None
|
||||||
|
cb = AsyncMock()
|
||||||
|
cx._disconnect_callback = cb
|
||||||
|
|
||||||
|
result = await cx.send(b"\x01")
|
||||||
|
|
||||||
|
assert result is False
|
||||||
|
cb.assert_awaited_once()
|
||||||
|
reason = cb.call_args[0][0]
|
||||||
|
assert reason == "ble_transport_lost"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# F18 — Serial connect() times out if connection_made never fires
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_serial_connect_timeout():
|
||||||
|
"""Serial: connect() raises TimeoutError if connection_made never fires."""
|
||||||
|
cx = SerialConnection("/dev/null", 115200)
|
||||||
|
|
||||||
|
# Mock create_serial_connection to do nothing (never fires connection_made)
|
||||||
|
async def mock_create(*args, **kwargs):
|
||||||
|
return (MagicMock(), MagicMock())
|
||||||
|
|
||||||
|
with patch("meshcore.serial_cx.serial_asyncio.create_serial_connection",
|
||||||
|
side_effect=mock_create):
|
||||||
|
with pytest.raises(asyncio.TimeoutError):
|
||||||
|
await cx.connect(timeout=0.1)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# M06 — Oversize frame resets state and returns without dispatch
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_tcp_oversize_frame_empty_data_returns():
|
||||||
|
"""TCP: oversize header with no trailing data returns without dispatch."""
|
||||||
|
cx = TCPConnection("127.0.0.1", 5000)
|
||||||
|
reader = RecordingReader()
|
||||||
|
cx.set_reader(reader)
|
||||||
|
|
||||||
|
# Build a frame header with size > 300 and no payload data after header
|
||||||
|
# Header: 0x3e + 2-byte LE size (e.g. 500 = 0x01F4)
|
||||||
|
header = b"\x3e" + (500).to_bytes(2, "little")
|
||||||
|
cx.handle_rx(header)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
# No frames should be dispatched, and state should be reset
|
||||||
|
assert reader.frames == []
|
||||||
|
assert cx.header == b""
|
||||||
|
assert cx.inframe == b""
|
||||||
|
assert cx.frame_expected_size == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_serial_oversize_frame_empty_data_returns():
|
||||||
|
"""Serial: oversize header with no trailing data returns without dispatch."""
|
||||||
|
cx = SerialConnection("/dev/null", 115200)
|
||||||
|
reader = RecordingReader()
|
||||||
|
cx.set_reader(reader)
|
||||||
|
|
||||||
|
header = b"\x3e" + (500).to_bytes(2, "little")
|
||||||
|
cx.handle_rx(header)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
assert reader.frames == []
|
||||||
|
assert cx.header == b""
|
||||||
|
assert cx.inframe == b""
|
||||||
|
assert cx.frame_expected_size == 0
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# N04 — TCP receive counter increments per MeshCore frame, not per TCP segment
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_tcp_receive_count_per_frame_not_per_segment():
|
||||||
|
"""TCP: _receive_count increments per completed frame, not per data_received call."""
|
||||||
|
cx = TCPConnection("127.0.0.1", 5000)
|
||||||
|
reader = RecordingReader()
|
||||||
|
cx.set_reader(reader)
|
||||||
|
cx._receive_count = 0
|
||||||
|
|
||||||
|
# Build a 4-byte payload frame
|
||||||
|
payload = b"\xAA\xBB\xCC\xDD"
|
||||||
|
frame = b"\x3e" + len(payload).to_bytes(2, "little") + payload
|
||||||
|
|
||||||
|
# Split the frame into 3 TCP segments (simulating fragmentation)
|
||||||
|
protocol = TCPConnection.MCClientProtocol(cx)
|
||||||
|
protocol.data_received(frame[:2]) # partial header
|
||||||
|
protocol.data_received(frame[2:5]) # rest of header + 2 bytes payload
|
||||||
|
protocol.data_received(frame[5:]) # remaining payload
|
||||||
|
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
# 3 data_received calls but only 1 completed frame
|
||||||
|
assert cx._receive_count == 1
|
||||||
|
assert reader.frames == [payload]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_g4_tcp_multiple_frames_count_correctly():
|
||||||
|
"""TCP: two complete frames in separate segments → _receive_count == 2."""
|
||||||
|
cx = TCPConnection("127.0.0.1", 5000)
|
||||||
|
reader = RecordingReader()
|
||||||
|
cx.set_reader(reader)
|
||||||
|
cx._receive_count = 0
|
||||||
|
|
||||||
|
payload1 = b"\x01\x02"
|
||||||
|
frame1 = b"\x3e" + len(payload1).to_bytes(2, "little") + payload1
|
||||||
|
payload2 = b"\x03\x04\x05"
|
||||||
|
frame2 = b"\x3e" + len(payload2).to_bytes(2, "little") + payload2
|
||||||
|
|
||||||
|
protocol = TCPConnection.MCClientProtocol(cx)
|
||||||
|
protocol.data_received(frame1)
|
||||||
|
protocol.data_received(frame2)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
assert cx._receive_count == 2
|
||||||
|
assert reader.frames == [payload1, payload2]
|
||||||
Reference in New Issue
Block a user