starting rewriting of reader using io.BytesIO instead of fixed indexes

This commit is contained in:
Florent de Lamotte
2025-11-05 13:11:22 +01:00
parent 9b6c799d04
commit b8885e3015

View File

@@ -1,6 +1,7 @@
import logging import logging
import json import json
import time import time
import io
from typing import Any, Dict from typing import Any, Dict
from .events import Event, EventType, EventDispatcher from .events import Event, EventType, EventDispatcher
from .packets import BinaryReqType, PacketType from .packets import BinaryReqType, PacketType
@@ -48,7 +49,8 @@ class MessageReader:
del self.pending_binary_requests[tag] del self.pending_binary_requests[tag]
async def handle_rx(self, data: bytearray): async def handle_rx(self, data: bytearray):
packet_type_value = data[0] dbuf = io.BytesIO(data)
packet_type_value = dbuf.read(1)[0]
logger.debug(f"Received data: {data.hex()}") logger.debug(f"Received data: {data.hex()}")
# Handle command responses # Handle command responses
@@ -78,23 +80,24 @@ class MessageReader:
or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value
): ):
c = {} c = {}
c["public_key"] = data[1:33].hex() c["public_key"] = dbuf.read(32).hex()
c["type"] = data[33] c["type"] = dbuf.read(1)[0]
c["flags"] = data[34] c["flags"] = dbuf.read(1)[0]
c["out_path_len"] = int.from_bytes(data[35:36], signed=True, byteorder="little") plen = int.from_bytes(dbuf.read(1), signed=True, byteorder="little")
plen = int.from_bytes(data[35:36], signed=True, byteorder="little") c["out_path_len"] = plen
if plen == -1: if plen == -1:
plen = 0 plen = 0
c["out_path"] = data[36 : 36 + plen].hex() path = dbuf.read(64)
c["adv_name"] = data[100:132].decode("utf-8", "ignore").replace("\0", "") c["out_path"] = path[0:plen].hex()
c["last_advert"] = int.from_bytes(data[132:136], byteorder="little") c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "")
c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little")
c["adv_lat"] = ( c["adv_lat"] = (
int.from_bytes(data[136:140], byteorder="little", signed=True) / 1e6 int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
) )
c["adv_lon"] = ( c["adv_lon"] = (
int.from_bytes(data[140:144], byteorder="little", signed=True) / 1e6 int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
) )
c["lastmod"] = int.from_bytes(data[144:148], byteorder="little") c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little")
if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value:
await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c))
@@ -103,7 +106,7 @@ class MessageReader:
self.contacts[c["public_key"]] = c self.contacts[c["public_key"]] = c
elif packet_type_value == PacketType.CONTACT_END.value: elif packet_type_value == PacketType.CONTACT_END.value:
lastmod = int.from_bytes(data[1:5], byteorder="little") lastmod = int.from_bytes(dbuf.read(4), byteorder="little")
attributes = { attributes = {
"lastmod": lastmod, "lastmod": lastmod,
} }
@@ -113,38 +116,39 @@ class MessageReader:
elif packet_type_value == PacketType.SELF_INFO.value: elif packet_type_value == PacketType.SELF_INFO.value:
self_info = {} self_info = {}
self_info["adv_type"] = data[1] self_info["adv_type"] = dbuf.read(1)[0]
self_info["tx_power"] = data[2] self_info["tx_power"] = dbuf.read(1)[0]
self_info["max_tx_power"] = data[3] self_info["max_tx_power"] = dbuf.read(1)[0]
self_info["public_key"] = data[4:36].hex() self_info["public_key"] = dbuf.read(32).hex()
self_info["adv_lat"] = ( self_info["adv_lat"] = (
int.from_bytes(data[36:40], byteorder="little", signed=True) / 1e6 int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
) )
self_info["adv_lon"] = ( self_info["adv_lon"] = (
int.from_bytes(data[40:44], byteorder="little", signed=True) / 1e6 int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
) )
self_info["multi_acks"] = data[44] self_info["multi_acks"] = dbuf.read(1)[0]
self_info["adv_loc_policy"] = data[45] self_info["adv_loc_policy"] = dbuf.read(1)[0]
self_info["telemetry_mode_env"] = (data[46] >> 4) & 0b11 telemetry_mode = dbuf.read(1)[0]
self_info["telemetry_mode_loc"] = (data[46] >> 2) & 0b11 self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11
self_info["telemetry_mode_base"] = (data[46]) & 0b11 self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11
self_info["manual_add_contacts"] = data[47] > 0 self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11
self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0
self_info["radio_freq"] = ( self_info["radio_freq"] = (
int.from_bytes(data[48:52], byteorder="little") / 1000 int.from_bytes(dbuf.read(4), byteorder="little") / 1000
) )
self_info["radio_bw"] = ( self_info["radio_bw"] = (
int.from_bytes(data[52:56], byteorder="little") / 1000 int.from_bytes(dbuf.read(4), byteorder="little") / 1000
) )
self_info["radio_sf"] = data[56] self_info["radio_sf"] = dbuf.read(1)[0]
self_info["radio_cr"] = data[57] self_info["radio_cr"] = dbuf.read(1)[0]
self_info["name"] = data[58:].decode("utf-8", "ignore") self_info["name"] = dbuf.read().decode("utf-8", "ignore")
await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info))
elif packet_type_value == PacketType.MSG_SENT.value: elif packet_type_value == PacketType.MSG_SENT.value:
res = {} res = {}
res["type"] = data[1] res["type"] = dbuf.read(1)[0]
res["expected_ack"] = bytes(data[2:6]) res["expected_ack"] = dbuf.read(4)
res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder="little") res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little")
attributes = { attributes = {
"type": res["type"], "type": res["type"],
@@ -156,15 +160,14 @@ class MessageReader:
elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: elif packet_type_value == PacketType.CONTACT_MSG_RECV.value:
res = {} res = {}
res["type"] = "PRIV" res["type"] = "PRIV"
res["pubkey_prefix"] = data[1:7].hex() res["pubkey_prefix"] = dbuf.read(6).hex()
res["path_len"] = data[7] res["path_len"] = dbuf.read(1)[0]
res["txt_type"] = data[8] txt_type = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder="little") res["txt_type"] = txt_type
if data[8] == 2: res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
res["signature"] = data[13:17].hex() if txt_type == 2:
res["text"] = data[17:].decode("utf-8", "ignore") res["signature"] = dbuf.read(4).hex()
else: res["text"] = dbuf.read().decode("utf-8", "ignore")
res["text"] = data[13:].decode("utf-8", "ignore")
attributes = { attributes = {
"pubkey_prefix": res["pubkey_prefix"], "pubkey_prefix": res["pubkey_prefix"],
@@ -178,16 +181,16 @@ class MessageReader:
elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {} res = {}
res["type"] = "PRIV" res["type"] = "PRIV"
res["SNR"] = int.from_bytes(data[1:2], byteorder="little", signed=True) / 4 res["SNR"] = int.from_bytes(dbuf.read(2), byteorder="little", signed=True) / 4
res["pubkey_prefix"] = data[4:10].hex() dbuf.read(1) # reserved
res["path_len"] = data[10] res["pubkey_prefix"] = dbuf.read(6).hex()
res["txt_type"] = data[11] res["path_len"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder="little") txt_type = dbuf.read(1)[0]
if data[11] == 2: res["txt_type"] = txt_type
res["signature"] = data[16:20].hex() res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
res["text"] = data[20:].decode("utf-8", "ignore") if txt_type == 2:
else: res["signature"] = dbuf.read(4).hex()
res["text"] = data[16:].decode("utf-8", "ignore") res["text"] = dbuf.read().decode("utf-8", "ignore")
attributes = { attributes = {
"pubkey_prefix": res["pubkey_prefix"], "pubkey_prefix": res["pubkey_prefix"],
@@ -201,11 +204,11 @@ class MessageReader:
elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value:
res = {} res = {}
res["type"] = "CHAN" res["type"] = "CHAN"
res["channel_idx"] = data[1] res["channel_idx"] = dbuf.read(1)[0]
res["path_len"] = data[2] res["path_len"] = dbuf.read(1)[0]
res["txt_type"] = data[3] res["txt_type"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder="little") res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
res["text"] = data[8:].decode("utf-8", "ignore") res["text"] = dbuf.read().decode("utf-8", "ignore")
attributes = { attributes = {
"channel_idx": res["channel_idx"], "channel_idx": res["channel_idx"],
@@ -219,12 +222,13 @@ class MessageReader:
elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {} res = {}
res["type"] = "CHAN" res["type"] = "CHAN"
res["SNR"] = int.from_bytes(data[1:2], byteorder="little", signed=True) / 4 res["SNR"] = int.from_bytes(dbuf.read(2), byteorder="little", signed=True) / 4
res["channel_idx"] = data[4] dbuf.read(1) # reserved
res["path_len"] = data[5] res["channel_idx"] = dbuf.read(1)[0]
res["txt_type"] = data[6] res["path_len"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder="little") res["txt_type"] = dbuf.read(1)[0]
res["text"] = data[11:].decode("utf-8", "ignore") res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
res["text"] = dbuf.read().decode("utf-8", "ignore")
attributes = { attributes = {
"channel_idx": res["channel_idx"], "channel_idx": res["channel_idx"],
@@ -236,7 +240,7 @@ class MessageReader:
) )
elif packet_type_value == PacketType.CURRENT_TIME.value: elif packet_type_value == PacketType.CURRENT_TIME.value:
time_value = int.from_bytes(data[1:5], byteorder="little") time_value = int.from_bytes(dbuf.read(4), byteorder="little")
result = {"time": time_value} result = {"time": time_value}
await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result))
@@ -245,34 +249,34 @@ class MessageReader:
await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result))
elif packet_type_value == PacketType.CONTACT_URI.value: elif packet_type_value == PacketType.CONTACT_URI.value:
contact_uri = "meshcore://" + data[1:].hex() contact_uri = "meshcore://" + dbuf.read().hex()
result = {"uri": contact_uri} result = {"uri": contact_uri}
await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result))
elif packet_type_value == PacketType.BATTERY.value: elif packet_type_value == PacketType.BATTERY.value:
battery_level = int.from_bytes(data[1:3], byteorder="little") battery_level = int.from_bytes(dbuf.read(2), byteorder="little")
result = {"level": battery_level} result = {"level": battery_level}
if len(data) > 3: # has storage info as well if len(data) > 3: # has storage info as well
result["used_kb"] = int.from_bytes(data[3:7], byteorder="little") result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
result["total_kb"] = int.from_bytes(data[7:11], byteorder="little") result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) await self.dispatcher.dispatch(Event(EventType.BATTERY, result))
elif packet_type_value == PacketType.DEVICE_INFO.value: elif packet_type_value == PacketType.DEVICE_INFO.value:
res = {} res = {}
res["fw ver"] = data[1] res["fw ver"] = dbuf.read(1)[0]
if data[1] >= 3: if data[1] >= 3:
res["max_contacts"] = data[2] * 2 res["max_contacts"] = dbuf.read(1)[0] * 2
res["max_channels"] = data[3] res["max_channels"] = dbuf.read(1)[0]
res["ble_pin"] = int.from_bytes(data[4:8], byteorder="little") res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little")
res["fw_build"] = data[8:20].decode("utf-8", "ignore").replace("\0", "") res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "")
res["model"] = data[20:60].decode("utf-8", "ignore").replace("\0", "") res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "")
res["ver"] = data[60:80].decode("utf-8", "ignore").replace("\0", "") res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "")
await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res))
elif packet_type_value == PacketType.CUSTOM_VARS.value: elif packet_type_value == PacketType.CUSTOM_VARS.value:
logger.debug(f"received custom vars response: {data.hex()}") logger.debug(f"received custom vars response: {data.hex()}")
res = {} res = {}
rawdata = data[1:].decode("utf-8", "ignore") rawdata = dbuf.read().decode("utf-8", "ignore")
if not rawdata == "": if not rawdata == "":
pairs = rawdata.split(",") pairs = rawdata.split(",")
for p in pairs: for p in pairs:
@@ -284,30 +288,30 @@ class MessageReader:
elif packet_type_value == PacketType.CHANNEL_INFO.value: elif packet_type_value == PacketType.CHANNEL_INFO.value:
logger.debug(f"received channel info response: {data.hex()}") logger.debug(f"received channel info response: {data.hex()}")
res = {} res = {}
res["channel_idx"] = data[1] res["channel_idx"] = dbuf.read(1)[0]
# Channel name is null-terminated, so find the first null byte # Channel name is null-terminated, so find the first null byte
name_bytes = data[2:34] name_bytes = dbuf.read(32)
null_pos = name_bytes.find(0) null_pos = name_bytes.find(0)
if null_pos >= 0: if null_pos >= 0:
res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore")
else: else:
res["channel_name"] = name_bytes.decode("utf-8", "ignore") res["channel_name"] = name_bytes.decode("utf-8", "ignore")
res["channel_secret"] = data[34:50] res["channel_secret"] = dbuf.read(16)
await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res))
# Push notifications # Push notifications
elif packet_type_value == PacketType.ADVERTISEMENT.value: elif packet_type_value == PacketType.ADVERTISEMENT.value:
logger.debug("Advertisement received") logger.debug("Advertisement received")
res = {} res = {}
res["public_key"] = data[1:33].hex() res["public_key"] = dbuf.read(32).hex()
await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res))
elif packet_type_value == PacketType.PATH_UPDATE.value: elif packet_type_value == PacketType.PATH_UPDATE.value:
logger.debug("Code path update") logger.debug("Code path update")
res = {} res = {}
res["public_key"] = data[1:33].hex() res["public_key"] = dbuf.read(32).hex()
await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res))
elif packet_type_value == PacketType.ACK.value: elif packet_type_value == PacketType.ACK.value:
@@ -315,7 +319,7 @@ class MessageReader:
ack_data = {} ack_data = {}
if len(data) >= 5: if len(data) >= 5:
ack_data["code"] = bytes(data[1:5]).hex() ack_data["code"] = dbuf.read(4).hex()
attributes = {"code": ack_data.get("code", "")} attributes = {"code": ack_data.get("code", "")}
@@ -327,21 +331,22 @@ class MessageReader:
elif packet_type_value == PacketType.RAW_DATA.value: elif packet_type_value == PacketType.RAW_DATA.value:
res = {} res = {}
res["SNR"] = data[1] / 4 res["SNR"] = dbuf.read(1)[0] / 4
res["RSSI"] = data[2] res["RSSI"] = dbuf.read(1)[0]
res["payload"] = data[4:].hex() res["payload"] = dbuf.read(4).hex()
logger.debug("Received raw data") logger.debug("Received raw data")
print(res) print(res)
await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))
elif packet_type_value == PacketType.LOGIN_SUCCESS.value: elif packet_type_value == PacketType.LOGIN_SUCCESS.value:
res = {} res = {}
attributes = {}
if len(data) > 1: if len(data) > 1:
res["permissions"] = data[1] perms = dbuf.read(1)[0]
res["is_admin"] = (data[1] & 1) == 1 # Check if admin bit is set res["permissions"] = perms
res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set
if len(data) > 7: res["pubkey_prefix"] = dbuf.read(6).hex()
res["pubkey_prefix"] = data[2:8].hex()
attributes = {"pubkey_prefix": res.get("pubkey_prefix")} attributes = {"pubkey_prefix": res.get("pubkey_prefix")}
@@ -351,9 +356,12 @@ class MessageReader:
elif packet_type_value == PacketType.LOGIN_FAILED.value: elif packet_type_value == PacketType.LOGIN_FAILED.value:
res = {} res = {}
attributes = {}
pbuf.read(1)
if len(data) > 7: if len(data) > 7:
res["pubkey_prefix"] = data[2:8].hex() res["pubkey_prefix"] = pbuf.read(6).hex()
attributes = {"pubkey_prefix": res.get("pubkey_prefix")} attributes = {"pubkey_prefix": res.get("pubkey_prefix")}