Merge pull request #25 from meshcore-dev/awolden/add-binary-handling-to-reader

Awolden/add binary handling to reader
This commit is contained in:
fdlamotte
2025-09-05 07:40:30 +02:00
committed by GitHub
9 changed files with 581 additions and 177 deletions

176
README.md
View File

@@ -399,6 +399,182 @@ async def channel_handler(event):
meshcore.subscribe(EventType.CHANNEL_MSG_RECV, channel_handler) meshcore.subscribe(EventType.CHANNEL_MSG_RECV, channel_handler)
``` ```
## API Reference
### Event Types
All events in MeshCore are represented by the `EventType` enum. These events are dispatched by the library and can be subscribed to:
| Event Type | String Value | Description | Typical Payload |
|------------|-------------|-------------|-----------------|
| **Device & Status Events** |||
| `SELF_INFO` | `"self_info"` | Device's own information after appstart | Device configuration, public key, coordinates |
| `DEVICE_INFO` | `"device_info"` | Device capabilities and firmware info | Firmware version, model, max contacts/channels |
| `BATTERY` | `"battery_info"` | Battery level and storage info | Battery level, used/total storage |
| `CURRENT_TIME` | `"time_update"` | Device time response | Current timestamp |
| `STATUS_RESPONSE` | `"status_response"` | Device status statistics | Battery, TX queue, noise floor, packet counts |
| `CUSTOM_VARS` | `"custom_vars"` | Custom variable responses | Key-value pairs of custom variables |
| **Contact Events** |||
| `CONTACTS` | `"contacts"` | Contact list response | Dictionary of contacts by public key |
| `NEW_CONTACT` | `"new_contact"` | New contact discovered | Contact information |
| `CONTACT_URI` | `"contact_uri"` | Contact export URI | Shareable contact URI |
| **Messaging Events** |||
| `CONTACT_MSG_RECV` | `"contact_message"` | Direct message received | Message text, sender prefix, timestamp |
| `CHANNEL_MSG_RECV` | `"channel_message"` | Channel message received | Message text, channel index, timestamp |
| `MSG_SENT` | `"message_sent"` | Message send confirmation | Expected ACK code, suggested timeout |
| `NO_MORE_MSGS` | `"no_more_messages"` | No pending messages | Empty payload |
| `MESSAGES_WAITING` | `"messages_waiting"` | Messages available notification | Empty payload |
| **Network Events** |||
| `ADVERTISEMENT` | `"advertisement"` | Node advertisement detected | Public key of advertising node |
| `PATH_UPDATE` | `"path_update"` | Routing path update | Public key and path information |
| `ACK` | `"acknowledgement"` | Message acknowledgment | ACK code |
| `PATH_RESPONSE` | `"path_response"` | Path discovery response | Inbound/outbound path data |
| `TRACE_DATA` | `"trace_data"` | Route trace information | Path with SNR data for each hop |
| **Telemetry Events** |||
| `TELEMETRY_RESPONSE` | `"telemetry_response"` | Telemetry data response | LPP-formatted sensor data |
| `MMA_RESPONSE` | `"mma_response"` | Memory Management Area data | Min/max/avg telemetry over time range |
| `ACL_RESPONSE` | `"acl_response"` | Access Control List data | List of keys and permissions |
| **Channel Events** |||
| `CHANNEL_INFO` | `"channel_info"` | Channel configuration | Channel name, secret, index |
| **Raw Data Events** |||
| `RAW_DATA` | `"raw_data"` | Raw radio data | SNR, RSSI, payload hex |
| `RX_LOG_DATA` | `"rx_log_data"` | RF log data | SNR, RSSI, raw payload |
| `LOG_DATA` | `"log_data"` | Generic log data | Various log information |
| **Binary Protocol Events** |||
| `BINARY_RESPONSE` | `"binary_response"` | Generic binary response | Tag and hex data |
| **Authentication Events** |||
| `LOGIN_SUCCESS` | `"login_success"` | Successful login | Permissions, admin status, pubkey prefix |
| `LOGIN_FAILED` | `"login_failed"` | Failed login attempt | Pubkey prefix |
| **Command Response Events** |||
| `OK` | `"command_ok"` | Command successful | Success confirmation, optional value |
| `ERROR` | `"command_error"` | Command failed | Error reason or code |
| **Connection Events** |||
| `CONNECTED` | `"connected"` | Connection established | Connection details, reconnection status |
| `DISCONNECTED` | `"disconnected"` | Connection lost | Disconnection reason |
### Available Commands
All commands are async methods that return `Event` objects. Commands are organized into functional groups:
#### Device Commands (`meshcore.commands.*`)
| Command | Parameters | Returns | Description |
|---------|------------|---------|-------------|
| **Device Information** ||||
| `send_appstart()` | None | `SELF_INFO` | Get device self-information and configuration |
| `send_device_query()` | None | `DEVICE_INFO` | Query device capabilities and firmware info |
| `get_bat()` | None | `BATTERY` | Get battery level and storage information |
| `get_time()` | None | `CURRENT_TIME` | Get current device time |
| `get_self_telemetry()` | None | `TELEMETRY_RESPONSE` | Get device's own telemetry data |
| `get_custom_vars()` | None | `CUSTOM_VARS` | Retrieve all custom variables |
| **Device Configuration** ||||
| `set_name(name)` | `name: str` | `OK` | Set device name/identifier |
| `set_coords(lat, lon)` | `lat: float, lon: float` | `OK` | Set device GPS coordinates |
| `set_time(val)` | `val: int` | `OK` | Set device time (Unix timestamp) |
| `set_tx_power(val)` | `val: int` | `OK` | Set radio transmission power level |
| `set_devicepin(pin)` | `pin: int` | `OK` | Set device PIN for security |
| `set_custom_var(key, value)` | `key: str, value: str` | `OK` | Set custom variable |
| **Radio Configuration** ||||
| `set_radio(freq, bw, sf, cr)` | `freq: float, bw: float, sf: int, cr: int` | `OK` | Configure radio (freq MHz, bandwidth kHz, spreading factor, coding rate 5-8) |
| `set_tuning(rx_dly, af)` | `rx_dly: int, af: int` | `OK` | Set radio tuning parameters |
| **Telemetry Configuration** ||||
| `set_telemetry_mode_base(mode)` | `mode: int` | `OK` | Set base telemetry mode |
| `set_telemetry_mode_loc(mode)` | `mode: int` | `OK` | Set location telemetry mode |
| `set_telemetry_mode_env(mode)` | `mode: int` | `OK` | Set environmental telemetry mode |
| `set_manual_add_contacts(enabled)` | `enabled: bool` | `OK` | Enable/disable manual contact addition |
| `set_advert_loc_policy(policy)` | `policy: int` | `OK` | Set location advertisement policy |
| **Channel Management** ||||
| `get_channel(channel_idx)` | `channel_idx: int` | `CHANNEL_INFO` | Get channel configuration |
| `set_channel(channel_idx, name, secret)` | `channel_idx: int, name: str, secret: bytes` | `OK` | Configure channel (secret must be 16 bytes) |
| **Device Actions** ||||
| `send_advert(flood=False)` | `flood: bool` | `OK` | Send advertisement (optionally flood network) |
| `reboot()` | None | None | Reboot device (no response expected) |
#### Contact Commands (`meshcore.commands.*`)
| Command | Parameters | Returns | Description |
|---------|------------|---------|-------------|
| **Contact Management** ||||
| `get_contacts(lastmod=0)` | `lastmod: int` | `CONTACTS` | Get contact list (filter by last modification time) |
| `add_contact(contact)` | `contact: dict` | `OK` | Add new contact to device |
| `update_contact(contact, path, flags)` | `contact: dict, path: bytes, flags: int` | `OK` | Update existing contact |
| `remove_contact(key)` | `key: str/bytes` | `OK` | Remove contact by public key |
| **Contact Operations** ||||
| `reset_path(key)` | `key: str/bytes` | `OK` | Reset routing path for contact |
| `share_contact(key)` | `key: str/bytes` | `OK` | Share contact with network |
| `export_contact(key=None)` | `key: str/bytes/None` | `CONTACT_URI` | Export contact as URI (None exports node) |
| `import_contact(card_data)` | `card_data: bytes` | `OK` | Import contact from card data |
| **Contact Modification** ||||
| `change_contact_path(contact, path)` | `contact: dict, path: bytes` | `OK` | Change routing path for contact |
| `change_contact_flags(contact, flags)` | `contact: dict, flags: int` | `OK` | Change contact flags/settings |
#### Messaging Commands (`meshcore.commands.*`)
| Command | Parameters | Returns | Description |
|---------|------------|---------|-------------|
| **Message Handling** ||||
| `get_msg(timeout=None)` | `timeout: float` | `CONTACT_MSG_RECV/CHANNEL_MSG_RECV/NO_MORE_MSGS` | Get next pending message |
| `send_msg(dst, msg, timestamp=None)` | `dst: contact/str/bytes, msg: str, timestamp: int` | `MSG_SENT` | Send direct message |
| `send_cmd(dst, cmd, timestamp=None)` | `dst: contact/str/bytes, cmd: str, timestamp: int` | `MSG_SENT` | Send command message |
| `send_chan_msg(chan, msg, timestamp=None)` | `chan: int, msg: str, timestamp: int` | `MSG_SENT` | Send channel message |
| **Authentication** ||||
| `send_login(dst, pwd)` | `dst: contact/str/bytes, pwd: str` | `MSG_SENT` | Send login request |
| `send_logout(dst)` | `dst: contact/str/bytes` | `MSG_SENT` | Send logout request |
| **Information Requests** ||||
| `send_statusreq(dst)` | `dst: contact/str/bytes` | `MSG_SENT` | Request status from contact |
| `send_telemetry_req(dst)` | `dst: contact/str/bytes` | `MSG_SENT` | Request telemetry from contact |
| **Advanced Messaging** ||||
| `send_binary_req(dst, bin_data)` | `dst: contact/str/bytes, bin_data: bytes` | `MSG_SENT` | Send binary data request |
| `send_path_discovery(dst)` | `dst: contact/str/bytes` | `MSG_SENT` | Initiate path discovery |
| `send_trace(auth_code, tag, flags, path=None)` | `auth_code: int, tag: int, flags: int, path: list` | `MSG_SENT` | Send route trace packet |
#### Binary Protocol Commands (`meshcore.commands.*`)
| Command | Parameters | Returns | Description |
|---------|------------|---------|-------------|
| `req_status(contact, timeout=0)` | `contact: dict, timeout: float` | `STATUS_RESPONSE` | Get detailed status via binary protocol |
| `req_telemetry(contact, timeout=0)` | `contact: dict, timeout: float` | `TELEMETRY_RESPONSE` | Get telemetry via binary protocol |
| `req_mma(contact, start, end, timeout=0)` | `contact: dict, start: int, end: int, timeout: float` | `MMA_RESPONSE` | Get historical telemetry data |
| `req_acl(contact, timeout=0)` | `contact: dict, timeout: float` | `ACL_RESPONSE` | Get access control list |
### Helper Methods
| Method | Returns | Description |
|--------|---------|-------------|
| `get_contact_by_name(name)` | `dict/None` | Find contact by advertisement name |
| `get_contact_by_key_prefix(prefix)` | `dict/None` | Find contact by partial public key |
| `is_connected` | `bool` | Check if device is currently connected |
| `subscribe(event_type, callback, filters=None)` | `Subscription` | Subscribe to events with optional filtering |
| `unsubscribe(subscription)` | None | Remove event subscription |
| `wait_for_event(event_type, filters=None, timeout=None)` | `Event/None` | Wait for specific event |
### Event Filtering
Events can be filtered by their attributes when subscribing:
```python
# Filter by public key prefix
meshcore.subscribe(
EventType.CONTACT_MSG_RECV,
handler,
attribute_filters={"pubkey_prefix": "a1b2c3d4e5f6"}
)
# Filter by channel index
meshcore.subscribe(
EventType.CHANNEL_MSG_RECV,
handler,
attribute_filters={"channel_idx": 0}
)
# Filter acknowledgments by code
meshcore.subscribe(
EventType.ACK,
handler,
attribute_filters={"code": "12345678"}
)
```
## Examples in the Repo ## Examples in the Repo
Check the `examples/` directory for more: Check the `examples/` directory for more:

