mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-06-11 11:56:18 +00:00
add req_binary
This commit is contained in:
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "meshcore"
|
name = "meshcore"
|
||||||
version = "2.1.3"
|
version = "2.1.5"
|
||||||
authors = [
|
authors = [
|
||||||
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
|
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
|
||||||
{ name="Alex Wolden", email="awolden@gmail.com" },
|
{ name="Alex Wolden", email="awolden@gmail.com" },
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ logger = logging.getLogger("meshcore")
|
|||||||
|
|
||||||
|
|
||||||
class BinaryReqType(Enum):
|
class BinaryReqType(Enum):
|
||||||
|
STATUS = 0x01
|
||||||
|
KEEP_ALIVE = 0x02
|
||||||
TELEMETRY = 0x03
|
TELEMETRY = 0x03
|
||||||
MMA = 0x04
|
MMA = 0x04
|
||||||
ACL = 0x05
|
ACL = 0x05
|
||||||
@@ -93,6 +95,36 @@ class BinaryCommandHandler(CommandHandlerBase):
|
|||||||
else:
|
else:
|
||||||
return res2.payload
|
return res2.payload
|
||||||
|
|
||||||
|
async def req_status(self, contact, timeout=0):
|
||||||
|
code = BinaryReqType.STATUS.value
|
||||||
|
req = code.to_bytes(1, "little", signed=False)
|
||||||
|
rep = await self.req_binary(contact, req, timeout)
|
||||||
|
|
||||||
|
if rep is None :
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
data=bytes.fromhex(rep["data"])
|
||||||
|
res = {}
|
||||||
|
res["pubkey_pre"] = contact["public_key"][0:12]
|
||||||
|
res["bat"] = int.from_bytes(data[0:2], byteorder="little")
|
||||||
|
res["tx_queue_len"] = int.from_bytes(data[2:4], byteorder="little")
|
||||||
|
res["noise_floor"] = int.from_bytes(data[4:6], byteorder="little", signed=True)
|
||||||
|
res["last_rssi"] = int.from_bytes(data[6:8], byteorder="little", signed=True)
|
||||||
|
res["nb_recv"] = int.from_bytes(data[8:12], byteorder="little", signed=False)
|
||||||
|
res["nb_sent"] = int.from_bytes(data[12:16], byteorder="little", signed=False)
|
||||||
|
res["airtime"] = int.from_bytes(data[16:20], byteorder="little")
|
||||||
|
res["uptime"] = int.from_bytes(data[20:24], byteorder="little")
|
||||||
|
res["sent_flood"] = int.from_bytes(data[24:28], byteorder="little")
|
||||||
|
res["sent_direct"] = int.from_bytes(data[28:32], byteorder="little")
|
||||||
|
res["recv_flood"] = int.from_bytes(data[32:36], byteorder="little")
|
||||||
|
res["recv_direct"] = int.from_bytes(data[36:40], byteorder="little")
|
||||||
|
res["full_evts"] = int.from_bytes(data[40:42], byteorder="little")
|
||||||
|
res["last_snr"] = (int.from_bytes(data[42:44], byteorder="little", signed=True) / 4)
|
||||||
|
res["direct_dups"] = int.from_bytes(data[44:46], byteorder="little")
|
||||||
|
res["flood_dups"] = int.from_bytes(data[46:48], byteorder="little")
|
||||||
|
res["rx_airtime"] = int.from_bytes(data[48:52], byteorder="little")
|
||||||
|
return res if res["uptime"] > 0 else None
|
||||||
|
|
||||||
async def req_telemetry(self, contact, timeout=0):
|
async def req_telemetry(self, contact, timeout=0):
|
||||||
code = BinaryReqType.TELEMETRY.value
|
code = BinaryReqType.TELEMETRY.value
|
||||||
req = code.to_bytes(1, "little", signed=False)
|
req = code.to_bytes(1, "little", signed=False)
|
||||||
|
|||||||
Reference in New Issue
Block a user