Files
meshcore_py/src/meshcore/reader.py
2026-03-07 21:05:00 -04:00

1069 lines
47 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import json
import struct
import time
import io
from typing import Any, Dict
from .events import Event, EventType, EventDispatcher, ErrorMessages
from .packets import BinaryReqType, PacketType, ControlType
from .parsing import lpp_parse, lpp_parse_mma, parse_acl, parse_status
from cayennelpp import LppFrame, LppData
from meshcore.lpp_json_encoder import lpp_json_encoder
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
logger = logging.getLogger("meshcore")
PAYLOAD_TYPENAMES = ["REQ", "RESPONSE", "TEXT_MSG", "ACK", "ADVERT", "GRP_TXT", "GRP_DATA", "ANON_REQ", "PATH", "TRACE", "MULTIPART", "CONTROL"]
ROUTE_TYPENAMES = ["TC_FLOOD", "FLOOD", "DIRECT", "TC_DIRECT"]
CONTACT_TYPENAMES = ["NONE","CLI","REP","ROOM","SENS"]
class MessageReader:
def __init__(self, dispatcher: EventDispatcher):
self.dispatcher = dispatcher
# We're only keeping state here that's needed for processing
# before events are dispatched
self.contacts = {} # Temporary storage during contact list building
self.contact_nb = 0 # Used for contact processing
# Track pending binary requests by tag for proper response parsing
self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at}
self.channels = [{} for _ in range(40)] # keep our own copy of channels, 40 elements by default
self.decrypt_channels = True
self.channels_log = [] # stores the channel msg events
def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float, context={}, is_anon=False):
"""Register a pending binary request for proper response parsing"""
# Clean up expired requests before adding new one
self.cleanup_expired_requests()
expires_at = time.time() + timeout_seconds
self.pending_binary_requests[tag] = {
"request_type": request_type,
"pubkey_prefix": prefix,
"expires_at": expires_at,
"is_anon": is_anon,
"context": context # optional info we want to keep from req to resp
}
logger.debug(f"Registered binary request: tag={tag}, type={request_type}, expires in {timeout_seconds}s")
def cleanup_expired_requests(self):
"""Remove expired binary requests"""
current_time = time.time()
expired_tags = [
tag for tag, info in self.pending_binary_requests.items()
if current_time > info["expires_at"]
]
for tag in expired_tags:
logger.debug(f"Cleaning up expired binary request: tag={tag}")
del self.pending_binary_requests[tag]
async def handle_rx(self, data: bytearray):
dbuf = io.BytesIO(data)
try:
packet_type_value = dbuf.read(1)[0]
except IndexError as e:
logger.warning(f"Received empty packet: {e}")
return
logger.debug(f"Received data: {data.hex()}")
# Handle command responses
if packet_type_value == PacketType.OK.value:
result: Dict[str, Any] = {}
if len(data) == 5:
result["value"] = int.from_bytes(data[1:5], byteorder="little")
# Dispatch event for the OK response
await self.dispatcher.dispatch(Event(EventType.OK, result))
elif packet_type_value == PacketType.ERROR.value:
if len(data) > 1:
result = {
"error_code": data[1],
"code_string": ErrorMessages[data[1]],
}
else:
result = {}
# Dispatch event for the ERROR response
await self.dispatcher.dispatch(Event(EventType.ERROR, result))
elif packet_type_value == PacketType.CONTACT_START.value:
self.contact_nb = int.from_bytes(data[1:5], byteorder="little")
self.contacts = {}
elif (
packet_type_value == PacketType.CONTACT.value
or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value
):
c = {}
c["public_key"] = dbuf.read(32).hex()
c["type"] = dbuf.read(1)[0]
c["flags"] = dbuf.read(1)[0]
plen = int.from_bytes(dbuf.read(1), signed=False, byteorder="little")
if plen == 255: # flood
c["out_path_hash_mode"] = -1
c["out_path_len"] = -1 # 6 LSB
else:
c["out_path_hash_mode"] = plen >> 6
c["out_path_len"] = plen & 0x3F # 6 LSB
c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex()
c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "")
c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little")
c["adv_lat"] = (
int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
)
c["adv_lon"] = (
int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
)
c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little")
if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value:
await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c))
else:
await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c))
self.contacts[c["public_key"]] = c
elif packet_type_value == PacketType.ADVERT_PATH.value :
r = {}
r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False)
plen = int.from_bytes(dbuf.read(1), "little", signed=False)
if plen == 255: # flood, should not happen
r["path_hash_mode"] = -1
r["path_len"] = -1
else:
r["path_hash_mode"] = plen >> 6 # 2 upper bytes
r["path_len"] = plen & 0x3F
r["path"] = dbuf.read().replace(b"\0", b"").hex()
await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r))
elif packet_type_value == PacketType.CONTACT_END.value:
lastmod = int.from_bytes(dbuf.read(4), byteorder="little")
attributes = {
"lastmod": lastmod,
}
await self.dispatcher.dispatch(
Event(EventType.CONTACTS, self.contacts, attributes)
)
elif packet_type_value == PacketType.SELF_INFO.value:
self_info = {}
self_info["adv_type"] = dbuf.read(1)[0]
self_info["tx_power"] = dbuf.read(1)[0]
self_info["max_tx_power"] = dbuf.read(1)[0]
self_info["public_key"] = dbuf.read(32).hex()
self_info["adv_lat"] = (
int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
)
self_info["adv_lon"] = (
int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
)
self_info["multi_acks"] = dbuf.read(1)[0]
self_info["adv_loc_policy"] = dbuf.read(1)[0]
telemetry_mode = dbuf.read(1)[0]
self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11
self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11
self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11
self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0
self_info["radio_freq"] = (
int.from_bytes(dbuf.read(4), byteorder="little") / 1000
)
self_info["radio_bw"] = (
int.from_bytes(dbuf.read(4), byteorder="little") / 1000
)
self_info["radio_sf"] = dbuf.read(1)[0]
self_info["radio_cr"] = dbuf.read(1)[0]
self_info["name"] = dbuf.read().decode("utf-8", "ignore")
await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info))
elif packet_type_value == PacketType.MSG_SENT.value:
res = {}
res["type"] = dbuf.read(1)[0]
res["expected_ack"] = dbuf.read(4)
res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little")
attributes = {
"type": res["type"],
"expected_ack": res["expected_ack"].hex(),
}
await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes))
elif packet_type_value == PacketType.CONTACT_MSG_RECV.value:
res = {}
res["type"] = "PRIV"
res["pubkey_prefix"] = dbuf.read(6).hex()
plen = dbuf.read(1)[0]
if plen == 255 : # direct message
res["path_hash_mode"] = -1
res["path_len"] = plen
else:
res["path_hash_mode"] = plen >> 6
res["path_len"] = plen & 0x3F
txt_type = dbuf.read(1)[0]
res["txt_type"] = txt_type
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
if txt_type == 2:
res["signature"] = dbuf.read(4).hex()
res["text"] = dbuf.read().decode("utf-8", "ignore")
attributes = {
"pubkey_prefix": res["pubkey_prefix"],
"txt_type": res["txt_type"],
}
evt_type = EventType.CONTACT_MSG_RECV
await self.dispatcher.dispatch(Event(evt_type, res, attributes))
elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "PRIV"
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
dbuf.read(2) # reserved
res["pubkey_prefix"] = dbuf.read(6).hex()
plen = dbuf.read(1)[0]
if plen == 255 : # direct message
res["path_hash_mode"] = -1
res["path_len"] = plen
else:
res["path_hash_mode"] = plen >> 6
res["path_len"] = plen & 0x3F
txt_type = dbuf.read(1)[0]
res["txt_type"] = txt_type
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
if txt_type == 2:
res["signature"] = dbuf.read(4).hex()
res["text"] = dbuf.read().decode("utf-8", "ignore")
attributes = {
"pubkey_prefix": res["pubkey_prefix"],
"txt_type": res["txt_type"],
}
await self.dispatcher.dispatch(
Event(EventType.CONTACT_MSG_RECV, res, attributes)
)
elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value:
res = {}
res["type"] = "CHAN"
res["channel_idx"] = dbuf.read(1)[0]
plen = dbuf.read(1)[0]
if plen == 255 : # direct message
res["path_hash_mode"] = -1
res["path_len"] = plen
else:
res["path_hash_mode"] = plen >> 6
res["path_len"] = plen & 0x3F
res["txt_type"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False)
text = dbuf.read().strip(b"\0")
res["text"] = text.decode("utf-8", "ignore")
# search for text in log_channels
txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False)
if self.decrypt_channels:
logged = next((l for l in reversed(self.channels_log) if 'msg_hash' in l and l['msg_hash'] == txt_hash), None)
if not logged is None:
res["path"] = logged["path"]
res["RSSI"] = logged["rssi"]
res["SNR"] = logged["snr"]
res["recv_time"] = logged["recv_time"]
res["attempt"] = logged["attempt"]
attributes = {
"channel_idx": res["channel_idx"],
"txt_type": res["txt_type"],
}
await self.dispatcher.dispatch(
Event(EventType.CHANNEL_MSG_RECV, res, attributes)
)
elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "CHAN"
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
dbuf.read(2) # reserved
res["channel_idx"] = dbuf.read(1)[0]
plen = dbuf.read(1)[0]
if plen == 255 : # direct message
res["path_hash_mode"] = -1
res["path_len"] = plen
else:
res["path_hash_mode"] = plen >> 6
res["path_len"] = plen & 0x3F
res["txt_type"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False)
text = dbuf.read()
res["text"] = text.decode("utf-8", "ignore")
# search for text in log_channels
txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False)
res["txt_hash"] = txt_hash
logged = next((l for l in reversed(self.channels_log) if 'msg_hash' in l and l['msg_hash'] == txt_hash), None)
if self.decrypt_channels:
if not logged is None:
res["path"] = logged["path"]
res["RSSI"] = logged["rssi"]
res["recv_time"] = logged["recv_time"]
res["attempt"] = logged["attempt"]
attributes = {
"channel_idx": res["channel_idx"],
"txt_type": res["txt_type"],
}
await self.dispatcher.dispatch(
Event(EventType.CHANNEL_MSG_RECV, res, attributes)
)
elif packet_type_value == PacketType.CURRENT_TIME.value:
time_value = int.from_bytes(dbuf.read(4), byteorder="little")
result = {"time": time_value}
await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result))
elif packet_type_value == PacketType.NO_MORE_MSGS.value:
result = {"messages_available": False}
await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result))
elif packet_type_value == PacketType.CONTACT_URI.value:
contact_uri = "meshcore://" + dbuf.read().hex()
result = {"uri": contact_uri}
await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result))
elif packet_type_value == PacketType.BATTERY.value:
battery_level = int.from_bytes(dbuf.read(2), byteorder="little")
result = {"level": battery_level}
if len(data) > 3: # has storage info as well
result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
await self.dispatcher.dispatch(Event(EventType.BATTERY, result))
elif packet_type_value == PacketType.DEVICE_INFO.value:
res = {}
fw_ver = dbuf.read(1)[0]
res["fw ver"] = fw_ver
if fw_ver >= 3:
res["max_contacts"] = dbuf.read(1)[0] * 2
res["max_channels"] = dbuf.read(1)[0]
res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little")
res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "")
res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "")
res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "")
if fw_ver >= 9: # has repeater mode
rpt = dbuf.read(1)
if len(rpt) > 0:
res["repeat"] = (rpt[0] != 0)
if fw_ver >= 10: # has path_hash_mode
path_hash_mode = dbuf.read(1)[0]
res["path_hash_mode"] = path_hash_mode
await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res))
elif packet_type_value == PacketType.CUSTOM_VARS.value:
logger.debug(f"received custom vars response: {data.hex()}")
res = {}
rawdata = dbuf.read().decode("utf-8", "ignore")
if not rawdata == "":
pairs = rawdata.split(",")
for p in pairs:
psplit = p.split(":")
res[psplit[0]] = psplit[1]
logger.debug(f"got custom vars : {res}")
await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res))
elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24)
logger.debug(f"received stats response: {data.hex()}")
# RESP_CODE_STATS: All stats responses use code 24 with sub-type byte
# Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets)
if len(data) < 2:
logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
return
stats_type = data[1]
if stats_type == 0: # STATS_TYPE_CORE
# RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total
# Format: <B B H I H B (response_code, stats_type, battery_mv, uptime_secs, errors, queue_len)
if len(data) < 11:
logger.error(f"Stats core response too short: {len(data)} bytes, expected 11")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
else:
try:
battery_mv, uptime_secs, errors, queue_len = struct.unpack('<H I H B', data[2:11])
res = {
'battery_mv': battery_mv,
'uptime_secs': uptime_secs,
'errors': errors,
'queue_len': queue_len
}
logger.debug(f"parsed stats core: {res}")
await self.dispatcher.dispatch(Event(EventType.STATS_CORE, res))
except struct.error as e:
logger.error(f"Error parsing stats core binary frame: {e}, data: {data.hex()}")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))
elif stats_type == 1: # STATS_TYPE_RADIO
# RESP_CODE_STATS + STATS_TYPE_RADIO: 14 bytes total
# Format: <B B h b b I I (response_code, stats_type, noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
if len(data) < 14:
logger.error(f"Stats radio response too short: {len(data)} bytes, expected 14")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
else:
try:
noise_floor, last_rssi, last_snr_scaled, tx_air_secs, rx_air_secs = struct.unpack('<h b b I I', data[2:14])
res = {
'noise_floor': noise_floor,
'last_rssi': last_rssi,
'last_snr': last_snr_scaled / 4.0, # Unscale SNR (was multiplied by 4)
'tx_air_secs': tx_air_secs,
'rx_air_secs': rx_air_secs
}
logger.debug(f"parsed stats radio: {res}")
await self.dispatcher.dispatch(Event(EventType.STATS_RADIO, res))
except struct.error as e:
logger.error(f"Error parsing stats radio binary frame: {e}, data: {data.hex()}")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))
elif stats_type == 2: # STATS_TYPE_PACKETS
# RESP_CODE_STATS + STATS_TYPE_PACKETS: 26 bytes (legacy) or 30 bytes (includes recv_errors)
# Format: <B B I I I I I I [I] (response_code, stats_type, recv, sent, flood_tx, direct_tx, flood_rx, direct_rx [, recv_errors])
logger.debug(f"stats packets payload len={len(data)} (expected 26 or 30)")
if len(data) < 26:
logger.error(f"Stats packets response too short: {len(data)} bytes, expected 26 or 30")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
else:
try:
recv, sent, flood_tx, direct_tx, flood_rx, direct_rx = struct.unpack('<I I I I I I', data[2:26])
res = {
'recv': recv,
'sent': sent,
'flood_tx': flood_tx,
'direct_tx': direct_tx,
'flood_rx': flood_rx,
'direct_rx': direct_rx
}
if len(data) >= 30:
(recv_errors,) = struct.unpack('<I', data[26:30])
res['recv_errors'] = recv_errors
else:
res['recv_errors'] = None # legacy 26-byte frame
logger.debug(f"parsed stats packets: {res}")
await self.dispatcher.dispatch(Event(EventType.STATS_PACKETS, res))
except struct.error as e:
logger.error(f"Error parsing stats packets binary frame: {e}, data: {data.hex()}")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))
else:
logger.error(f"Unknown stats type: {stats_type}, data: {data.hex()}")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"unknown_stats_type: {stats_type}"}))
elif packet_type_value == PacketType.AUTOADD_CONFIG.value:
logger.debug(f"received autoadd config response: {data.hex()}")
res = {}
res["config"] = dbuf.read(1)[0]
await self.dispatcher.dispatch(Event(EventType.AUTOADD_CONFIG, res, res))
elif packet_type_value == PacketType.CHANNEL_INFO.value:
logger.debug(f"received channel info response: {data.hex()}")
res = {}
idx = dbuf.read(1)[0]
res["channel_idx"] = idx
if len(self.channels) <= idx:
self.channels.extend([{} for _ in range(1 + idx - len(self.channels))])
# Channel name is null-terminated, so find the first null byte
name_bytes = dbuf.read(32)
null_pos = name_bytes.find(0)
if null_pos >= 0:
res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore")
else:
res["channel_name"] = name_bytes.decode("utf-8", "ignore")
res["channel_secret"] = dbuf.read(16)
res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2]
self.channels[idx] = res
await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res))
# Push notifications
elif packet_type_value == PacketType.ADVERTISEMENT.value:
logger.debug("Advertisement received")
res = {}
res["public_key"] = dbuf.read(32).hex()
await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res))
elif packet_type_value == PacketType.PATH_UPDATE.value:
logger.debug("Code path update")
res = {}
res["public_key"] = dbuf.read(32).hex()
await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res))
elif packet_type_value == PacketType.ACK.value:
logger.debug("Received ACK")
ack_data = {}
if len(data) >= 5:
ack_data["code"] = dbuf.read(4).hex()
attributes = {"code": ack_data.get("code", "")}
await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes))
elif packet_type_value == PacketType.MESSAGES_WAITING.value:
logger.debug("Msgs are waiting")
await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {}))
elif packet_type_value == PacketType.RAW_DATA.value:
res = {}
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
res["payload"] = dbuf.read(4).hex()
logger.debug("Received raw data")
print(res)
await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))
elif packet_type_value == PacketType.LOGIN_SUCCESS.value:
res = {}
attributes = {}
if len(data) > 1:
perms = dbuf.read(1)[0]
res["permissions"] = perms
res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set
res["pubkey_prefix"] = dbuf.read(6).hex()
attributes = {"pubkey_prefix": res.get("pubkey_prefix")}
await self.dispatcher.dispatch(
Event(EventType.LOGIN_SUCCESS, res, attributes)
)
elif packet_type_value == PacketType.LOGIN_FAILED.value:
res = {}
attributes = {}
pbuf.read(1)
if len(data) > 7:
res["pubkey_prefix"] = pbuf.read(6).hex()
attributes = {"pubkey_prefix": res.get("pubkey_prefix")}
await self.dispatcher.dispatch(
Event(EventType.LOGIN_FAILED, res, attributes)
)
elif packet_type_value == PacketType.STATUS_RESPONSE.value:
res = parse_status(data, offset=8)
data_hex = data[8:].hex()
logger.debug(f"Status response: {data_hex}")
attributes = {
"pubkey_prefix": res["pubkey_pre"],
}
await self.dispatcher.dispatch(
Event(EventType.STATUS_RESPONSE, res, attributes)
)
elif packet_type_value == PacketType.LOG_DATA.value:
logger.debug(f"Received RF log data: {data.hex()}")
# Parse as raw RX data
log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()}
attributes = {}
recv_time = int(time.time())
log_data["recv_time"] = recv_time
attributes["recv_time"] = recv_time
# First byte is SNR (signed byte, multiplied by 4)
if len(data) > 1:
snr_byte = dbuf.read(1)[0]
# Convert to signed value
snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0
log_data["snr"] = snr
# Second byte is RSSI (signed byte)
if len(data) > 2:
rssi_byte = dbuf.read(1)[0]
# Convert to signed value
rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256
log_data["rssi"] = rssi
# Remaining bytes are the raw data payload
payload = None
if len(data) > 3:
payload=dbuf.read()
log_data["payload"] = payload.hex()
log_data["payload_length"] = len(payload)
# Parse payload and get some info from it
if not payload is None:
pbuf = io.BytesIO(payload)
header = pbuf.read(1)[0]
route_type = header & 0x03
payload_type = (header & 0x3c) >> 2
payload_ver = (header & 0xc0) >> 6
transport_code = None
if route_type == 0x00 or route_type == 0x03: # has transport code
transport_code = pbuf.read(4) # discard transport code
path_byte = pbuf.read(1)[0]
path_hash_size = ((path_byte & 0xC0) >> 6) + 1
path_len = (path_byte & 0x3F)
# here path_len is number of hops, not number of bytes
path = pbuf.read(path_len*path_hash_size).hex() # Beware of traces where pathes are mixed
try :
route_typename = ROUTE_TYPENAMES[route_type]
except IndexError:
logger.debug(f"Unknown route type {route_type}")
route_typename = "UNK"
try :
payload_typename = PAYLOAD_TYPENAMES[payload_type]
except IndexError:
logger.debug(f"Unknown payload type {payload_type}")
payload_typename = "UNK"
pkt_payload = pbuf.read()
pkt_hash = int.from_bytes(SHA256.new(pkt_payload).digest()[0:4], "little", signed=False)
log_data["header"] = header
log_data["route_type"] = route_type
attributes["route_type"] = route_type
log_data["route_typename"] = route_typename
log_data["payload_type"] = payload_type
attributes["payload_type"] = payload_type
log_data["payload_typename"]= payload_typename
log_data["payload_ver"] = payload_ver
if not transport_code is None:
log_data["transport_code"] = transport_code.hex()
log_data["path_len"] = path_len
attributes["path_len"] = path_len
log_data["path_hash_size"] = path_hash_size
log_data["path"] = path
attributes["path"] = path
log_data["pkt_payload"] = pkt_payload
log_data["pkt_hash"] = pkt_hash
if not payload is None and payload_type == 0x05: # flood msg / channel
pk_buf = io.BytesIO(pkt_payload)
chan_hash = pk_buf.read(1).hex()
cipher_mac = pk_buf.read(2)
msg = pk_buf.read() # until the end of buffer
channel = None
for c in self.channels:
if "channel_hash" in c and c["channel_hash"] == chan_hash : # validate against MAC
h = HMAC.new(c["channel_secret"], digestmod=SHA256)
h.update(msg)
if h.digest()[0:2] == cipher_mac:
channel = c
break
chan_name = ""
if channel is None :
chan_name = chan_hash
else:
chan_name = channel["channel_name"]
log_data["chan_hash"] = chan_hash
log_data["cipher_mac"] = cipher_mac.hex()
log_data["crypted"] = msg.hex()
log_data["chan_name"] = chan_name
if not channel is None and self.decrypt_channels:
# search for the same packet
logged = next((l for l in reversed(self.channels_log) if 'pkt_hash' in l and l['pkt_hash'] == pkt_hash), None)
if logged is None:
# not found: decrypt the text and hash it
aes_key = channel["channel_secret"]
cipher = AES.new(aes_key, AES.MODE_ECB)
uncrypted = cipher.decrypt(msg)
timestamp = int.from_bytes(uncrypted[0:4], "little", signed=False)
attempt = uncrypted[4] & 3
txt_type = int.from_bytes(uncrypted[4:4], "little", signed=False) >> 2
message = uncrypted[5:].strip(b"\0")
msg_hash = int.from_bytes(SHA256.new(timestamp.to_bytes(4, "little", signed=False) + message).digest()[0:4], "little", signed=False)
log_data["message"] = message.decode("utf-8", "ignore")
log_data["msg_hash"] = msg_hash
log_data["sender_timestamp"] = timestamp
log_data["attempt"] = attempt
log_data["txt_type"] = txt_type
else:
# found: copy
log_data["message"] = logged["message"]
log_data["msg_hash"] = logged["msg_hash"]
log_data["sender_timestamp"] = logged["sender_timestamp"]
log_data["attempt"] = logged["attempt"]
log_data["txt_type"] = logged["txt_type"]
self.channels_log.append(log_data)
if len(self.channels_log) > 100:
del self.channels_log[:25]
elif not payload is None and payload_type == 0x04: # Advert
pk_buf = io.BytesIO(pkt_payload)
adv_key = pk_buf.read(32).hex()
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
signature = pk_buf.read(64).hex()
flags = pk_buf.read(1)[0]
adv_type = flags & 0x0F
adv_lat = None
adv_lon = None
adv_feat1 = None
adv_feat2 = None
if flags & 0x10 > 0: #has location
adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
if flags & 0x20 > 0: #has feature1
adv_feat1 = pk_buf.read(2).hex()
if flags & 0x40 > 0: #has feature2
adv_feat2 = pk_buf.read(2).hex()
if flags & 0x80 > 0: #has name
adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00")
log_data["adv_name"] = adv_name
log_data["adv_key"] = adv_key
log_data["adv_timestamp"] = adv_timestamp
log_data["signature"] = signature
log_data["adv_flags"] = flags
log_data["adv_type"] = adv_type
if not adv_lat is None :
log_data["adv_lat"] = adv_lat
if not adv_lon is None :
log_data["adv_lon"] = adv_lon
if not adv_feat1 is None:
log_data["adv_feat1"] = adv_feat1
if not adv_feat2 is None:
log_data["adv_feat2"] = adv_feat2
# Dispatch as RF log data
await self.dispatcher.dispatch(
Event(EventType.RX_LOG_DATA, log_data, attributes)
)
elif packet_type_value == PacketType.TRACE_DATA.value:
logger.debug(f"Received trace data: {data.hex()}")
res = {}
# According to the source, format is:
# 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr
reserved = dbuf.read(1)[0]
path_len = dbuf.read(1)[0]
flags = dbuf.read(1)[0]
tag = int.from_bytes(dbuf.read(4), byteorder="little")
auth_code = int.from_bytes(dbuf.read(4), byteorder="little")
path_hash_len = 1 << (flags&3)
path_len = int(path_len / path_hash_len)
# Initialize result
res["tag"] = tag
res["auth"] = auth_code
res["flags"] = flags
res["path_len"] = path_len
# Process path as array of objects with hash and SNR
path_nodes = []
if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1:
# Extract path with hash and SNR pairs
for i in range(path_len):
node = {
"hash": dbuf.read(path_hash_len).hex(),
}
path_nodes.append(node)
for n in path_nodes:
node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
n["snr"] = node_snr / 4.0
# Add the final node (our device) with its SNR
final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0
path_nodes.append({"snr": final_snr})
res["path"] = path_nodes
logger.debug(f"Parsed trace data: {res}")
attributes = {
"tag": res["tag"],
"auth_code": res["auth"],
}
await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes))
elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value:
logger.debug(f"Received telemetry data: {data.hex()}")
res = {}
dbuf.read(1)
res["pubkey_pre"] = dbuf.read(6).hex()
buf = dbuf.read()
"""Parse a given byte string and return as a LppFrame object."""
i = 0
lpp_data_list = []
while i < len(buf) and buf[i] != 0:
lppdata = LppData.from_bytes(buf[i:])
lpp_data_list.append(lppdata)
i = i + len(lppdata)
lpp = json.loads(
json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder)
)
res["lpp"] = lpp
attributes = {
"raw": buf.hex(),
"pubkey_prefix": res["pubkey_pre"],
}
await self.dispatcher.dispatch(
Event(EventType.TELEMETRY_RESPONSE, res, attributes)
)
elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value:
res = {}
freqs = []
cont = True
try:
while cont:
min = int.from_bytes(dbuf.read(4), "little", signed=False)
max = int.from_bytes(dbuf.read(4), "little", signed=False)
if min == 0 or max == 0:
cont = False
else:
freqs.append({"min" : min, "max": max})
except e:
print(e)
res["freqs"] = freqs
await self.dispatcher.dispatch(
Event(EventType.ALLOWED_REPEAT_FREQ, res)
)
elif packet_type_value == PacketType.BINARY_RESPONSE.value:
dbuf.read(1)
tag = dbuf.read(4).hex()
response_data = dbuf.read()
logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}")
# Always dispatch generic BINARY_RESPONSE
binary_res = {"tag": tag, "data": response_data.hex()}
await self.dispatcher.dispatch(
Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag})
)
# Check for tracked request type and dispatch specific response
if tag in self.pending_binary_requests:
request_type = self.pending_binary_requests[tag]["request_type"]
is_anon = self.pending_binary_requests[tag]["is_anon"]
pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"]
context = self.pending_binary_requests[tag]["context"]
del self.pending_binary_requests[tag]
logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}")
if not is_anon:
if request_type == BinaryReqType.STATUS and len(response_data) >= 52:
res = {}
res = parse_status(response_data, pubkey_prefix=pubkey_prefix)
await self.dispatcher.dispatch(
Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag})
)
elif request_type == BinaryReqType.TELEMETRY :
try:
lpp = lpp_parse(response_data)
telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix}
await self.dispatcher.dispatch(
Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res)
)
except Exception as e:
logger.error(f"Error parsing binary telemetry response: {e}")
elif request_type == BinaryReqType.MMA:
try:
mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header
mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix}
await self.dispatcher.dispatch(
Event(EventType.MMA_RESPONSE, mma_res, mma_res)
)
except Exception as e:
logger.error(f"Error parsing binary MMA response: {e}")
elif request_type == BinaryReqType.ACL:
try:
acl_result = parse_acl(response_data)
acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix}
await self.dispatcher.dispatch(
Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix})
)
except Exception as e:
logger.error(f"Error parsing binary ACL response: {e}")
elif request_type == BinaryReqType.NEIGHBOURS:
try:
pk_plen = context["pubkey_prefix_length"]
bbuf = io.BytesIO(response_data)
res = {
"pubkey_prefix": pubkey_prefix,
"tag": tag
}
res.update(context) # add context in result
res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True)
results_count = int.from_bytes(bbuf.read(2), "little", signed=True)
res["results_count"] = results_count
neighbours_list = []
for _ in range (results_count):
neighb = {}
neighb["pubkey"] = bbuf.read(pk_plen).hex()
neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True)
neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4
neighbours_list.append(neighb)
res["neighbours"] = neighbours_list
await self.dispatcher.dispatch(
Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix})
)
except Exception as e:
logger.error(f"Error parsing binary NEIGHBOURS response: {e}")
else:
logger.debug(f"No tracked request found for binary response tag {tag}")
elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value:
logger.debug(f"Received path discovery response: {data.hex()}")
res = {}
dbuf.read(1)
res["pubkey_pre"] = dbuf.read(6).hex()
opl = dbuf.read(1)[0]
opl_hlen = ((opl & 0xc0) >> 6) + 1
opl = opl & 0xbf
res["out_path_len"] = opl
res["out_path_hash_len"] = opl_hlen
res["out_path"] = dbuf.read(opl*opl_hlen).hex()
ipl = dbuf.read(1)[0]
ipl_hlen = ((ipl & 0xc0) >> 6) + 1
ipl = ipl & 0xbf
res["in_path_len"] = ipl
res["in_path_hash_len"] = ipl_hlen
res["in_path"] = dbuf.read(ipl*ipl_hlen).hex()
attributes = {"pubkey_pre": res["pubkey_pre"]}
await self.dispatcher.dispatch(
Event(EventType.PATH_RESPONSE, res, attributes)
)
elif packet_type_value == PacketType.PRIVATE_KEY.value:
logger.debug(f"Received private key response: {data.hex()}")
if len(data) >= 65: # 1 byte response code + 64 bytes private key
private_key = dbuf.read(64) # Extract 64-byte private key
res = {"private_key": private_key}
await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res))
else:
logger.error(f"Invalid private key response length: {len(data)}")
elif packet_type_value == PacketType.SIGN_START.value:
logger.debug(f"Received sign start response: {data.hex()}")
# Payload: 1 reserved byte, 4-byte max length
dbuf.read(1)
max_len = int.from_bytes(dbuf.read(4), "little")
res = {"max_length": max_len}
await self.dispatcher.dispatch(Event(EventType.SIGN_START, res))
elif packet_type_value == PacketType.SIGNATURE.value:
logger.debug(f"Received signature: {data.hex()}")
signature = dbuf.read()
res = {"signature": signature}
await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res))
elif packet_type_value == PacketType.DISABLED.value:
logger.debug("Received disabled response")
res = {"reason": "private_key_export_disabled"}
await self.dispatcher.dispatch(Event(EventType.DISABLED, res))
elif packet_type_value == PacketType.CONTROL_DATA.value:
logger.debug("Received control data packet")
res={}
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
res["path_len"] = dbuf.read(1)[0]
payload = dbuf.read()
payload_type = payload[0]
res["payload_type"] = payload_type
res["payload"] = payload
attributes = {"payload_type": payload_type}
await self.dispatcher.dispatch(
Event(EventType.CONTROL_DATA, res, attributes)
)
# decode NODE_DISCOVER_RESP
if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value:
pbuf = io.BytesIO(payload[1:])
ndr = dict(res)
del ndr["payload_type"]
del ndr["payload"]
ndr["node_type"] = payload_type & 0x0F
ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4
ndr["tag"] = pbuf.read(4).hex()
pubkey = pbuf.read()
if len(pubkey) < 32:
pubkey = pubkey[0:8]
else:
pubkey = pubkey[0:32]
ndr["pubkey"] = pubkey.hex()
attributes = {
"node_type" : ndr["node_type"],
"tag" : ndr["tag"],
"pubkey" : ndr["pubkey"],
}
await self.dispatcher.dispatch(
Event(EventType.DISCOVER_RESPONSE, ndr, attributes)
)
else:
logger.debug(f"Unhandled data received {data}")
logger.debug(f"Unhandled packet type: {packet_type_value}")