make a class and module for parsing meshcore packets

This commit is contained in:
Florent
2026-03-09 18:22:02 -04:00
parent f3fce820fc
commit 18528f2ed3

View File

@@ -5,19 +5,15 @@ import time
import io import io
from typing import Any, Dict from typing import Any, Dict
from .events import Event, EventType, EventDispatcher, ErrorMessages from .events import Event, EventType, EventDispatcher, ErrorMessages
from .meshcore_parser import MeshcorePacketParser
from .packets import BinaryReqType, PacketType, ControlType from .packets import BinaryReqType, PacketType, ControlType
from .parsing import lpp_parse, lpp_parse_mma, parse_acl, parse_status from .parsing import lpp_parse, lpp_parse_mma, parse_acl, parse_status
from cayennelpp import LppFrame, LppData from cayennelpp import LppFrame, LppData
from meshcore.lpp_json_encoder import lpp_json_encoder from meshcore.lpp_json_encoder import lpp_json_encoder
from Crypto.Cipher import AES from Crypto.Hash import SHA256
from Crypto.Hash import HMAC, SHA256
logger = logging.getLogger("meshcore") logger = logging.getLogger("meshcore")
PAYLOAD_TYPENAMES = ["REQ", "RESPONSE", "TEXT_MSG", "ACK", "ADVERT", "GRP_TXT", "GRP_DATA", "ANON_REQ", "PATH", "TRACE", "MULTIPART", "CONTROL"]
ROUTE_TYPENAMES = ["TC_FLOOD", "FLOOD", "DIRECT", "TC_DIRECT"]
CONTACT_TYPENAMES = ["NONE","CLI","REP","ROOM","SENS"]
class MessageReader: class MessageReader:
def __init__(self, dispatcher: EventDispatcher): def __init__(self, dispatcher: EventDispatcher):
self.dispatcher = dispatcher self.dispatcher = dispatcher
@@ -29,9 +25,16 @@ class MessageReader:
# Track pending binary requests by tag for proper response parsing # Track pending binary requests by tag for proper response parsing
self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at} self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at}
self.channels = [{} for _ in range(40)] # keep our own copy of channels, 40 elements by default self.packet_parser = MeshcorePacketParser()
self.decrypt_channels = True self.decrypt_channels = True
self.channels_log = [] # stores the channel msg events
@property
def decrypt_channels(self):
return self.packet_parser.decrypt_channels
@decrypt_channels.setter
def decrypt_channels(self, value):
self.packet_parser.decrypt_channels = value
def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float, context={}, is_anon=False): def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float, context={}, is_anon=False):
"""Register a pending binary request for proper response parsing""" """Register a pending binary request for proper response parsing"""
@@ -266,7 +269,7 @@ class MessageReader:
# search for text in log_channels # search for text in log_channels
txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False)
if self.decrypt_channels: if self.decrypt_channels:
logged = next((l for l in reversed(self.channels_log) if 'msg_hash' in l and l['msg_hash'] == txt_hash), None) logged = await self.packet_parser.findLogChannelMsg(txt_hash)
if not logged is None: if not logged is None:
res["path"] = logged["path"] res["path"] = logged["path"]
res["RSSI"] = logged["rssi"] res["RSSI"] = logged["rssi"]
@@ -302,11 +305,11 @@ class MessageReader:
res["text"] = text.decode("utf-8", "ignore") res["text"] = text.decode("utf-8", "ignore")
# search for text in log_channels # search for text in log_channels
if self.decrypt_channels:
txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False)
res["txt_hash"] = txt_hash res["txt_hash"] = txt_hash
logged = next((l for l in reversed(self.channels_log) if 'msg_hash' in l and l['msg_hash'] == txt_hash), None) logged = await self.packet_parser.findLogChannelMsg(txt_hash)
if self.decrypt_channels:
if not logged is None: if not logged is None:
res["path"] = logged["path"] res["path"] = logged["path"]
res["RSSI"] = logged["rssi"] res["RSSI"] = logged["rssi"]
@@ -476,9 +479,6 @@ class MessageReader:
idx = dbuf.read(1)[0] idx = dbuf.read(1)[0]
res["channel_idx"] = idx res["channel_idx"] = idx
if len(self.channels) <= idx:
self.channels.extend([{} for _ in range(1 + idx - len(self.channels))])
# Channel name is null-terminated, so find the first null byte # Channel name is null-terminated, so find the first null byte
name_bytes = dbuf.read(32) name_bytes = dbuf.read(32)
null_pos = name_bytes.find(0) null_pos = name_bytes.find(0)
@@ -490,7 +490,7 @@ class MessageReader:
res["channel_secret"] = dbuf.read(16) res["channel_secret"] = dbuf.read(16)
res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2]
self.channels[idx] = res await self.packet_parser.newChannel(res)
await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res))
@@ -608,156 +608,11 @@ class MessageReader:
log_data["payload_length"] = len(payload) log_data["payload_length"] = len(payload)
# Parse payload and get some info from it # Parse payload and get some info from it
if not payload is None: log_data = await self.packet_parser.parsePacketPayload(payload, log_data)
pbuf = io.BytesIO(payload) attributes['route_type'] = log_data['route_type']
attributes['payload_type'] = log_data['payload_type']
header = pbuf.read(1)[0] attributes['path_len'] = log_data['path_len']
route_type = header & 0x03 attributes['path'] = log_data['path']
payload_type = (header & 0x3c) >> 2
payload_ver = (header & 0xc0) >> 6
transport_code = None
if route_type == 0x00 or route_type == 0x03: # has transport code
transport_code = pbuf.read(4) # discard transport code
path_byte = pbuf.read(1)[0]
path_hash_size = ((path_byte & 0xC0) >> 6) + 1
path_len = (path_byte & 0x3F)
# here path_len is number of hops, not number of bytes
path = pbuf.read(path_len*path_hash_size).hex() # Beware of traces where pathes are mixed
try :
route_typename = ROUTE_TYPENAMES[route_type]
except IndexError:
logger.debug(f"Unknown route type {route_type}")
route_typename = "UNK"
try :
payload_typename = PAYLOAD_TYPENAMES[payload_type]
except IndexError:
logger.debug(f"Unknown payload type {payload_type}")
payload_typename = "UNK"
pkt_payload = pbuf.read()
pkt_hash = int.from_bytes(SHA256.new(pkt_payload).digest()[0:4], "little", signed=False)
log_data["header"] = header
log_data["route_type"] = route_type
attributes["route_type"] = route_type
log_data["route_typename"] = route_typename
log_data["payload_type"] = payload_type
attributes["payload_type"] = payload_type
log_data["payload_typename"]= payload_typename
log_data["payload_ver"] = payload_ver
if not transport_code is None:
log_data["transport_code"] = transport_code.hex()
log_data["path_len"] = path_len
attributes["path_len"] = path_len
log_data["path_hash_size"] = path_hash_size
log_data["path"] = path
attributes["path"] = path
log_data["pkt_payload"] = pkt_payload
log_data["pkt_hash"] = pkt_hash
if not payload is None and payload_type == 0x05: # flood msg / channel
pk_buf = io.BytesIO(pkt_payload)
chan_hash = pk_buf.read(1).hex()
cipher_mac = pk_buf.read(2)
msg = pk_buf.read() # until the end of buffer
channel = None
for c in self.channels:
if "channel_hash" in c and c["channel_hash"] == chan_hash : # validate against MAC
h = HMAC.new(c["channel_secret"], digestmod=SHA256)
h.update(msg)
if h.digest()[0:2] == cipher_mac:
channel = c
break
chan_name = ""
if channel is None :
chan_name = chan_hash
else:
chan_name = channel["channel_name"]
log_data["chan_hash"] = chan_hash
log_data["cipher_mac"] = cipher_mac.hex()
log_data["crypted"] = msg.hex()
log_data["chan_name"] = chan_name
if not channel is None and self.decrypt_channels:
# search for the same packet
logged = next((l for l in reversed(self.channels_log) if 'pkt_hash' in l and l['pkt_hash'] == pkt_hash), None)
if logged is None:
# not found: decrypt the text and hash it
aes_key = channel["channel_secret"]
cipher = AES.new(aes_key, AES.MODE_ECB)
uncrypted = cipher.decrypt(msg)
timestamp = int.from_bytes(uncrypted[0:4], "little", signed=False)
attempt = uncrypted[4] & 3
txt_type = int.from_bytes(uncrypted[4:4], "little", signed=False) >> 2
message = uncrypted[5:].strip(b"\0")
msg_hash = int.from_bytes(SHA256.new(timestamp.to_bytes(4, "little", signed=False) + message).digest()[0:4], "little", signed=False)
log_data["message"] = message.decode("utf-8", "ignore")
log_data["msg_hash"] = msg_hash
log_data["sender_timestamp"] = timestamp
log_data["attempt"] = attempt
log_data["txt_type"] = txt_type
else:
# found: copy
log_data["message"] = logged["message"]
log_data["msg_hash"] = logged["msg_hash"]
log_data["sender_timestamp"] = logged["sender_timestamp"]
log_data["attempt"] = logged["attempt"]
log_data["txt_type"] = logged["txt_type"]
self.channels_log.append(log_data)
if len(self.channels_log) > 100:
del self.channels_log[:25]
elif not payload is None and payload_type == 0x04: # Advert
pk_buf = io.BytesIO(pkt_payload)
adv_key = pk_buf.read(32).hex()
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
signature = pk_buf.read(64).hex()
flags = pk_buf.read(1)[0]
adv_type = flags & 0x0F
adv_lat = None
adv_lon = None
adv_feat1 = None
adv_feat2 = None
if flags & 0x10 > 0: #has location
adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
if flags & 0x20 > 0: #has feature1
adv_feat1 = pk_buf.read(2).hex()
if flags & 0x40 > 0: #has feature2
adv_feat2 = pk_buf.read(2).hex()
if flags & 0x80 > 0: #has name
adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00")
log_data["adv_name"] = adv_name
log_data["adv_key"] = adv_key
log_data["adv_timestamp"] = adv_timestamp
log_data["signature"] = signature
log_data["adv_flags"] = flags
log_data["adv_type"] = adv_type
if not adv_lat is None :
log_data["adv_lat"] = adv_lat
if not adv_lon is None :
log_data["adv_lon"] = adv_lon
if not adv_feat1 is None:
log_data["adv_feat1"] = adv_feat1
if not adv_feat2 is None:
log_data["adv_feat2"] = adv_feat2
# Dispatch as RF log data # Dispatch as RF log data
await self.dispatcher.dispatch( await self.dispatcher.dispatch(