mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-06-11 11:56:18 +00:00
Merge pull request #74 from mwolter805/fix/reader-parser-crash-safety
fix: add umbrella crash protection and length guards to reader/parser dispatch
This commit is contained in:
@@ -42,6 +42,28 @@ class MeshcorePacketParser:
|
|||||||
Returns :
|
Returns :
|
||||||
completed log_data
|
completed log_data
|
||||||
"""
|
"""
|
||||||
|
# Minimum viable payload is 2 bytes (1 header + 1 path_byte) for a
|
||||||
|
# direct route. Anything shorter is provably broken — for example,
|
||||||
|
# the LOG_DATA branch in reader.py only requires `len(data) > 3`,
|
||||||
|
# which means a 4-byte LOG_DATA frame produces a 1-byte payload
|
||||||
|
# here, and `path_byte = pbuf.read(1)[0]` further down would raise
|
||||||
|
# IndexError on the empty buffer. Populate sentinel values so the
|
||||||
|
# caller's downstream `log_data['route_type']` etc. lookups don't
|
||||||
|
# KeyError, then return early.
|
||||||
|
if len(payload) < 2:
|
||||||
|
logger.debug(f"parsePacketPayload: payload too short ({len(payload)} bytes < 2), returning sentinel log_data")
|
||||||
|
log_data["route_type"] = -1
|
||||||
|
log_data["route_typename"] = "UNK"
|
||||||
|
log_data["payload_type"] = -1
|
||||||
|
log_data["payload_typename"] = "UNK"
|
||||||
|
log_data["payload_ver"] = 0
|
||||||
|
log_data["path_len"] = 0
|
||||||
|
log_data["path_hash_size"] = 1
|
||||||
|
log_data["path"] = ""
|
||||||
|
log_data["pkt_payload"] = b""
|
||||||
|
log_data["pkt_hash"] = 0
|
||||||
|
return log_data
|
||||||
|
|
||||||
pbuf = io.BytesIO(payload)
|
pbuf = io.BytesIO(payload)
|
||||||
|
|
||||||
header = pbuf.read(1)[0]
|
header = pbuf.read(1)[0]
|
||||||
@@ -128,7 +150,7 @@ class MeshcorePacketParser:
|
|||||||
uncrypted = cipher.decrypt(msg)
|
uncrypted = cipher.decrypt(msg)
|
||||||
timestamp = int.from_bytes(uncrypted[0:4], "little", signed=False)
|
timestamp = int.from_bytes(uncrypted[0:4], "little", signed=False)
|
||||||
attempt = uncrypted[4] & 3
|
attempt = uncrypted[4] & 3
|
||||||
txt_type = int.from_bytes(uncrypted[4:4], "little", signed=False) >> 2
|
txt_type = int.from_bytes(uncrypted[4:5], "little", signed=False) >> 2
|
||||||
message = uncrypted[5:].strip(b"\0")
|
message = uncrypted[5:].strip(b"\0")
|
||||||
msg_hash = int.from_bytes(SHA256.new(timestamp.to_bytes(4, "little", signed=False) + message).digest()[0:4], "little", signed=False)
|
msg_hash = int.from_bytes(SHA256.new(timestamp.to_bytes(4, "little", signed=False) + message).digest()[0:4], "little", signed=False)
|
||||||
log_data["message"] = message.decode("utf-8", "ignore")
|
log_data["message"] = message.decode("utf-8", "ignore")
|
||||||
@@ -149,6 +171,7 @@ class MeshcorePacketParser:
|
|||||||
del self.channels_log[:25]
|
del self.channels_log[:25]
|
||||||
|
|
||||||
elif not payload is None and payload_type == 0x04: # Advert
|
elif not payload is None and payload_type == 0x04: # Advert
|
||||||
|
try:
|
||||||
pk_buf = io.BytesIO(pkt_payload)
|
pk_buf = io.BytesIO(pkt_payload)
|
||||||
adv_key = pk_buf.read(32).hex()
|
adv_key = pk_buf.read(32).hex()
|
||||||
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
|
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
|
||||||
@@ -183,5 +206,7 @@ class MeshcorePacketParser:
|
|||||||
log_data["adv_feat1"] = adv_feat1
|
log_data["adv_feat1"] = adv_feat1
|
||||||
if not adv_feat2 is None:
|
if not adv_feat2 is None:
|
||||||
log_data["adv_feat2"] = adv_feat2
|
log_data["adv_feat2"] = adv_feat2
|
||||||
|
except (IndexError, ValueError) as e:
|
||||||
|
logger.debug(f"parsePacketPayload: malformed ADVERT payload ({type(e).__name__}: {e}), len={len(pkt_payload)}")
|
||||||
|
|
||||||
return log_data
|
return log_data
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import json
|
|||||||
import struct
|
import struct
|
||||||
import time
|
import time
|
||||||
import io
|
import io
|
||||||
|
import traceback
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
from .events import Event, EventType, EventDispatcher, ErrorMessages
|
from .events import Event, EventType, EventDispatcher, ErrorMessages
|
||||||
from .meshcore_parser import MeshcorePacketParser
|
from .meshcore_parser import MeshcorePacketParser
|
||||||
@@ -69,6 +70,7 @@ class MessageReader:
|
|||||||
except IndexError as e:
|
except IndexError as e:
|
||||||
logger.warning(f"Received empty packet: {e}")
|
logger.warning(f"Received empty packet: {e}")
|
||||||
return
|
return
|
||||||
|
try:
|
||||||
logger.debug(f"Received data: {data.hex()}")
|
logger.debug(f"Received data: {data.hex()}")
|
||||||
|
|
||||||
# Handle command responses
|
# Handle command responses
|
||||||
@@ -220,7 +222,7 @@ class MessageReader:
|
|||||||
|
|
||||||
await self.dispatcher.dispatch(Event(evt_type, res, attributes))
|
await self.dispatcher.dispatch(Event(evt_type, res, attributes))
|
||||||
|
|
||||||
elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
|
elif packet_type_value == PacketType.CONTACT_MSG_RECV_V3.value: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
|
||||||
res = {}
|
res = {}
|
||||||
res["type"] = "PRIV"
|
res["type"] = "PRIV"
|
||||||
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
|
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
|
||||||
@@ -285,7 +287,7 @@ class MessageReader:
|
|||||||
Event(EventType.CHANNEL_MSG_RECV, res, attributes)
|
Event(EventType.CHANNEL_MSG_RECV, res, attributes)
|
||||||
)
|
)
|
||||||
|
|
||||||
elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
|
elif packet_type_value == PacketType.CHANNEL_MSG_RECV_V3.value: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
|
||||||
res = {}
|
res = {}
|
||||||
res["type"] = "CHAN"
|
res["type"] = "CHAN"
|
||||||
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
|
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
|
||||||
@@ -339,9 +341,22 @@ class MessageReader:
|
|||||||
await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result))
|
await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result))
|
||||||
|
|
||||||
elif packet_type_value == PacketType.BATTERY.value:
|
elif packet_type_value == PacketType.BATTERY.value:
|
||||||
|
# Full RESP_CODE_BATT_AND_STORAGE: 1 type + 2 level + 4 used_kb + 4 total_kb = 11 bytes.
|
||||||
|
# Minimum viable frame is 3 bytes (type + level). Shorter frames are
|
||||||
|
# malformed — dbuf.read(2) would return short bytes and
|
||||||
|
# int.from_bytes(b"", ...) silently yields 0 (same class as N07).
|
||||||
|
if len(data) < 3:
|
||||||
|
logger.debug(
|
||||||
|
"BATTERY frame too short for level field "
|
||||||
|
f"({len(data)} bytes < 3), skipping"
|
||||||
|
)
|
||||||
|
return
|
||||||
battery_level = int.from_bytes(dbuf.read(2), byteorder="little")
|
battery_level = int.from_bytes(dbuf.read(2), byteorder="little")
|
||||||
result = {"level": battery_level}
|
result = {"level": battery_level}
|
||||||
if len(data) > 3: # has storage info as well
|
# The previous `len(data) > 3` guard let 4-10 byte truncated frames
|
||||||
|
# through, producing silent zero values for used_kb/total_kb because
|
||||||
|
# io.BytesIO.read() returns short data without raising.
|
||||||
|
if len(data) >= 11: # has storage info as well
|
||||||
result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
|
result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
|
||||||
result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
|
result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
|
||||||
await self.dispatcher.dispatch(Event(EventType.BATTERY, result))
|
await self.dispatcher.dispatch(Event(EventType.BATTERY, result))
|
||||||
@@ -527,7 +542,7 @@ class MessageReader:
|
|||||||
res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
|
res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
|
||||||
res["payload"] = dbuf.read(4).hex()
|
res["payload"] = dbuf.read(4).hex()
|
||||||
logger.debug("Received raw data")
|
logger.debug("Received raw data")
|
||||||
print(res)
|
logger.debug(res)
|
||||||
await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))
|
await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))
|
||||||
|
|
||||||
elif packet_type_value == PacketType.LOGIN_SUCCESS.value:
|
elif packet_type_value == PacketType.LOGIN_SUCCESS.value:
|
||||||
@@ -537,7 +552,7 @@ class MessageReader:
|
|||||||
perms = dbuf.read(1)[0]
|
perms = dbuf.read(1)[0]
|
||||||
res["permissions"] = perms
|
res["permissions"] = perms
|
||||||
res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set
|
res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set
|
||||||
|
if len(data) > 7:
|
||||||
res["pubkey_prefix"] = dbuf.read(6).hex()
|
res["pubkey_prefix"] = dbuf.read(6).hex()
|
||||||
|
|
||||||
attributes = {"pubkey_prefix": res.get("pubkey_prefix")}
|
attributes = {"pubkey_prefix": res.get("pubkey_prefix")}
|
||||||
@@ -562,6 +577,15 @@ class MessageReader:
|
|||||||
)
|
)
|
||||||
|
|
||||||
elif packet_type_value == PacketType.STATUS_RESPONSE.value:
|
elif packet_type_value == PacketType.STATUS_RESPONSE.value:
|
||||||
|
# parse_status with offset=8 reads up through data[56:60]
|
||||||
|
# (rx_airtime field), so the full payload is 60 bytes:
|
||||||
|
# 1 type + 1 reserved + 6 pubkey + 52 status fields. The
|
||||||
|
# BINARY_RESPONSE STATUS path below gates with `>= 52` on
|
||||||
|
# the offset-stripped buffer; this gate is the equivalent
|
||||||
|
# for the push path with the 8-byte header included.
|
||||||
|
if len(data) < 60:
|
||||||
|
logger.debug(f"STATUS_RESPONSE push frame too short ({len(data)} bytes < 60), skipping parse")
|
||||||
|
return
|
||||||
res = parse_status(data, offset=8)
|
res = parse_status(data, offset=8)
|
||||||
data_hex = data[8:].hex()
|
data_hex = data[8:].hex()
|
||||||
logger.debug(f"Status response: {data_hex}")
|
logger.debug(f"Status response: {data_hex}")
|
||||||
@@ -715,8 +739,8 @@ class MessageReader:
|
|||||||
cont = False
|
cont = False
|
||||||
else:
|
else:
|
||||||
freqs.append({"min" : min, "max": max})
|
freqs.append({"min" : min, "max": max})
|
||||||
except e:
|
except Exception as e:
|
||||||
print(e)
|
logger.warning(f"Error parsing ALLOWED_REPEAT_FREQ payload: {e}")
|
||||||
|
|
||||||
res["freqs"] = freqs
|
res["freqs"] = freqs
|
||||||
|
|
||||||
@@ -879,6 +903,9 @@ class MessageReader:
|
|||||||
res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
|
res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
|
||||||
res["path_len"] = dbuf.read(1)[0]
|
res["path_len"] = dbuf.read(1)[0]
|
||||||
payload = dbuf.read()
|
payload = dbuf.read()
|
||||||
|
if len(payload) == 0:
|
||||||
|
logger.debug("CONTROL_DATA frame has empty payload, skipping")
|
||||||
|
return
|
||||||
payload_type = payload[0]
|
payload_type = payload[0]
|
||||||
res["payload_type"] = payload_type
|
res["payload_type"] = payload_type
|
||||||
res["payload"] = payload
|
res["payload"] = payload
|
||||||
@@ -915,7 +942,6 @@ class MessageReader:
|
|||||||
await self.dispatcher.dispatch(
|
await self.dispatcher.dispatch(
|
||||||
Event(EventType.DISCOVER_RESPONSE, ndr, attributes)
|
Event(EventType.DISCOVER_RESPONSE, ndr, attributes)
|
||||||
)
|
)
|
||||||
|
|
||||||
elif packet_type_value == PacketType.CONTACT_DELETED.value:
|
elif packet_type_value == PacketType.CONTACT_DELETED.value:
|
||||||
# N01: PUSH_CODE_CONTACT_DELETED (0x8F) — 1-byte code + 32-byte pubkey
|
# N01: PUSH_CODE_CONTACT_DELETED (0x8F) — 1-byte code + 32-byte pubkey
|
||||||
# Emitted by MyMesh::onContactOverwrite() (MyMesh.cpp:325-334)
|
# Emitted by MyMesh::onContactOverwrite() (MyMesh.cpp:325-334)
|
||||||
@@ -950,3 +976,11 @@ class MessageReader:
|
|||||||
else:
|
else:
|
||||||
logger.debug(f"Unhandled data received {data}")
|
logger.debug(f"Unhandled data received {data}")
|
||||||
logger.debug(f"Unhandled packet type: {packet_type_value}")
|
logger.debug(f"Unhandled packet type: {packet_type_value}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
"handle_rx parse error: %s: %s | raw=%s\n%s",
|
||||||
|
type(e).__name__,
|
||||||
|
e,
|
||||||
|
data.hex(),
|
||||||
|
traceback.format_exc(),
|
||||||
|
)
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
from unittest.mock import AsyncMock
|
from unittest.mock import AsyncMock
|
||||||
from meshcore.events import EventType
|
from meshcore.events import EventType
|
||||||
from meshcore.reader import MessageReader
|
from meshcore.reader import MessageReader
|
||||||
@@ -88,3 +89,192 @@ async def test_binary_response():
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
asyncio.run(test_binary_response())
|
asyncio.run(test_binary_response())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Reader/parser crash-safety verification tests
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class _CapturingDispatcher:
|
||||||
|
"""Quiet dispatcher that records every dispatched event."""
|
||||||
|
def __init__(self):
|
||||||
|
self.events = []
|
||||||
|
|
||||||
|
async def dispatch(self, event):
|
||||||
|
self.events.append(event)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_handle_rx_malformed_frame_logged_and_swallowed(caplog):
|
||||||
|
"""Malformed frame must not propagate, must be logged with traceback."""
|
||||||
|
dispatcher = _CapturingDispatcher()
|
||||||
|
reader = MessageReader(dispatcher)
|
||||||
|
|
||||||
|
# 4-byte CHANNEL_MSG_RECV_V3 frame: type byte (0x11) + 1 SNR byte +
|
||||||
|
# 2 reserved bytes, but no channel_idx byte. The handler will raise
|
||||||
|
# IndexError on the next dbuf.read(1)[0] when the buffer is empty.
|
||||||
|
# The umbrella try/except must catch it, log the parse error, and
|
||||||
|
# return cleanly.
|
||||||
|
malformed = bytearray.fromhex("11100000")
|
||||||
|
|
||||||
|
with caplog.at_level(logging.ERROR, logger="meshcore"):
|
||||||
|
await reader.handle_rx(malformed) # must not raise
|
||||||
|
|
||||||
|
error_records = [r for r in caplog.records if "handle_rx parse error" in r.message]
|
||||||
|
assert error_records, (
|
||||||
|
f"Expected an error log containing 'handle_rx parse error'; "
|
||||||
|
f"got: {[r.message for r in caplog.records]}"
|
||||||
|
)
|
||||||
|
# Traceback should be present in the log message
|
||||||
|
assert "Traceback" in error_records[0].message, (
|
||||||
|
"Umbrella log message must include a traceback"
|
||||||
|
)
|
||||||
|
# No CHANNEL_MSG_RECV event should have been dispatched
|
||||||
|
assert not any(e.type == EventType.CHANNEL_MSG_RECV for e in dispatcher.events)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_battery_short_frame_omits_storage_fields():
|
||||||
|
"""Short BATTERY frame must not silently yield zero used_kb/total_kb."""
|
||||||
|
dispatcher = _CapturingDispatcher()
|
||||||
|
reader = MessageReader(dispatcher)
|
||||||
|
|
||||||
|
# 3-byte BATTERY frame: type 0x0c + 2 level bytes (no storage tail).
|
||||||
|
# Pre-fix the `len(data) > 3` gate would have let any frame >= 4 bytes
|
||||||
|
# through, producing a BATTERY event with bogus zero used_kb/total_kb
|
||||||
|
# because io.BytesIO.read() returns short data without raising.
|
||||||
|
# Post-fix (`len(data) >= 11`) the storage fields are skipped entirely.
|
||||||
|
short_battery = bytearray.fromhex("0c8000")
|
||||||
|
|
||||||
|
await reader.handle_rx(short_battery)
|
||||||
|
|
||||||
|
battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY]
|
||||||
|
assert len(battery_events) == 1, (
|
||||||
|
f"Expected exactly one BATTERY event, got {len(battery_events)}"
|
||||||
|
)
|
||||||
|
payload = battery_events[0].payload
|
||||||
|
assert payload["level"] == 0x0080, f"Unexpected level: {payload['level']}"
|
||||||
|
assert "used_kb" not in payload, (
|
||||||
|
"Short BATTERY frame must not include used_kb (would be a silent zero)"
|
||||||
|
)
|
||||||
|
assert "total_kb" not in payload, (
|
||||||
|
"Short BATTERY frame must not include total_kb (would be a silent zero)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_battery_too_short_for_level(caplog):
|
||||||
|
"""BATTERY frame shorter than 3 bytes must be dropped entirely (Option B).
|
||||||
|
|
||||||
|
A 1-byte frame (just the packet-type byte 0x0c, no level bytes) would cause
|
||||||
|
dbuf.read(2) to return b"" and int.from_bytes(b"", ...) to silently yield 0.
|
||||||
|
The fix adds an early return with a debug log.
|
||||||
|
"""
|
||||||
|
dispatcher = _CapturingDispatcher()
|
||||||
|
reader = MessageReader(dispatcher)
|
||||||
|
|
||||||
|
# 1-byte BATTERY frame: only the type byte, no level payload.
|
||||||
|
too_short = bytearray.fromhex("0c")
|
||||||
|
|
||||||
|
with caplog.at_level(logging.DEBUG, logger="meshcore"):
|
||||||
|
await reader.handle_rx(too_short)
|
||||||
|
|
||||||
|
battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY]
|
||||||
|
assert len(battery_events) == 0, (
|
||||||
|
"BATTERY frame shorter than 3 bytes must not dispatch an event"
|
||||||
|
)
|
||||||
|
debug_records = [
|
||||||
|
r for r in caplog.records if "BATTERY frame too short" in r.message
|
||||||
|
]
|
||||||
|
assert debug_records, "Expected a debug log about the short BATTERY frame"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_status_response_short_frame_skipped(caplog):
|
||||||
|
"""Short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros."""
|
||||||
|
dispatcher = _CapturingDispatcher()
|
||||||
|
reader = MessageReader(dispatcher)
|
||||||
|
|
||||||
|
# 30-byte STATUS_RESPONSE push frame, well below the 60-byte minimum.
|
||||||
|
# First byte is the type (0x87 = PacketType.STATUS_RESPONSE), the rest
|
||||||
|
# is arbitrary filler. parse_status with offset=8 reads up through
|
||||||
|
# data[56:60], so anything < 60 bytes would yield short reads and
|
||||||
|
# silent zero values pre-fix.
|
||||||
|
short_status = bytearray([0x87] + [0xAA] * 29)
|
||||||
|
assert len(short_status) == 30
|
||||||
|
|
||||||
|
with caplog.at_level(logging.DEBUG, logger="meshcore"):
|
||||||
|
await reader.handle_rx(short_status)
|
||||||
|
|
||||||
|
status_events = [e for e in dispatcher.events if e.type == EventType.STATUS_RESPONSE]
|
||||||
|
assert len(status_events) == 0, (
|
||||||
|
"Short STATUS_RESPONSE push frame must not dispatch a parsed event"
|
||||||
|
)
|
||||||
|
assert any(
|
||||||
|
"STATUS_RESPONSE push frame too short" in r.message for r in caplog.records
|
||||||
|
), "Expected a debug log line for short STATUS_RESPONSE frames"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_parse_packet_payload_txt_type_decodes_high_bits():
|
||||||
|
"""txt_type must decode the high 6 bits of byte 4, not always be 0."""
|
||||||
|
from Crypto.Cipher import AES
|
||||||
|
from Crypto.Hash import HMAC, SHA256
|
||||||
|
from meshcore.meshcore_parser import MeshcorePacketParser
|
||||||
|
|
||||||
|
parser = MeshcorePacketParser()
|
||||||
|
parser.decrypt_channels = True
|
||||||
|
|
||||||
|
# Set up a synthetic channel with a known 16-byte AES key. Direct dict
|
||||||
|
# assignment matches how the parser stores channels (newChannel is async
|
||||||
|
# and serves the same purpose).
|
||||||
|
channel_secret = b"\x01" * 16
|
||||||
|
channel_hash_byte = 0xAB
|
||||||
|
parser.channels[0] = {
|
||||||
|
"channel_idx": 0,
|
||||||
|
"channel_name": "test",
|
||||||
|
"channel_hash": "ab",
|
||||||
|
"channel_secret": channel_secret,
|
||||||
|
}
|
||||||
|
|
||||||
|
# 16-byte plaintext (one AES block):
|
||||||
|
# bytes 0-3 = sender_timestamp (little-endian)
|
||||||
|
# byte 4 = (txt_type << 2) | attempt
|
||||||
|
# bytes 5-15 = message + null padding
|
||||||
|
# Pick txt_type=5, attempt=1 → byte 4 = (5 << 2) | 1 = 0x15.
|
||||||
|
# Pre-fix uncrypted[4:4] is empty so txt_type would be 0;
|
||||||
|
# post-fix uncrypted[4:5] yields 0x15 >> 2 = 5.
|
||||||
|
plaintext = b"\x00\x00\x00\x00\x15hello\x00\x00\x00\x00\x00\x00"
|
||||||
|
assert len(plaintext) == 16
|
||||||
|
|
||||||
|
encrypted = AES.new(channel_secret, AES.MODE_ECB).encrypt(plaintext)
|
||||||
|
|
||||||
|
# cipher_mac = first 2 bytes of HMAC-SHA256(channel_secret, encrypted)
|
||||||
|
h = HMAC.new(channel_secret, digestmod=SHA256)
|
||||||
|
h.update(encrypted)
|
||||||
|
cipher_mac = h.digest()[:2]
|
||||||
|
|
||||||
|
# pkt_payload layout: 1-byte chan_hash + 2-byte cipher_mac + ciphertext
|
||||||
|
pkt_payload = bytes([channel_hash_byte]) + cipher_mac + encrypted
|
||||||
|
|
||||||
|
# parsePacketPayload expects the full payload buffer:
|
||||||
|
# header byte (route_type=1 DIRECT, payload_type=5 channel, ver=0)
|
||||||
|
# path_byte (path_len=0, path_hash_size=1) → 0x00
|
||||||
|
# pkt_payload
|
||||||
|
header = 0x15 # route_type=1, payload_type=5, payload_ver=0
|
||||||
|
path_byte = 0x00
|
||||||
|
payload = bytes([header, path_byte]) + pkt_payload
|
||||||
|
|
||||||
|
log_data = await parser.parsePacketPayload(payload, log_data={})
|
||||||
|
|
||||||
|
assert log_data["payload_type"] == 0x05
|
||||||
|
assert "txt_type" in log_data, (
|
||||||
|
f"txt_type missing from log_data — channel decrypt path was not reached. "
|
||||||
|
f"log_data keys: {list(log_data.keys())}"
|
||||||
|
)
|
||||||
|
assert log_data["txt_type"] == 5, (
|
||||||
|
f"Expected txt_type=5, got {log_data['txt_type']}"
|
||||||
|
)
|
||||||
|
assert log_data["attempt"] == 1, (
|
||||||
|
f"Expected attempt=1, got {log_data['attempt']}"
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user