mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-06-11 03:56:16 +00:00
Merge pull request #80 from mwolter805/fix/protocol-surface-gaps
feat: add missing protocol handlers (CONTACT_DELETED, CONTACTS_FULL, TUNING_PARAMS) and command wrappers
This commit is contained in:
@@ -185,6 +185,24 @@ class ContactCommands(CommandHandlerBase):
|
|||||||
data = b"\x3B"
|
data = b"\x3B"
|
||||||
return await self.send(data, [EventType.AUTOADD_CONFIG, EventType.ERROR])
|
return await self.send(data, [EventType.AUTOADD_CONFIG, EventType.ERROR])
|
||||||
|
|
||||||
|
async def get_contact_by_key(self, pubkey: bytes) -> Event:
|
||||||
|
"""N09: Retrieve a single contact by its public key (CMD 30).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pubkey: 32-byte public key of the contact.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Event with the contact data (same format as CONTACT/NEXT_CONTACT),
|
||||||
|
or ERROR if not found.
|
||||||
|
"""
|
||||||
|
if not isinstance(pubkey, (bytes, bytearray)):
|
||||||
|
raise TypeError("pubkey must be bytes-like")
|
||||||
|
# Truncate or pad to 32 bytes
|
||||||
|
key_bytes = bytes(pubkey[:32])
|
||||||
|
logger.debug(f"Getting contact by key: {key_bytes.hex()}")
|
||||||
|
data = b"\x1e" + key_bytes
|
||||||
|
return await self.send(data, [EventType.NEXT_CONTACT, EventType.ERROR])
|
||||||
|
|
||||||
async def get_advert_path(self, key: DestinationType) -> Event:
|
async def get_advert_path(self, key: DestinationType) -> Event:
|
||||||
key_bytes = _validate_destination(key, prefix_length=32)
|
key_bytes = _validate_destination(key, prefix_length=32)
|
||||||
logger.debug(f"getting advert path for: {key} {key_bytes.hex()}")
|
logger.debug(f"getting advert path for: {key} {key_bytes.hex()}")
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ from hashlib import sha256
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from ..events import Event, EventType
|
from ..events import Event, EventType
|
||||||
|
from ..packets import CommandType
|
||||||
from .base import CommandHandlerBase, DestinationType, _validate_destination
|
from .base import CommandHandlerBase, DestinationType, _validate_destination
|
||||||
|
|
||||||
logger = logging.getLogger("meshcore")
|
logger = logging.getLogger("meshcore")
|
||||||
@@ -291,20 +292,89 @@ class DeviceCommands(CommandHandlerBase):
|
|||||||
|
|
||||||
return await self.sign_finish(timeout=timeout, data_size=len(data))
|
return await self.sign_finish(timeout=timeout, data_size=len(data))
|
||||||
|
|
||||||
|
async def has_connection(self) -> Event:
|
||||||
|
"""N09: Check if the device has an active connection (CMD 28).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Event with a 1-byte response indicating connection status,
|
||||||
|
or ERROR.
|
||||||
|
"""
|
||||||
|
logger.debug("Checking device connection status")
|
||||||
|
return await self.send(b"\x1c", [EventType.OK, EventType.ERROR])
|
||||||
|
|
||||||
|
async def get_tuning(self) -> Event:
|
||||||
|
"""N03/N09: Request current tuning parameters (CMD_GET_TUNING_PARAMS = 43).
|
||||||
|
|
||||||
|
Firmware responds with RESP_CODE_TUNING_PARAMS (23): 9 bytes containing
|
||||||
|
rx_delay (4 bytes LE) and airtime_factor (4 bytes LE).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Event of type TUNING_PARAMS with rx_delay and airtime_factor,
|
||||||
|
or ERROR.
|
||||||
|
"""
|
||||||
|
logger.debug("Getting tuning parameters")
|
||||||
|
return await self.send(b"\x2b", [EventType.TUNING_PARAMS, EventType.ERROR])
|
||||||
|
|
||||||
|
async def request_factory_reset(self) -> str:
|
||||||
|
"""N09: Request a factory reset token (step 1 of 2).
|
||||||
|
|
||||||
|
This method returns a confirmation token string. Pass it to
|
||||||
|
``confirm_factory_reset(token)`` to actually execute the reset.
|
||||||
|
The two-step pattern is a Python-side safety measure; the firmware
|
||||||
|
itself has no token verification.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A confirmation token string to pass to confirm_factory_reset().
|
||||||
|
"""
|
||||||
|
import secrets
|
||||||
|
token = secrets.token_hex(8)
|
||||||
|
logger.warning(
|
||||||
|
"Factory reset requested. Call confirm_factory_reset('%s') to proceed. "
|
||||||
|
"This will ERASE ALL DATA on the device.", token
|
||||||
|
)
|
||||||
|
# Store the token on the instance for validation
|
||||||
|
self._factory_reset_token = token
|
||||||
|
return token
|
||||||
|
|
||||||
|
async def confirm_factory_reset(self, token: str) -> Event:
|
||||||
|
"""N09: Execute factory reset after token confirmation (step 2 of 2).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
token: The token returned by request_factory_reset().
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Event with OK or ERROR.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If the token does not match.
|
||||||
|
"""
|
||||||
|
expected = getattr(self, "_factory_reset_token", None)
|
||||||
|
if expected is None or token != expected:
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid or expired factory reset token. "
|
||||||
|
"Call request_factory_reset() first."
|
||||||
|
)
|
||||||
|
self._factory_reset_token = None # Consume the token
|
||||||
|
logger.warning("Executing factory reset — all device data will be erased")
|
||||||
|
return await self.send(b"\x33", [EventType.OK, EventType.ERROR])
|
||||||
|
|
||||||
async def get_stats_core(self) -> Event:
|
async def get_stats_core(self) -> Event:
|
||||||
logger.debug("Getting core statistics")
|
logger.debug("Getting core statistics")
|
||||||
# CMD_GET_STATS (56) + STATS_TYPE_CORE (0)
|
# R04: Use CommandType enum instead of literal bytes
|
||||||
return await self.send(b"\x38\x00", [EventType.STATS_CORE, EventType.ERROR])
|
cmd = bytes([CommandType.GET_STATS.value, 0x00]) # GET_STATS + STATS_TYPE_CORE
|
||||||
|
return await self.send(cmd, [EventType.STATS_CORE, EventType.ERROR])
|
||||||
|
|
||||||
async def get_stats_radio(self) -> Event:
|
async def get_stats_radio(self) -> Event:
|
||||||
logger.debug("Getting radio statistics")
|
logger.debug("Getting radio statistics")
|
||||||
# CMD_GET_STATS (56) + STATS_TYPE_RADIO (1)
|
# R04: Use CommandType enum instead of literal bytes
|
||||||
return await self.send(b"\x38\x01", [EventType.STATS_RADIO, EventType.ERROR])
|
cmd = bytes([CommandType.GET_STATS.value, 0x01]) # GET_STATS + STATS_TYPE_RADIO
|
||||||
|
return await self.send(cmd, [EventType.STATS_RADIO, EventType.ERROR])
|
||||||
|
|
||||||
async def get_stats_packets(self) -> Event:
|
async def get_stats_packets(self) -> Event:
|
||||||
logger.debug("Getting packet statistics")
|
logger.debug("Getting packet statistics")
|
||||||
# CMD_GET_STATS (56) + STATS_TYPE_PACKETS (2)
|
# R04: Use CommandType enum instead of literal bytes
|
||||||
return await self.send(b"\x38\x02", [EventType.STATS_PACKETS, EventType.ERROR])
|
cmd = bytes([CommandType.GET_STATS.value, 0x02]) # GET_STATS + STATS_TYPE_PACKETS
|
||||||
|
return await self.send(cmd, [EventType.STATS_PACKETS, EventType.ERROR])
|
||||||
|
|
||||||
async def get_allowed_repeat_freq(self) -> Event:
|
async def get_allowed_repeat_freq(self) -> Event:
|
||||||
logger.debug("Getting allowed repeat freqs")
|
logger.debug("Getting allowed repeat freqs")
|
||||||
|
|||||||
@@ -295,12 +295,34 @@ class MessagingCommands(CommandHandlerBase):
|
|||||||
cmd_data.append(flags)
|
cmd_data.append(flags)
|
||||||
cmd_data.extend(path_bytes)
|
cmd_data.extend(path_bytes)
|
||||||
|
|
||||||
|
# N05: Firmware requires strict len > 10 (MyMesh.cpp:1620).
|
||||||
|
# When path is empty, cmd(1)+tag(4)+auth(4)+flags(1) = 10 bytes exactly,
|
||||||
|
# which is silently rejected. Pad with one zero byte to reach 11.
|
||||||
|
if len(cmd_data) <= 10:
|
||||||
|
cmd_data.append(0x00)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path_bytes.hex()}"
|
f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path_bytes.hex()}"
|
||||||
)
|
)
|
||||||
|
|
||||||
return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR])
|
return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR])
|
||||||
|
|
||||||
|
async def send_raw_data(self, payload: bytes) -> Event:
|
||||||
|
"""N09: Send raw data via CMD_SEND_RAW_DATA (25).
|
||||||
|
|
||||||
|
Sends an arbitrary payload through the mesh network.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
payload: Raw bytes to send.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Event with MSG_SENT or ERROR.
|
||||||
|
"""
|
||||||
|
if not isinstance(payload, (bytes, bytearray)):
|
||||||
|
raise TypeError("payload must be bytes-like")
|
||||||
|
data = b"\x19" + bytes(payload)
|
||||||
|
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
|
||||||
|
|
||||||
async def set_flood_scope(self, scope):
|
async def set_flood_scope(self, scope):
|
||||||
if scope is None:
|
if scope is None:
|
||||||
logger.debug(f"Resetting scope")
|
logger.debug(f"Resetting scope")
|
||||||
|
|||||||
@@ -49,6 +49,9 @@ class EventType(Enum):
|
|||||||
PATH_RESPONSE = "path_response"
|
PATH_RESPONSE = "path_response"
|
||||||
PRIVATE_KEY = "private_key"
|
PRIVATE_KEY = "private_key"
|
||||||
DISABLED = "disabled"
|
DISABLED = "disabled"
|
||||||
|
CONTACT_DELETED = "contact_deleted"
|
||||||
|
CONTACTS_FULL = "contacts_full"
|
||||||
|
TUNING_PARAMS = "tuning_params"
|
||||||
CONTROL_DATA = "control_data"
|
CONTROL_DATA = "control_data"
|
||||||
DISCOVER_RESPONSE = "discover_response"
|
DISCOVER_RESPONSE = "discover_response"
|
||||||
NEIGHBOURS_RESPONSE = "neighbours_response"
|
NEIGHBOURS_RESPONSE = "neighbours_response"
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ class CommandType(Enum):
|
|||||||
SET_AUTOADD_CONFIG = 58
|
SET_AUTOADD_CONFIG = 58
|
||||||
GET_AUTOADD_CONFIG = 59
|
GET_AUTOADD_CONFIG = 59
|
||||||
GET_ALLOWED_REPEAT_FREQ = 60
|
GET_ALLOWED_REPEAT_FREQ = 60
|
||||||
|
GET_STATS = 56 # R04: CMD_GET_STATS — used by get_stats_core/radio/packets
|
||||||
SET_PATH_HASH_MODE = 61
|
SET_PATH_HASH_MODE = 61
|
||||||
|
|
||||||
# Packet prefixes for the protocol
|
# Packet prefixes for the protocol
|
||||||
@@ -120,3 +121,6 @@ class PacketType(Enum):
|
|||||||
PATH_DISCOVERY_RESPONSE = 0x8D
|
PATH_DISCOVERY_RESPONSE = 0x8D
|
||||||
CONTROL_DATA = 0x8E
|
CONTROL_DATA = 0x8E
|
||||||
CONTACT_DELETED = 0x8F
|
CONTACT_DELETED = 0x8F
|
||||||
|
CONTACTS_FULL = 0x90 # N02: MyMesh::onContactsFull() — 1-byte push, no payload
|
||||||
|
# Note: 0x90 == ControlType.NODE_DISCOVER_RESP in a different namespace.
|
||||||
|
# Not a literal conflict (PacketType vs ControlType), but a maintenance hazard.
|
||||||
|
|||||||
@@ -916,6 +916,37 @@ class MessageReader:
|
|||||||
Event(EventType.DISCOVER_RESPONSE, ndr, attributes)
|
Event(EventType.DISCOVER_RESPONSE, ndr, attributes)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
elif packet_type_value == PacketType.CONTACT_DELETED.value:
|
||||||
|
# N01: PUSH_CODE_CONTACT_DELETED (0x8F) — 1-byte code + 32-byte pubkey
|
||||||
|
# Emitted by MyMesh::onContactOverwrite() (MyMesh.cpp:325-334)
|
||||||
|
if len(data) < 33:
|
||||||
|
logger.debug("CONTACT_DELETED frame too short (%d bytes, need 33)", len(data))
|
||||||
|
return
|
||||||
|
pubkey = data[1:33].hex()
|
||||||
|
await self.dispatcher.dispatch(
|
||||||
|
Event(EventType.CONTACT_DELETED, {"pubkey": pubkey}, {"pubkey": pubkey})
|
||||||
|
)
|
||||||
|
|
||||||
|
elif packet_type_value == PacketType.CONTACTS_FULL.value:
|
||||||
|
# N02: PUSH_CODE_CONTACTS_FULL (0x90) — 1-byte push, no payload
|
||||||
|
# Emitted by MyMesh::onContactsFull() (MyMesh.cpp:336)
|
||||||
|
await self.dispatcher.dispatch(Event(EventType.CONTACTS_FULL, {}))
|
||||||
|
|
||||||
|
elif packet_type_value == PacketType.TUNING_PARAMS.value:
|
||||||
|
# N03: RESP_CODE_TUNING_PARAMS (23) — response to CMD_GET_TUNING_PARAMS (43)
|
||||||
|
# Format: 1-byte code + 4-byte rx_delay (LE) + 4-byte airtime_factor (LE) = 9 bytes
|
||||||
|
# Emitted by MyMesh.cpp:1307-1313
|
||||||
|
if len(data) < 9:
|
||||||
|
logger.debug("TUNING_PARAMS frame too short (%d bytes, need 9)", len(data))
|
||||||
|
await self.dispatcher.dispatch(
|
||||||
|
Event(EventType.ERROR, {"reason": "invalid_frame_length"})
|
||||||
|
)
|
||||||
|
return
|
||||||
|
rx_delay = int.from_bytes(data[1:5], byteorder="little")
|
||||||
|
airtime_factor = int.from_bytes(data[5:9], byteorder="little")
|
||||||
|
res = {"rx_delay": rx_delay, "airtime_factor": airtime_factor}
|
||||||
|
await self.dispatcher.dispatch(Event(EventType.TUNING_PARAMS, res))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.debug(f"Unhandled data received {data}")
|
logger.debug(f"Unhandled data received {data}")
|
||||||
logger.debug(f"Unhandled packet type: {packet_type_value}")
|
logger.debug(f"Unhandled packet type: {packet_type_value}")
|
||||||
|
|||||||
364
tests/unit/test_protocol_surface_gaps.py
Normal file
364
tests/unit/test_protocol_surface_gaps.py
Normal file
@@ -0,0 +1,364 @@
|
|||||||
|
"""Verification tests for protocol surface gaps.
|
||||||
|
|
||||||
|
Each test constructs a mock firmware frame and verifies the SDK dispatches
|
||||||
|
the correct EventType with the expected payload fields.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import struct
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
from meshcore.events import Event, EventType, EventDispatcher
|
||||||
|
from meshcore.reader import MessageReader
|
||||||
|
from meshcore.packets import PacketType, CommandType
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _make_reader():
|
||||||
|
"""Create a MessageReader with a mock dispatcher that records dispatched events."""
|
||||||
|
dispatcher = MagicMock(spec=EventDispatcher)
|
||||||
|
dispatched = []
|
||||||
|
|
||||||
|
async def _capture(event):
|
||||||
|
dispatched.append(event)
|
||||||
|
|
||||||
|
dispatcher.dispatch = AsyncMock(side_effect=_capture)
|
||||||
|
reader = MessageReader(dispatcher)
|
||||||
|
return reader, dispatched
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# CONTACT_DELETED handler
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_contact_deleted_dispatches_event():
|
||||||
|
"""A 33-byte CONTACT_DELETED frame dispatches EventType.CONTACT_DELETED."""
|
||||||
|
reader, dispatched = _make_reader()
|
||||||
|
pubkey = bytes(range(32))
|
||||||
|
frame = bytes([PacketType.CONTACT_DELETED.value]) + pubkey
|
||||||
|
assert len(frame) == 33
|
||||||
|
|
||||||
|
await reader.handle_rx(bytearray(frame))
|
||||||
|
|
||||||
|
assert len(dispatched) == 1
|
||||||
|
evt = dispatched[0]
|
||||||
|
assert evt.type == EventType.CONTACT_DELETED
|
||||||
|
assert evt.payload["pubkey"] == pubkey.hex()
|
||||||
|
assert evt.attributes["pubkey"] == pubkey.hex()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_contact_deleted_short_frame_ignored():
|
||||||
|
"""A CONTACT_DELETED frame shorter than 33 bytes is silently dropped."""
|
||||||
|
reader, dispatched = _make_reader()
|
||||||
|
# Only 10 bytes — too short
|
||||||
|
frame = bytes([PacketType.CONTACT_DELETED.value]) + b"\x00" * 9
|
||||||
|
|
||||||
|
await reader.handle_rx(bytearray(frame))
|
||||||
|
|
||||||
|
assert len(dispatched) == 0
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# CONTACTS_FULL handler + enum entry
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_contacts_full_enum_exists():
|
||||||
|
"""PacketType.CONTACTS_FULL == 0x90."""
|
||||||
|
assert PacketType.CONTACTS_FULL.value == 0x90
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_contacts_full_dispatches_event():
|
||||||
|
"""A 1-byte CONTACTS_FULL push dispatches EventType.CONTACTS_FULL."""
|
||||||
|
reader, dispatched = _make_reader()
|
||||||
|
frame = bytes([PacketType.CONTACTS_FULL.value])
|
||||||
|
|
||||||
|
await reader.handle_rx(bytearray(frame))
|
||||||
|
|
||||||
|
assert len(dispatched) == 1
|
||||||
|
evt = dispatched[0]
|
||||||
|
assert evt.type == EventType.CONTACTS_FULL
|
||||||
|
assert evt.payload == {}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TUNING_PARAMS handler
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_tuning_params_dispatches_event():
|
||||||
|
"""A 9-byte TUNING_PARAMS frame dispatches with rx_delay and airtime_factor."""
|
||||||
|
reader, dispatched = _make_reader()
|
||||||
|
rx_delay = 500
|
||||||
|
airtime_factor = 200
|
||||||
|
frame = (
|
||||||
|
bytes([PacketType.TUNING_PARAMS.value])
|
||||||
|
+ rx_delay.to_bytes(4, "little")
|
||||||
|
+ airtime_factor.to_bytes(4, "little")
|
||||||
|
)
|
||||||
|
assert len(frame) == 9
|
||||||
|
|
||||||
|
await reader.handle_rx(bytearray(frame))
|
||||||
|
|
||||||
|
assert len(dispatched) == 1
|
||||||
|
evt = dispatched[0]
|
||||||
|
assert evt.type == EventType.TUNING_PARAMS
|
||||||
|
assert evt.payload["rx_delay"] == 500
|
||||||
|
assert evt.payload["airtime_factor"] == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_tuning_params_short_frame_dispatches_error():
|
||||||
|
"""A TUNING_PARAMS frame shorter than 9 bytes dispatches ERROR."""
|
||||||
|
reader, dispatched = _make_reader()
|
||||||
|
# Only 5 bytes — too short
|
||||||
|
frame = bytes([PacketType.TUNING_PARAMS.value]) + b"\x01\x00\x00\x00"
|
||||||
|
|
||||||
|
await reader.handle_rx(bytearray(frame))
|
||||||
|
|
||||||
|
assert len(dispatched) == 1
|
||||||
|
evt = dispatched[0]
|
||||||
|
assert evt.type == EventType.ERROR
|
||||||
|
assert evt.payload["reason"] == "invalid_frame_length"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# send_trace() one-byte pad
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_trace_empty_path_pads_to_11_bytes():
|
||||||
|
"""send_trace() with no path produces an 11-byte packet (not 10)."""
|
||||||
|
from meshcore.commands.messaging import MessagingCommands
|
||||||
|
|
||||||
|
cmd = MessagingCommands.__new__(MessagingCommands)
|
||||||
|
|
||||||
|
captured_data = None
|
||||||
|
|
||||||
|
async def mock_send(data, expected_events, timeout=None):
|
||||||
|
nonlocal captured_data
|
||||||
|
captured_data = bytes(data)
|
||||||
|
return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000})
|
||||||
|
|
||||||
|
cmd.send = mock_send
|
||||||
|
|
||||||
|
await cmd.send_trace(auth_code=0, tag=1, flags=0, path=None)
|
||||||
|
|
||||||
|
assert captured_data is not None
|
||||||
|
# cmd(1) + tag(4) + auth(4) + flags(1) + pad(1) = 11
|
||||||
|
assert len(captured_data) == 11
|
||||||
|
assert captured_data[-1] == 0x00 # The pad byte
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_trace_with_path_no_padding():
|
||||||
|
"""send_trace() with a non-empty path does NOT add padding."""
|
||||||
|
from meshcore.commands.messaging import MessagingCommands
|
||||||
|
|
||||||
|
cmd = MessagingCommands.__new__(MessagingCommands)
|
||||||
|
|
||||||
|
captured_data = None
|
||||||
|
|
||||||
|
async def mock_send(data, expected_events, timeout=None):
|
||||||
|
nonlocal captured_data
|
||||||
|
captured_data = bytes(data)
|
||||||
|
return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000})
|
||||||
|
|
||||||
|
cmd.send = mock_send
|
||||||
|
|
||||||
|
# 2-byte path hash (flags=1 means hash_len=2)
|
||||||
|
await cmd.send_trace(auth_code=0, tag=1, flags=1, path=b"\xAA\xBB")
|
||||||
|
|
||||||
|
assert captured_data is not None
|
||||||
|
# cmd(1) + tag(4) + auth(4) + flags(1) + path(2) = 12 — no pad needed
|
||||||
|
assert len(captured_data) == 12
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Command wrapper: send_raw_data
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_raw_data_wrapper():
|
||||||
|
"""send_raw_data sends CMD 0x19 + payload."""
|
||||||
|
from meshcore.commands.messaging import MessagingCommands
|
||||||
|
|
||||||
|
cmd = MessagingCommands.__new__(MessagingCommands)
|
||||||
|
|
||||||
|
captured_data = None
|
||||||
|
|
||||||
|
async def mock_send(data, expected_events, timeout=None):
|
||||||
|
nonlocal captured_data
|
||||||
|
captured_data = bytes(data)
|
||||||
|
return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000})
|
||||||
|
|
||||||
|
cmd.send = mock_send
|
||||||
|
|
||||||
|
await cmd.send_raw_data(b"\xDE\xAD")
|
||||||
|
|
||||||
|
assert captured_data is not None
|
||||||
|
assert captured_data[0] == 0x19 # CMD_SEND_RAW_DATA
|
||||||
|
assert captured_data[1:] == b"\xDE\xAD"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Command wrapper: has_connection
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_has_connection_wrapper():
|
||||||
|
"""has_connection sends CMD 0x1c."""
|
||||||
|
from meshcore.commands.device import DeviceCommands
|
||||||
|
|
||||||
|
cmd = DeviceCommands.__new__(DeviceCommands)
|
||||||
|
|
||||||
|
captured_data = None
|
||||||
|
|
||||||
|
async def mock_send(data, expected_events, timeout=None):
|
||||||
|
nonlocal captured_data
|
||||||
|
captured_data = bytes(data)
|
||||||
|
return Event(EventType.OK, {"value": 1})
|
||||||
|
|
||||||
|
cmd.send = mock_send
|
||||||
|
|
||||||
|
await cmd.has_connection()
|
||||||
|
|
||||||
|
assert captured_data is not None
|
||||||
|
assert captured_data == b"\x1c"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Command wrapper: get_tuning
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_tuning_wrapper():
|
||||||
|
"""get_tuning sends CMD 0x2b (GET_TUNING_PARAMS = 43)."""
|
||||||
|
from meshcore.commands.device import DeviceCommands
|
||||||
|
|
||||||
|
cmd = DeviceCommands.__new__(DeviceCommands)
|
||||||
|
|
||||||
|
captured_data = None
|
||||||
|
|
||||||
|
async def mock_send(data, expected_events, timeout=None):
|
||||||
|
nonlocal captured_data
|
||||||
|
captured_data = bytes(data)
|
||||||
|
return Event(EventType.TUNING_PARAMS, {"rx_delay": 500, "airtime_factor": 200})
|
||||||
|
|
||||||
|
cmd.send = mock_send
|
||||||
|
|
||||||
|
result = await cmd.get_tuning()
|
||||||
|
|
||||||
|
assert captured_data == b"\x2b"
|
||||||
|
assert result.type == EventType.TUNING_PARAMS
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Command wrapper: get_contact_by_key
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_contact_by_key_wrapper():
|
||||||
|
"""get_contact_by_key sends CMD 0x1e + 32-byte pubkey."""
|
||||||
|
from meshcore.commands.contact import ContactCommands
|
||||||
|
|
||||||
|
cmd = ContactCommands.__new__(ContactCommands)
|
||||||
|
|
||||||
|
captured_data = None
|
||||||
|
|
||||||
|
async def mock_send(data, expected_events, timeout=None):
|
||||||
|
nonlocal captured_data
|
||||||
|
captured_data = bytes(data)
|
||||||
|
return Event(EventType.NEXT_CONTACT, {"public_key": "ab" * 32})
|
||||||
|
|
||||||
|
cmd.send = mock_send
|
||||||
|
|
||||||
|
pubkey = bytes(range(32))
|
||||||
|
await cmd.get_contact_by_key(pubkey)
|
||||||
|
|
||||||
|
assert captured_data is not None
|
||||||
|
assert captured_data[0] == 0x1E # CMD_GET_CONTACT_BY_KEY
|
||||||
|
assert captured_data[1:] == pubkey
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Command wrapper: factory_reset (two-step)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_factory_reset_two_step():
|
||||||
|
"""factory_reset requires a token from request_factory_reset."""
|
||||||
|
from meshcore.commands.device import DeviceCommands
|
||||||
|
|
||||||
|
cmd = DeviceCommands.__new__(DeviceCommands)
|
||||||
|
|
||||||
|
captured_data = None
|
||||||
|
|
||||||
|
async def mock_send(data, expected_events, timeout=None):
|
||||||
|
nonlocal captured_data
|
||||||
|
captured_data = bytes(data)
|
||||||
|
return Event(EventType.OK, {})
|
||||||
|
|
||||||
|
cmd.send = mock_send
|
||||||
|
|
||||||
|
# Step 1: request token
|
||||||
|
token = await cmd.request_factory_reset()
|
||||||
|
assert isinstance(token, str)
|
||||||
|
assert len(token) == 16 # hex-encoded 8 bytes
|
||||||
|
|
||||||
|
# Step 2: confirm with wrong token fails
|
||||||
|
with pytest.raises(ValueError, match="Invalid or expired"):
|
||||||
|
await cmd.confirm_factory_reset("wrong_token")
|
||||||
|
|
||||||
|
# Step 2: confirm with correct token succeeds
|
||||||
|
await cmd.confirm_factory_reset(token)
|
||||||
|
assert captured_data == b"\x33" # CMD_FACTORY_RESET
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_factory_reset_without_request_fails():
|
||||||
|
"""confirm_factory_reset without request_factory_reset raises ValueError."""
|
||||||
|
from meshcore.commands.device import DeviceCommands
|
||||||
|
|
||||||
|
cmd = DeviceCommands.__new__(DeviceCommands)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Invalid or expired"):
|
||||||
|
await cmd.confirm_factory_reset("any_token")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# GET_STATS enum entry
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_get_stats_enum_exists():
|
||||||
|
"""CommandType.GET_STATS == 56."""
|
||||||
|
assert CommandType.GET_STATS.value == 56
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_stats_core_uses_enum():
|
||||||
|
"""get_stats_core sends CommandType.GET_STATS.value (0x38) + 0x00."""
|
||||||
|
from meshcore.commands.device import DeviceCommands
|
||||||
|
|
||||||
|
cmd = DeviceCommands.__new__(DeviceCommands)
|
||||||
|
|
||||||
|
captured_data = None
|
||||||
|
|
||||||
|
async def mock_send(data, expected_events, timeout=None):
|
||||||
|
nonlocal captured_data
|
||||||
|
captured_data = bytes(data)
|
||||||
|
return Event(EventType.STATS_CORE, {})
|
||||||
|
|
||||||
|
cmd.send = mock_send
|
||||||
|
|
||||||
|
await cmd.get_stats_core()
|
||||||
|
|
||||||
|
assert captured_data is not None
|
||||||
|
assert captured_data[0] == CommandType.GET_STATS.value # 0x38 = 56
|
||||||
|
assert captured_data[1] == 0x00
|
||||||
Reference in New Issue
Block a user