G1: add verification tests for F06, N07, NEW-C, R02

Adds the four unit tests required by proposal §4.1 verification:

(a) test_g1_handle_rx_malformed_frame_logged_and_swallowed — F06.
    Sends a 4-byte CHANNEL_MSG_RECV_V3 frame missing the channel_idx
    byte. The handler raises IndexError on dbuf.read(1)[0]; the F06
    umbrella must catch it, log "handle_rx parse error" with a
    traceback, and return cleanly without dispatching any event.

(b) test_g1_battery_short_frame_omits_storage_fields — N07.
    Sends a 3-byte BATTERY frame (type + 2-byte level only). Verifies
    that exactly one BATTERY event is dispatched, the payload contains
    `level` but does NOT contain `used_kb` or `total_kb`. Pre-fix the
    `len(data) > 3` gate would have produced both fields with bogus
    silent zeros.

(c) test_g1_status_response_short_frame_skipped — NEW-C.
    Sends a 30-byte STATUS_RESPONSE push frame (well below the 60-byte
    minimum). Verifies that no STATUS_RESPONSE event is dispatched and
    the new "STATUS_RESPONSE push frame too short" debug log fires.

(d) test_g1_parse_packet_payload_txt_type_decodes_high_bits — R02.
    Sets up a synthetic channel with a known 16-byte AES key, encrypts
    a 16-byte plaintext where byte 4 = (5 << 2) | 1 = 0x15, builds the
    full pkt_payload (chan_hash + cipher_mac + ciphertext), wraps it in
    a minimal route header, calls parsePacketPayload directly, and
    asserts log_data["txt_type"] == 5 and log_data["attempt"] == 1.
    Pre-R02-fix `uncrypted[4:4]` was the empty slice, so txt_type was
    always 0 — this test would have failed with txt_type=0.

Adds a small _CapturingDispatcher helper class. Adds imports for
logging at the top of the file. The existing test_binary_response is
left untouched.

