G6: N09 — add command wrappers for orphaned CommandType entries

Why: Five CommandType enum entries had no user-facing SDK method:
SEND_RAW_DATA (25), HAS_CONNECTION (28), GET_CONTACT_BY_KEY (30),
GET_TUNING_PARAMS (43), FACTORY_RESET (51). Added send_raw_data() in
MessagingCommands, has_connection()/get_tuning()/request_factory_reset()/
confirm_factory_reset() in DeviceCommands, get_contact_by_key() in
ContactCommands. FACTORY_RESET uses a two-step token pattern as a
Python-side safety measure against accidental invocation.

Refs: Forensics report finding N09 (also N03 for get_tuning)
This commit is contained in:
Matthew Wolter
2026-04-12 04:14:31 -07:00
parent 1f319159b6
commit 6f1b48cc50
2 changed files with 94 additions and 6 deletions

View File

@@ -185,6 +185,24 @@ class ContactCommands(CommandHandlerBase):
data = b"\x3B" data = b"\x3B"
return await self.send(data, [EventType.AUTOADD_CONFIG, EventType.ERROR]) return await self.send(data, [EventType.AUTOADD_CONFIG, EventType.ERROR])
async def get_contact_by_key(self, pubkey: bytes) -> Event:
"""N09: Retrieve a single contact by its public key (CMD 30).
Args:
pubkey: 32-byte public key of the contact.
Returns:
Event with the contact data (same format as CONTACT/NEXT_CONTACT),
or ERROR if not found.
"""
if not isinstance(pubkey, (bytes, bytearray)):
raise TypeError("pubkey must be bytes-like")
# Truncate or pad to 32 bytes
key_bytes = bytes(pubkey[:32])
logger.debug(f"Getting contact by key: {key_bytes.hex()}")
data = b"\x1e" + key_bytes
return await self.send(data, [EventType.NEXT_CONTACT, EventType.ERROR])
async def get_advert_path(self, key: DestinationType) -> Event: async def get_advert_path(self, key: DestinationType) -> Event:
key_bytes = _validate_destination(key, prefix_length=32) key_bytes = _validate_destination(key, prefix_length=32)
logger.debug(f"getting advert path for: {key} {key_bytes.hex()}") logger.debug(f"getting advert path for: {key} {key_bytes.hex()}")

View File

@@ -4,6 +4,7 @@ from hashlib import sha256
from typing import Optional from typing import Optional
from ..events import Event, EventType from ..events import Event, EventType
from ..packets import CommandType
from .base import CommandHandlerBase, DestinationType, _validate_destination from .base import CommandHandlerBase, DestinationType, _validate_destination
logger = logging.getLogger("meshcore") logger = logging.getLogger("meshcore")
@@ -273,20 +274,89 @@ class DeviceCommands(CommandHandlerBase):
return await self.sign_finish(timeout=timeout, data_size=len(data)) return await self.sign_finish(timeout=timeout, data_size=len(data))
async def has_connection(self) -> Event:
"""N09: Check if the device has an active connection (CMD 28).
Returns:
Event with a 1-byte response indicating connection status,
or ERROR.
"""
logger.debug("Checking device connection status")
return await self.send(b"\x1c", [EventType.OK, EventType.ERROR])
async def get_tuning(self) -> Event:
"""N03/N09: Request current tuning parameters (CMD_GET_TUNING_PARAMS = 43).
Firmware responds with RESP_CODE_TUNING_PARAMS (23): 9 bytes containing
rx_delay (4 bytes LE) and airtime_factor (4 bytes LE).
Returns:
Event of type TUNING_PARAMS with rx_delay and airtime_factor,
or ERROR.
"""
logger.debug("Getting tuning parameters")
return await self.send(b"\x2b", [EventType.TUNING_PARAMS, EventType.ERROR])
async def request_factory_reset(self) -> str:
"""N09: Request a factory reset token (step 1 of 2).
This method returns a confirmation token string. Pass it to
``confirm_factory_reset(token)`` to actually execute the reset.
The two-step pattern is a Python-side safety measure; the firmware
itself has no token verification.
Returns:
A confirmation token string to pass to confirm_factory_reset().
"""
import secrets
token = secrets.token_hex(8)
logger.warning(
"Factory reset requested. Call confirm_factory_reset('%s') to proceed. "
"This will ERASE ALL DATA on the device.", token
)
# Store the token on the instance for validation
self._factory_reset_token = token
return token
async def confirm_factory_reset(self, token: str) -> Event:
"""N09: Execute factory reset after token confirmation (step 2 of 2).
Args:
token: The token returned by request_factory_reset().
Returns:
Event with OK or ERROR.
Raises:
ValueError: If the token does not match.
"""
expected = getattr(self, "_factory_reset_token", None)
if expected is None or token != expected:
raise ValueError(
"Invalid or expired factory reset token. "
"Call request_factory_reset() first."
)
self._factory_reset_token = None # Consume the token
logger.warning("Executing factory reset — all device data will be erased")
return await self.send(b"\x33", [EventType.OK, EventType.ERROR])
async def get_stats_core(self) -> Event: async def get_stats_core(self) -> Event:
logger.debug("Getting core statistics") logger.debug("Getting core statistics")
# CMD_GET_STATS (56) + STATS_TYPE_CORE (0) # R04: Use CommandType enum instead of literal bytes
return await self.send(b"\x38\x00", [EventType.STATS_CORE, EventType.ERROR]) cmd = bytes([CommandType.GET_STATS.value, 0x00]) # GET_STATS + STATS_TYPE_CORE
return await self.send(cmd, [EventType.STATS_CORE, EventType.ERROR])
async def get_stats_radio(self) -> Event: async def get_stats_radio(self) -> Event:
logger.debug("Getting radio statistics") logger.debug("Getting radio statistics")
# CMD_GET_STATS (56) + STATS_TYPE_RADIO (1) # R04: Use CommandType enum instead of literal bytes
return await self.send(b"\x38\x01", [EventType.STATS_RADIO, EventType.ERROR]) cmd = bytes([CommandType.GET_STATS.value, 0x01]) # GET_STATS + STATS_TYPE_RADIO
return await self.send(cmd, [EventType.STATS_RADIO, EventType.ERROR])
async def get_stats_packets(self) -> Event: async def get_stats_packets(self) -> Event:
logger.debug("Getting packet statistics") logger.debug("Getting packet statistics")
# CMD_GET_STATS (56) + STATS_TYPE_PACKETS (2) # R04: Use CommandType enum instead of literal bytes
return await self.send(b"\x38\x02", [EventType.STATS_PACKETS, EventType.ERROR]) cmd = bytes([CommandType.GET_STATS.value, 0x02]) # GET_STATS + STATS_TYPE_PACKETS
return await self.send(cmd, [EventType.STATS_PACKETS, EventType.ERROR])
async def get_allowed_repeat_freq(self) -> Event: async def get_allowed_repeat_freq(self) -> Event:
logger.debug("Getting allowed repeat freqs") logger.debug("Getting allowed repeat freqs")