View File

@@ -3,6 +3,8 @@ import logging
import random import random
from typing import Any, Callable, Coroutine, Dict, List, Optional, Union from typing import Any, Callable, Coroutine, Dict, List, Optional, Union
from meshcore.packets import BinaryReqType
from ..events import Event, EventDispatcher, EventType from ..events import Event, EventDispatcher, EventType
from ..reader import MessageReader from ..reader import MessageReader
@@ -144,3 +146,24 @@ class CommandHandlerBase:
return Event(EventType.ERROR, {"error": str(e)}) return Event(EventType.ERROR, {"error": str(e)})
# For commands that don't expect events, return a success event # For commands that don't expect events, return a success event
return Event(EventType.OK, {}) return Event(EventType.OK, {})
# attached at base because its a common method
async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, timeout=None) -> Event:
dst_bytes = _validate_destination(dst, prefix_length=32)
pubkey_prefix = _validate_destination(dst, prefix_length=6)
logger.debug(f"Binary request to {dst_bytes.hex()}")
data = b"\x32" + dst_bytes + request_type.value.to_bytes(1, "little", signed=False) + (data if data else b"")
result = await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
# Register the request with the reader if we have both reader and request_type
if (result.type == EventType.MSG_SENT and
self._reader is not None and
request_type is not None):
exp_tag = result.payload["expected_ack"].hex()
# Use provided timeout or fallback to suggested timeout (with 5s default)
actual_timeout = timeout if timeout is not None and timeout > 0 else result.payload.get("suggested_timeout", 4000) / 800.0
self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout)
return result