File: tests/unit/test_reader.py
This commit is contained in:
Matthew Wolter
2026-04-11 18:39:01 -07:00
parent d98523cddb
commit f3973151d6

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio import asyncio
import logging
from unittest.mock import AsyncMock from unittest.mock import AsyncMock
from meshcore.events import EventType from meshcore.events import EventType
from meshcore.reader import MessageReader from meshcore.reader import MessageReader
@@ -88,3 +89,165 @@ async def test_binary_response():
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(test_binary_response()) asyncio.run(test_binary_response())
# ---------------------------------------------------------------------------
# G1 verification tests (proposal §4.1)
# ---------------------------------------------------------------------------
class _CapturingDispatcher:
"""Quiet dispatcher that records every dispatched event."""
def __init__(self):
self.events = []
async def dispatch(self, event):
self.events.append(event)
@pytest.mark.asyncio
async def test_g1_handle_rx_malformed_frame_logged_and_swallowed(caplog):
"""G1/F06: malformed frame must not propagate, must be logged with traceback."""
dispatcher = _CapturingDispatcher()
reader = MessageReader(dispatcher)
# 4-byte CHANNEL_MSG_RECV_V3 frame: type byte (0x11) + 1 SNR byte +
# 2 reserved bytes, but no channel_idx byte. The handler will raise
# IndexError on the next dbuf.read(1)[0] when the buffer is empty.
# F06's umbrella try/except must catch it, log the parse error, and
# return cleanly.
malformed = bytearray.fromhex("11100000")
with caplog.at_level(logging.ERROR, logger="meshcore"):
await reader.handle_rx(malformed) # must not raise
error_records = [r for r in caplog.records if "handle_rx parse error" in r.message]
assert error_records, (
f"Expected an error log containing 'handle_rx parse error'; "
f"got: {[r.message for r in caplog.records]}"
)
# Traceback should be present in the log message (F06 includes it)
assert "Traceback" in error_records[0].message, (
"F06 umbrella log message must include a traceback"
)
# No CHANNEL_MSG_RECV event should have been dispatched
assert not any(e.type == EventType.CHANNEL_MSG_RECV for e in dispatcher.events)
@pytest.mark.asyncio
async def test_g1_battery_short_frame_omits_storage_fields():
"""G1/N07: short BATTERY frame must not silently yield zero used_kb/total_kb."""
dispatcher = _CapturingDispatcher()
reader = MessageReader(dispatcher)
# 3-byte BATTERY frame: type 0x0c + 2 level bytes (no storage tail).
# Pre-fix the `len(data) > 3` gate would have let any frame >= 4 bytes
# through, producing a BATTERY event with bogus zero used_kb/total_kb
# because io.BytesIO.read() returns short data without raising.
# Post-fix (`len(data) >= 11`) the storage fields are skipped entirely.
short_battery = bytearray.fromhex("0c8000")
await reader.handle_rx(short_battery)
battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY]
assert len(battery_events) == 1, (
f"Expected exactly one BATTERY event, got {len(battery_events)}"
)
payload = battery_events[0].payload
assert payload["level"] == 0x0080, f"Unexpected level: {payload['level']}"
assert "used_kb" not in payload, (
"Short BATTERY frame must not include used_kb (would be a silent zero)"
)
assert "total_kb" not in payload, (
"Short BATTERY frame must not include total_kb (would be a silent zero)"
)
@pytest.mark.asyncio
async def test_g1_status_response_short_frame_skipped(caplog):
"""G1/NEW-C: short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros."""
dispatcher = _CapturingDispatcher()
reader = MessageReader(dispatcher)
# 30-byte STATUS_RESPONSE push frame, well below the 60-byte minimum.
# First byte is the type (0x87 = PacketType.STATUS_RESPONSE), the rest
# is arbitrary filler. parse_status with offset=8 reads up through
# data[56:60], so anything < 60 bytes would yield short reads and
# silent zero values pre-fix.
short_status = bytearray([0x87] + [0xAA] * 29)
assert len(short_status) == 30
with caplog.at_level(logging.DEBUG, logger="meshcore"):
await reader.handle_rx(short_status)
status_events = [e for e in dispatcher.events if e.type == EventType.STATUS_RESPONSE]
assert len(status_events) == 0, (
"Short STATUS_RESPONSE push frame must not dispatch a parsed event"
)
assert any(
"STATUS_RESPONSE push frame too short" in r.message for r in caplog.records
), "Expected the NEW-C debug log line for short STATUS_RESPONSE frames"
@pytest.mark.asyncio
async def test_g1_parse_packet_payload_txt_type_decodes_high_bits():
"""G1/R02: txt_type must decode the high 6 bits of byte 4, not always be 0."""
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
from meshcore.meshcore_parser import MeshcorePacketParser
parser = MeshcorePacketParser()
parser.decrypt_channels = True
# Set up a synthetic channel with a known 16-byte AES key. Direct dict
# assignment matches how the parser stores channels (newChannel is async
# and serves the same purpose).
channel_secret = b"\x01" * 16
channel_hash_byte = 0xAB
parser.channels[0] = {
"channel_idx": 0,
"channel_name": "test",
"channel_hash": "ab",
"channel_secret": channel_secret,
}
# 16-byte plaintext (one AES block):
# bytes 0-3 = sender_timestamp (little-endian)
# byte 4 = (txt_type << 2) | attempt
# bytes 5-15 = message + null padding
# Pick txt_type=5, attempt=1 → byte 4 = (5 << 2) | 1 = 0x15.
# Pre-R02-fix uncrypted[4:4] is empty so txt_type would be 0;
# post-fix uncrypted[4:5] yields 0x15 >> 2 = 5.
plaintext = b"\x00\x00\x00\x00\x15hello\x00\x00\x00\x00\x00\x00"
assert len(plaintext) == 16
encrypted = AES.new(channel_secret, AES.MODE_ECB).encrypt(plaintext)
# cipher_mac = first 2 bytes of HMAC-SHA256(channel_secret, encrypted)
h = HMAC.new(channel_secret, digestmod=SHA256)
h.update(encrypted)
cipher_mac = h.digest()[:2]
# pkt_payload layout: 1-byte chan_hash + 2-byte cipher_mac + ciphertext
pkt_payload = bytes([channel_hash_byte]) + cipher_mac + encrypted
# parsePacketPayload expects the full payload buffer:
# header byte (route_type=1 DIRECT, payload_type=5 channel, ver=0)
# path_byte (path_len=0, path_hash_size=1) → 0x00
# pkt_payload
header = 0x15 # route_type=1, payload_type=5, payload_ver=0
path_byte = 0x00
payload = bytes([header, path_byte]) + pkt_payload
log_data = await parser.parsePacketPayload(payload, log_data={})
assert log_data["payload_type"] == 0x05
assert "txt_type" in log_data, (
f"txt_type missing from log_data — channel decrypt path was not reached. "
f"log_data keys: {list(log_data.keys())}"
)
assert log_data["txt_type"] == 5, (
f"Expected txt_type=5 (R02 fix), got {log_data['txt_type']}"
)
assert log_data["attempt"] == 1, (
f"Expected attempt=1, got {log_data['attempt']}"
)