View File

@@ -1,158 +1,112 @@
import logging import logging
from enum import Enum
import json
from .base import CommandHandlerBase from .base import CommandHandlerBase
from ..events import EventType from ..events import EventType
from cayennelpp import LppFrame, LppData from ..packets import BinaryReqType
from cayennelpp.lpp_type import LppType
from ..lpp_json_encoder import lpp_json_encoder, my_lpp_types, lpp_format_val
logger = logging.getLogger("meshcore") logger = logging.getLogger("meshcore")
class BinaryReqType(Enum):
STATUS = 0x01
KEEP_ALIVE = 0x02
TELEMETRY = 0x03
MMA = 0x04
ACL = 0x05
def lpp_parse(buf):
"""Parse a given byte string and return as a LppFrame object."""
i = 0
lpp_data_list = []
while i < len(buf) and buf[i] != 0:
lppdata = LppData.from_bytes(buf[i:])
lpp_data_list.append(lppdata)
i = i + len(lppdata)
return json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder))
def lpp_parse_mma(buf):
i = 0
res = []
while i < len(buf) and buf[i] != 0:
chan = buf[i]
i = i + 1
type = buf[i]
lpp_type = LppType.get_lpp_type(type)
size = lpp_type.size
i = i + 1
min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
max = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
avg = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
res.append(
{
"channel": chan,
"type": my_lpp_types[type][0],
"min": min,
"max": max,
"avg": avg,
}
)
return res
def parse_acl(buf):
i = 0
res = []
while i + 7 <= len(buf):
key = buf[i : i + 6].hex()
perm = buf[i + 6]
if key != "000000000000":
res.append({"key": key, "perm": perm})
i = i + 7
return res
class BinaryCommandHandler(CommandHandlerBase): class BinaryCommandHandler(CommandHandlerBase):
"""Helper functions to handle binary requests through binary commands""" """Helper functions to handle binary requests through binary commands"""
async def req_binary(self, contact, request, timeout=0):
res = await self.send_binary_req(contact, request)
logger.debug(res)
if res.type == EventType.ERROR:
logger.error("Error while requesting binary data")
return None
else:
exp_tag = res.payload["expected_ack"].hex()
timeout = (
res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
)
res2 = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE,
attribute_filters={"tag": exp_tag},
timeout=timeout,
)
logger.debug(res2)
if res2 is None:
return None
else:
return res2.payload
async def req_status(self, contact, timeout=0): async def req_status(self, contact, timeout=0):
code = BinaryReqType.STATUS.value res = await self.send_binary_req(
req = code.to_bytes(1, "little", signed=False) contact,
rep = await self.req_binary(contact, req, timeout) BinaryReqType.STATUS,
timeout=timeout
if rep is None : )
if res.type == EventType.ERROR:
return None return None
else:
data=bytes.fromhex(rep["data"]) exp_tag = res.payload["expected_ack"].hex()
res = {} timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
res["pubkey_pre"] = contact["public_key"][0:12]
res["bat"] = int.from_bytes(data[0:2], byteorder="little") if self.dispatcher is None:
res["tx_queue_len"] = int.from_bytes(data[2:4], byteorder="little") return None
res["noise_floor"] = int.from_bytes(data[4:6], byteorder="little", signed=True)
res["last_rssi"] = int.from_bytes(data[6:8], byteorder="little", signed=True) status_event = await self.dispatcher.wait_for_event(
res["nb_recv"] = int.from_bytes(data[8:12], byteorder="little", signed=False) EventType.STATUS_RESPONSE,
res["nb_sent"] = int.from_bytes(data[12:16], byteorder="little", signed=False) attribute_filters={"tag": exp_tag},
res["airtime"] = int.from_bytes(data[16:20], byteorder="little") timeout=timeout,
res["uptime"] = int.from_bytes(data[20:24], byteorder="little") )
res["sent_flood"] = int.from_bytes(data[24:28], byteorder="little")
res["sent_direct"] = int.from_bytes(data[28:32], byteorder="little") return status_event.payload if status_event else None
res["recv_flood"] = int.from_bytes(data[32:36], byteorder="little")
res["recv_direct"] = int.from_bytes(data[36:40], byteorder="little")
res["full_evts"] = int.from_bytes(data[40:42], byteorder="little")
res["last_snr"] = (int.from_bytes(data[42:44], byteorder="little", signed=True) / 4)
res["direct_dups"] = int.from_bytes(data[44:46], byteorder="little")
res["flood_dups"] = int.from_bytes(data[46:48], byteorder="little")
res["rx_airtime"] = int.from_bytes(data[48:52], byteorder="little")
return res if res["uptime"] > 0 else None
async def req_telemetry(self, contact, timeout=0): async def req_telemetry(self, contact, timeout=0):
code = BinaryReqType.TELEMETRY.value res = await self.send_binary_req(
req = code.to_bytes(1, "little", signed=False) contact,
res = await self.req_binary(contact, req, timeout) BinaryReqType.TELEMETRY,
if res is None: timeout=timeout
)
if res.type == EventType.ERROR:
return None return None
else:
return lpp_parse(bytes.fromhex(res["data"])) timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
if self.dispatcher is None:
return None
# Listen for TELEMETRY_RESPONSE event
telem_event = await self.dispatcher.wait_for_event(
EventType.TELEMETRY_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
return telem_event.payload["lpp"] if telem_event else None
async def req_mma(self, contact, start, end, timeout=0): async def req_mma(self, contact, start, end, timeout=0):
code = BinaryReqType.MMA.value
req = ( req = (
code.to_bytes(1, "little", signed=False) start.to_bytes(4, "little", signed=False)
+ start.to_bytes(4, "little", signed=False)
+ end.to_bytes(4, "little", signed=False) + end.to_bytes(4, "little", signed=False)
+ b"\0\0" + b"\0\0"
) )
res = await self.req_binary(contact, req, timeout) res = await self.send_binary_req(
if res is None: contact,
BinaryReqType.MMA,
data=req,
timeout=timeout
)
if res.type == EventType.ERROR:
return None return None
else:
return lpp_parse_mma(bytes.fromhex(res["data"])[4:]) timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
if self.dispatcher is None:
return None
# Listen for MMA_RESPONSE
mma_event = await self.dispatcher.wait_for_event(
EventType.MMA_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
return mma_event.payload["mma_data"] if mma_event else None
async def req_acl(self, contact, timeout=0): async def req_acl(self, contact, timeout=0):
code = BinaryReqType.ACL.value req = b"\0\0"
req = code.to_bytes(1, "little", signed=False) + b"\0\0" res = await self.send_binary_req(
res = await self.req_binary(contact, req, timeout) contact,
if res is None: BinaryReqType.ACL,
data=req,
timeout=timeout
)
if res.type == EventType.ERROR:
return None return None
else:
return parse_acl(bytes.fromhex(res["data"])) timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
if self.dispatcher is None:
return None
# Listen for ACL_RESPONSE event with matching pubkey
contact_pubkey_prefix = contact["public_key"][0:12]
acl_event = await self.dispatcher.wait_for_event(
EventType.ACL_RESPONSE,
attribute_filters={"pubkey_prefix": contact_pubkey_prefix},
timeout=timeout,
)
return acl_event.payload["acl_data"] if acl_event else None

View File

@@ -97,12 +97,6 @@ class MessagingCommands(CommandHandlerBase):
data = b"\x27\x00\x00\x00" + dst_bytes data = b"\x27\x00\x00\x00" + dst_bytes
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_binary_req(self, dst: DestinationType, bin_data) -> Event:
dst_bytes = _validate_destination(dst, prefix_length=32)
logger.debug(f"Binary request to {dst_bytes.hex()}")
data = b"\x32" + dst_bytes + bin_data
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_path_discovery(self, dst: DestinationType) -> Event: async def send_path_discovery(self, dst: DestinationType) -> Event:
dst_bytes = _validate_destination(dst, prefix_length=32) dst_bytes = _validate_destination(dst, prefix_length=32)
logger.debug(f"Path discovery request for {dst_bytes.hex()}") logger.debug(f"Path discovery request for {dst_bytes.hex()}")

View File

@@ -36,6 +36,8 @@ class EventType(Enum):
RX_LOG_DATA = "rx_log_data" RX_LOG_DATA = "rx_log_data"
TELEMETRY_RESPONSE = "telemetry_response" TELEMETRY_RESPONSE = "telemetry_response"
BINARY_RESPONSE = "binary_response" BINARY_RESPONSE = "binary_response"
MMA_RESPONSE = "mma_response"
ACL_RESPONSE = "acl_response"
CUSTOM_VARS = "custom_vars" CUSTOM_VARS = "custom_vars"
CHANNEL_INFO = "channel_info" CHANNEL_INFO = "channel_info"
PATH_RESPONSE = "path_response" PATH_RESPONSE = "path_response"

View File

@@ -1,5 +1,11 @@
from enum import Enum from enum import Enum
class BinaryReqType(Enum):
STATUS = 0x01
KEEP_ALIVE = 0x02
TELEMETRY = 0x03
MMA = 0x04
ACL = 0x05
# Packet prefixes for the protocol # Packet prefixes for the protocol
class PacketType(Enum): class PacketType(Enum):

109
src/meshcore/parsing.py Normal file
View File

@@ -0,0 +1,109 @@
import logging
from enum import Enum
import json
from cayennelpp import LppFrame, LppData
from cayennelpp.lpp_type import LppType
from .lpp_json_encoder import lpp_json_encoder, my_lpp_types, lpp_format_val
logger = logging.getLogger("meshcore")
def lpp_parse(buf):
"""Parse a given byte string and return as a LppFrame object."""
i = 0
lpp_data_list = []
while i < len(buf) and buf[i] != 0:
lppdata = LppData.from_bytes(buf[i:])
lpp_data_list.append(lppdata)
i = i + len(lppdata)
return json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder))
def lpp_parse_mma(buf):
i = 0
res = []
while i < len(buf) and buf[i] != 0:
chan = buf[i]
i = i + 1
type = buf[i]
lpp_type = LppType.get_lpp_type(type)
if lpp_type is None:
logger.error(f"Unknown LPP type: {type}")
return None
size = lpp_type.size
i = i + 1
min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
max = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
avg = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
res.append(
{
"channel": chan,
"type": my_lpp_types[type][0],
"min": min,
"max": max,
"avg": avg,
}
)
return res
def parse_acl(buf):
i = 0
res = []
while i + 7 <= len(buf):
key = buf[i : i + 6].hex()
perm = buf[i + 6]
if key != "000000000000":
res.append({"key": key, "perm": perm})
i = i + 7
return res
def parse_status(data, pubkey_prefix=None, offset=0):
"""
Parse binary data into a dictionary of fields.
Args:
data: bytes object containing the data to parse
pubkey_prefix: Either a string prefix or None (if None, extract from data)
offset: Starting offset for field parsing (0 or 8)
Returns:
Dictionary with parsed fields
"""
res = {}
# Handle pubkey
if pubkey_prefix is None:
# Extract from data (format 1)
res["pubkey_pre"] = data[2:8].hex()
offset = 8 # Fields start at offset 8
else:
# Use provided prefix (format 2)
res["pubkey_pre"] = pubkey_prefix
# offset stays as provided (typically 0)
# Parse all fields with the given offset
res["bat"] = int.from_bytes(data[offset:offset+2], byteorder="little")
res["tx_queue_len"] = int.from_bytes(data[offset+2:offset+4], byteorder="little")
res["noise_floor"] = int.from_bytes(data[offset+4:offset+6], byteorder="little", signed=True)
res["last_rssi"] = int.from_bytes(data[offset+6:offset+8], byteorder="little", signed=True)
res["nb_recv"] = int.from_bytes(data[offset+8:offset+12], byteorder="little", signed=False)
res["nb_sent"] = int.from_bytes(data[offset+12:offset+16], byteorder="little", signed=False)
res["airtime"] = int.from_bytes(data[offset+16:offset+20], byteorder="little")
res["uptime"] = int.from_bytes(data[offset+20:offset+24], byteorder="little")
res["sent_flood"] = int.from_bytes(data[offset+24:offset+28], byteorder="little")
res["sent_direct"] = int.from_bytes(data[offset+28:offset+32], byteorder="little")
res["recv_flood"] = int.from_bytes(data[offset+32:offset+36], byteorder="little")
res["recv_direct"] = int.from_bytes(data[offset+36:offset+40], byteorder="little")
res["full_evts"] = int.from_bytes(data[offset+40:offset+42], byteorder="little")
res["last_snr"] = int.from_bytes(data[offset+42:offset+44], byteorder="little", signed=True) / 4
res["direct_dups"] = int.from_bytes(data[offset+44:offset+46], byteorder="little")
res["flood_dups"] = int.from_bytes(data[offset+46:offset+48], byteorder="little")
res["rx_airtime"] = int.from_bytes(data[offset+48:offset+52], byteorder="little")
return res

View File

@@ -1,8 +1,10 @@
import logging import logging
import json import json
import time
from typing import Any, Dict from typing import Any, Dict
from .events import Event, EventType, EventDispatcher from .events import Event, EventType, EventDispatcher
from .packets import PacketType from .packets import BinaryReqType, PacketType
from .parsing import lpp_parse, lpp_parse_mma, parse_acl, parse_status
from cayennelpp import LppFrame, LppData from cayennelpp import LppFrame, LppData
from meshcore.lpp_json_encoder import lpp_json_encoder from meshcore.lpp_json_encoder import lpp_json_encoder
@@ -17,6 +19,34 @@ class MessageReader:
self.contacts = {} # Temporary storage during contact list building self.contacts = {} # Temporary storage during contact list building
self.contact_nb = 0 # Used for contact processing self.contact_nb = 0 # Used for contact processing
# Track pending binary requests by tag for proper response parsing
self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at}
def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float):
"""Register a pending binary request for proper response parsing"""
# Clean up expired requests before adding new one
self.cleanup_expired_requests()
expires_at = time.time() + timeout_seconds
self.pending_binary_requests[tag] = {
"request_type": request_type,
"pubkey_prefix": prefix,
"expires_at": expires_at
}
logger.debug(f"Registered binary request: tag={tag}, type={request_type}, expires in {timeout_seconds}s")
def cleanup_expired_requests(self):
"""Remove expired binary requests"""
current_time = time.time()
expired_tags = [
tag for tag, info in self.pending_binary_requests.items()
if current_time > info["expires_at"]
]
for tag in expired_tags:
logger.debug(f"Cleaning up expired binary request: tag={tag}")
del self.pending_binary_requests[tag]
async def handle_rx(self, data: bytearray): async def handle_rx(self, data: bytearray):
packet_type_value = data[0] packet_type_value = data[0]
logger.debug(f"Received data: {data.hex()}") logger.debug(f"Received data: {data.hex()}")
@@ -330,36 +360,13 @@ class MessageReader:
) )
elif packet_type_value == PacketType.STATUS_RESPONSE.value: elif packet_type_value == PacketType.STATUS_RESPONSE.value:
res = {} res = parse_status(data, offset=8)
res["pubkey_pre"] = data[2:8].hex() data_hex = data[8:].hex()
res["bat"] = int.from_bytes(data[8:10], byteorder="little") logger.debug(f"Status response: {data_hex}")
res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder="little")
res["noise_floor"] = int.from_bytes(
data[12:14], byteorder="little", signed=True
)
res["last_rssi"] = int.from_bytes(
data[14:16], byteorder="little", signed=True
)
res["nb_recv"] = int.from_bytes(
data[16:20], byteorder="little", signed=False
)
res["nb_sent"] = int.from_bytes(
data[20:24], byteorder="little", signed=False
)
res["airtime"] = int.from_bytes(data[24:28], byteorder="little")
res["uptime"] = int.from_bytes(data[28:32], byteorder="little")
res["sent_flood"] = int.from_bytes(data[32:36], byteorder="little")
res["sent_direct"] = int.from_bytes(data[36:40], byteorder="little")
res["recv_flood"] = int.from_bytes(data[40:44], byteorder="little")
res["recv_direct"] = int.from_bytes(data[44:48], byteorder="little")
res["full_evts"] = int.from_bytes(data[48:50], byteorder="little")
res["last_snr"] = (
int.from_bytes(data[50:52], byteorder="little", signed=True) / 4
)
res["direct_dups"] = int.from_bytes(data[52:54], byteorder="little")
res["flood_dups"] = int.from_bytes(data[54:56], byteorder="little")
res["rx_airtime"] = int.from_bytes(data[56:60], byteorder="little")
attributes = {
"pubkey_prefix": res["pubkey_pre"],
}
data_hex = data[8:].hex() data_hex = data[8:].hex()
logger.debug(f"Status response: {data_hex}") logger.debug(f"Status response: {data_hex}")
@@ -490,17 +497,61 @@ class MessageReader:
elif packet_type_value == PacketType.BINARY_RESPONSE.value: elif packet_type_value == PacketType.BINARY_RESPONSE.value:
logger.debug(f"Received binary data: {data.hex()}") logger.debug(f"Received binary data: {data.hex()}")
res = {} tag = data[2:6].hex()
response_data = data[6:]
res["tag"] = data[2:6].hex()
res["data"] = data[6:].hex()
attributes = {"tag": res["tag"]}
# Always dispatch generic BINARY_RESPONSE
binary_res = {"tag": tag, "data": response_data.hex()}
await self.dispatcher.dispatch( await self.dispatcher.dispatch(
Event(EventType.BINARY_RESPONSE, res, attributes) Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag})
) )
# Check for tracked request type and dispatch specific response
if tag in self.pending_binary_requests:
request_type = self.pending_binary_requests[tag]["request_type"]
pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"]
del self.pending_binary_requests[tag]
logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}")
if request_type == BinaryReqType.STATUS and len(response_data) >= 52:
res = {}
res = parse_status(response_data, pubkey_prefix=pubkey_prefix)
await self.dispatcher.dispatch(
Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag})
)
elif request_type == BinaryReqType.TELEMETRY:
try:
lpp = lpp_parse(response_data)
telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix}
await self.dispatcher.dispatch(
Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res)
)
except Exception as e:
logger.error(f"Error parsing binary telemetry response: {e}")
elif request_type == BinaryReqType.MMA:
try:
mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header
mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix}
await self.dispatcher.dispatch(
Event(EventType.MMA_RESPONSE, mma_res, mma_res)
)
except Exception as e:
logger.error(f"Error parsing binary MMA response: {e}")
elif request_type == BinaryReqType.ACL:
try:
acl_result = parse_acl(response_data)
acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix}
await self.dispatcher.dispatch(
Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix})
)
except Exception as e:
logger.error(f"Error parsing binary ACL response: {e}")
else:
logger.debug(f"No tracked request found for binary response tag {tag}")
elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value:
logger.debug(f"Received path discovery response: {data.hex()}") logger.debug(f"Received path discovery response: {data.hex()}")
res = {} res = {}

89
tests/unit/test_reader.py Normal file
View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python3
import asyncio
from unittest.mock import AsyncMock
from meshcore.events import EventType
from meshcore.reader import MessageReader
class MockDispatcher:
def __init__(self):
self.dispatched_events = []
async def dispatch(self, event):
self.dispatched_events.append(event)
print(f"Dispatched: {event.type} with payload keys: {list(event.payload.keys()) if hasattr(event.payload, 'keys') else event.payload}")
import pytest
@pytest.mark.asyncio
async def test_binary_response():
mock_dispatcher = MockDispatcher()
reader = MessageReader(mock_dispatcher)
packet_hex = "8c00417db968993acd42fc77c3bbd1f08b9b84c39756410c58cd03077162bcb489031869586ab4b103000000000000000000"
packet_data = bytearray.fromhex(packet_hex)
print(f"Testing packet: {packet_hex}")
print(f"Packet type: 0x{packet_data[0]:02x} (should be 0x8c for BINARY_RESPONSE)")
# Register the binary request first
tag = "417db968"
from meshcore.parsing import BinaryReqType
reader.register_binary_request(tag, BinaryReqType.ACL, 10.0)
print(f"Registered ACL request with tag {tag}")
await reader.handle_rx(packet_data)
# Check what was dispatched
print(f"\nTotal events dispatched: {len(mock_dispatcher.dispatched_events)}")
# Verify BINARY_RESPONSE was dispatched
binary_responses = [e for e in mock_dispatcher.dispatched_events if e.type == EventType.BINARY_RESPONSE]
assert len(binary_responses) == 1, f"Expected 1 BINARY_RESPONSE, got {len(binary_responses)}"
print("✅ BINARY_RESPONSE event dispatched correctly")
# Check the binary response payload
binary_event = binary_responses[0]
assert "tag" in binary_event.payload, "BINARY_RESPONSE should have 'tag' in payload"
assert "data" in binary_event.payload, "BINARY_RESPONSE should have 'data' in payload"
print(f"✅ Binary response tag: {binary_event.payload['tag']}")
print(f"✅ Binary response data: {binary_event.payload['data']}")
# Check if a specific parsed event was also dispatched
other_events = [e for e in mock_dispatcher.dispatched_events if e.type != EventType.BINARY_RESPONSE]
if other_events:
print(f"✅ Additional parsed event dispatched: {other_events[0].type}")
print(f" Payload keys: {list(other_events[0].payload.keys()) if hasattr(other_events[0].payload, 'keys') else other_events[0].payload}")
else:
print("⚠️ No additional parsed event dispatched")
# Parse the response data to see what request type it is
response_data = packet_data[6:]
if response_data:
request_type = response_data[0]
print(f"Request type in response: 0x{request_type:02x} ({request_type})")
# Map request types to expected events
from meshcore.parsing import BinaryReqType
if request_type == BinaryReqType.STATUS.value:
expected_event = EventType.STATUS_RESPONSE
elif request_type == BinaryReqType.TELEMETRY.value:
expected_event = EventType.TELEMETRY_RESPONSE
elif request_type == BinaryReqType.MMA.value:
expected_event = EventType.MMA_RESPONSE
elif request_type == BinaryReqType.ACL.value:
expected_event = EventType.ACL_RESPONSE
else:
expected_event = None
if expected_event:
specific_events = [e for e in mock_dispatcher.dispatched_events if e.type == expected_event]
if specific_events:
print(f"✅ Expected {expected_event} event was dispatched")
else:
print(f"❌ Expected {expected_event} event was NOT dispatched")
else:
print(f"⚠️ Unknown request type {request_type}, no specific event expected")
if __name__ == "__main__":
asyncio.run(test_binary_response())