Added RNS Transport sensor and MQTT renderers

This commit is contained in:
Mark Qvist 2025-01-25 14:23:03 +01:00
parent cc722dec9f
commit 94809b0ec4

View File

@ -2504,7 +2504,196 @@ class Fuel(Sensor):
return rendered
class RNSTransport(Sensor):
pass
SID = Sensor.SID_RNS_TRANSPORT
STALE_TIME = 5
def __init__(self):
super().__init__(type(self).SID, type(self).STALE_TIME)
def setup_sensor(self):
self.update_data()
def teardown_sensor(self):
self.identity = None
self.data = None
def update_data(self):
r = RNS.Reticulum.get_instance()
ifstats = r.get_interface_stats()
rss = None
if "rss" in ifstats:
rss = ifstats.pop("rss")
self.data = {
"transport_enabled": RNS.Reticulum.transport_enabled(),
"transport_identity": RNS.Transport.identity.hash,
"transport_uptime": time.time()-RNS.Transport.start_time if RNS.Reticulum.transport_enabled() else None,
"traffic_rxb": RNS.Transport.traffic_rxb,
"traffic_txb": RNS.Transport.traffic_txb,
"speed_rx": RNS.Transport.speed_rx,
"speed_tx": RNS.Transport.speed_tx,
"memory_used": rss,
"ifstats": ifstats,
"link_count": r.get_link_count(),
"path_table": sorted(r.get_path_table(max_hops=RNS.Transport.PATHFINDER_M-1), key=lambda e: (e["interface"], e["hops"]) )
}
def pack(self):
d = self.data
if d == None:
return None
else:
packed = self.data
return packed
def unpack(self, packed):
try:
if packed == None:
return None
else:
return packed
except:
return None
def render(self, relative_to=None):
if self.data == None:
return None
try:
d = self.data
ifs = {}
transport_nodes = {}
for ife in d["ifstats"]["interfaces"]:
ifi = ife.copy()
ifi["paths"] = {}
ifi["path_count"] = 0
ifs[ifi.pop("name")] = ifi
for path in d["path_table"]:
pifn = path["interface"]
if pifn in ifs:
pif = ifs[pifn]
via = path["via"]
if not via in transport_nodes:
transport_nodes[via] = {"on_interface": pifn}
if not via in pif["paths"]:
pif["paths"][via] = {}
p = path.copy()
p.pop("via")
pif["paths"][via][p.pop("hash")] = p
pif["path_count"] += 1
values = {
"transport_enabled": d["transport_enabled"],
"transport_identity": d["transport_identity"],
"transport_uptime": d["transport_uptime"],
"traffic_rxb": d["traffic_rxb"],
"traffic_txb": d["traffic_txb"],
"speed_rx": d["speed_rx"],
"speed_tx": d["speed_tx"],
"memory_used": d["memory_used"],
"path_count": len(d["path_table"]),
"link_count": d["link_count"],
"interfaces": ifs,
"remote_transport_node_count": len(transport_nodes),
"remote_transport_nodes": transport_nodes,
"path_table": d["path_table"],
}
rendered = {
"icon": "transit-connection-variant",
"name": "Reticulum Transport",
"values": values,
}
return rendered
except Exception as e:
RNS.log(f"Could not render RNS Transport telemetry data. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
return None
def render_mqtt(self, relative_to=None):
try:
if self.data != None:
r = self.render(relative_to=relative_to)
v = r["values"]
tid = mqtt_desthash(v["transport_identity"])
topic = f"{self.name()}/{tid}"
rendered = {
f"{topic}/name": r["name"],
f"{topic}/icon": r["icon"],
f"{topic}/transport_enabled": v["transport_enabled"],
f"{topic}/transport_identity": mqtt_desthash(v["transport_identity"]),
f"{topic}/transport_uptime": v["transport_uptime"],
f"{topic}/traffic_rxb": v["traffic_rxb"],
f"{topic}/traffic_txb": v["traffic_txb"],
f"{topic}/speed_rx": v["speed_rx"],
f"{topic}/speed_tx": v["speed_tx"],
f"{topic}/memory_used": v["memory_used"],
f"{topic}/path_count": v["path_count"],
f"{topic}/link_count": v["link_count"],
f"{topic}/remote_transport_node_count": v["remote_transport_node_count"],
}
for if_name in v["interfaces"]:
i = v["interfaces"][if_name]
im = "unknown"
if i["mode"] == RNS.Interfaces.Interface.Interface.MODE_FULL:
im = "full"
elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_POINT_TO_POINT:
im = "point_to_point"
elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
im = "access_point"
elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_ROAMING:
im = "roaming"
elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY:
im = "boundary"
elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_GATEWAY:
im = "gateway"
mif_name = mqtt_hash(i["hash"])
rendered[f"{topic}/interfaces/{mif_name}/name"] = if_name
rendered[f"{topic}/interfaces/{mif_name}/short_name"] = i["short_name"]
rendered[f"{topic}/interfaces/{mif_name}/up"] = i["status"]
rendered[f"{topic}/interfaces/{mif_name}/mode"] = im
rendered[f"{topic}/interfaces/{mif_name}/type"] = i["type"]
rendered[f"{topic}/interfaces/{mif_name}/bitrate"] = i["bitrate"]
rendered[f"{topic}/interfaces/{mif_name}/rxs"] = i["rxs"]
rendered[f"{topic}/interfaces/{mif_name}/txs"] = i["txs"]
rendered[f"{topic}/interfaces/{mif_name}/rxb"] = i["rxb"]
rendered[f"{topic}/interfaces/{mif_name}/txb"] = i["txb"]
rendered[f"{topic}/interfaces/{mif_name}/ifac_signature"] = mqtt_hash(i["ifac_signature"])
rendered[f"{topic}/interfaces/{mif_name}/ifac_size"] = i["ifac_size"]
rendered[f"{topic}/interfaces/{mif_name}/ifac_netname"] = i["ifac_netname"]
rendered[f"{topic}/interfaces/{mif_name}/incoming_announce_frequency"] = i["incoming_announce_frequency"]
rendered[f"{topic}/interfaces/{mif_name}/outgoing_announce_frequency"] = i["outgoing_announce_frequency"]
rendered[f"{topic}/interfaces/{mif_name}/held_announces"] = i["held_announces"]
rendered[f"{topic}/interfaces/{mif_name}/path_count"] = i["path_count"]
for via in i["paths"]:
vh = mqtt_desthash(via)
for desthash in i["paths"][via]:
dh = mqtt_desthash(desthash)
d = i["paths"][via][desthash]
lp = f"{topic}/interfaces/{mif_name}/paths/{vh}/{dh}"
rendered[f"{lp}/hops"] = d["hops"]
rendered[f"{lp}/timestamp"] = d["timestamp"]
rendered[f"{lp}/expires"] = d["expires"]
rendered[f"{lp}/interface"] = d["interface"]
else:
rendered = None
return rendered
except Exception as e:
RNS.log(f"Could not render RNS Transport telemetry data to MQTT format. The contained exception was: {e}", RNS.LOG_ERROR)
return None
class LXMFPropagation(Sensor):
SID = Sensor.SID_LXMF_PROPAGATION
@ -2548,12 +2737,11 @@ class LXMFPropagation(Sensor):
RNS.log("Status response from lxmd was received, but contained no data", RNS.LOG_ERROR)
elif status_response == self.ERROR_NO_IDENTITY:
RNS.log("Updating telemetry from lxmd failed due to missing identification", RNS.LOG_ERROR)
elif status_response == self.ERROR_NO_IDENTITY:
RNS.log("Updating telemetry from lxmd failed due to missing identification", RNS.LOG_ERROR)
elif status_response == self.ERROR_NO_IDENTITY:
RNS.log("Updating telemetry from lxmd failed due to missing identification", RNS.LOG_ERROR)
elif status_response == self.ERROR_NO_ACCESS:
RNS.log("Access was denied while attempting to update lxmd telemetry", RNS.LOG_ERROR)
elif status_response == self.ERROR_TIMEOUT:
RNS.log("Updating telemetry from lxmd failed due to timeout", RNS.LOG_ERROR)
else:
RNS.log("Received status response from lxmd", RNS.LOG_DEBUG) # TODO: Remove debug
self.data = status_response
def pack(self):
@ -2640,63 +2828,76 @@ class LXMFPropagation(Sensor):
return None
def render_mqtt(self, relative_to=None):
if self.data != None:
r = self.render(relative_to=relative_to)
v = r["values"]
nid = mqtt_desthash(v["destination_hash"])
topic = f"{self.name()}/{nid}"
rendered = {
f"{topic}/name": r["name"],
f"{topic}/icon": r["icon"],
f"{topic}/identity_hash": mqtt_desthash(v["identity_hash"]),
f"{topic}/uptime": v["uptime"],
f"{topic}/delivery_limit": v["delivery_limit"],
f"{topic}/propagation_limit": v["propagation_limit"],
f"{topic}/autopeer_maxdepth": v["autopeer_maxdepth"],
f"{topic}/from_static_only": v["from_static_only"],
f"{topic}/messagestore_count": v["messagestore_count"],
f"{topic}/messagestore_bytes": v["messagestore_bytes"],
f"{topic}/messagestore_free": v["messagestore_free"],
f"{topic}/messagestore_limit": v["messagestore_limit"],
f"{topic}/messagestore_pct": v["messagestore_pct"],
f"{topic}/client_propagation_messages_received": v["client_propagation_messages_received"],
f"{topic}/client_propagation_messages_served": v["client_propagation_messages_served"],
f"{topic}/unpeered_propagation_incoming": v["unpeered_propagation_incoming"],
f"{topic}/unpeered_propagation_rx_bytes": v["unpeered_propagation_rx_bytes"],
f"{topic}/static_peers": v["static_peers"],
f"{topic}/total_peers": v["total_peers"],
f"{topic}/max_peers": v["max_peers"],
}
try:
if self.data != None:
r = self.render(relative_to=relative_to)
v = r["values"]
nid = mqtt_desthash(v["destination_hash"])
topic = f"{self.name()}/{nid}"
rendered = {
f"{topic}/name": r["name"],
f"{topic}/icon": r["icon"],
f"{topic}/identity_hash": mqtt_desthash(v["identity_hash"]),
f"{topic}/uptime": v["uptime"],
f"{topic}/delivery_limit": v["delivery_limit"],
f"{topic}/propagation_limit": v["propagation_limit"],
f"{topic}/autopeer_maxdepth": v["autopeer_maxdepth"],
f"{topic}/from_static_only": v["from_static_only"],
f"{topic}/messagestore_count": v["messagestore_count"],
f"{topic}/messagestore_bytes": v["messagestore_bytes"],
f"{topic}/messagestore_free": v["messagestore_free"],
f"{topic}/messagestore_limit": v["messagestore_limit"],
f"{topic}/messagestore_pct": v["messagestore_pct"],
f"{topic}/client_propagation_messages_received": v["client_propagation_messages_received"],
f"{topic}/client_propagation_messages_served": v["client_propagation_messages_served"],
f"{topic}/unpeered_propagation_incoming": v["unpeered_propagation_incoming"],
f"{topic}/unpeered_propagation_rx_bytes": v["unpeered_propagation_rx_bytes"],
f"{topic}/static_peers": v["static_peers"],
f"{topic}/total_peers": v["total_peers"],
f"{topic}/max_peers": v["max_peers"],
}
for peer_id in v["peers"]:
p = v["peers"][peer_id]
pid = mqtt_desthash(peer_id)
rendered[f"{topic}/peers/{pid}/type"] = p["type"]
rendered[f"{topic}/peers/{pid}/state"] = p["state"]
rendered[f"{topic}/peers/{pid}/alive"] = p["alive"]
rendered[f"{topic}/peers/{pid}/last_heard"] = p["last_heard"]
rendered[f"{topic}/peers/{pid}/next_sync_attempt"] = p["next_sync_attempt"]
rendered[f"{topic}/peers/{pid}/last_sync_attempt"] = p["last_sync_attempt"]
rendered[f"{topic}/peers/{pid}/sync_backoff"] = p["sync_backoff"]
rendered[f"{topic}/peers/{pid}/peering_timebase"] = p["peering_timebase"]
rendered[f"{topic}/peers/{pid}/ler"] = p["ler"]
rendered[f"{topic}/peers/{pid}/str"] = p["str"]
rendered[f"{topic}/peers/{pid}/transfer_limit"] = p["transfer_limit"]
rendered[f"{topic}/peers/{pid}/network_distance"] = p["network_distance"]
rendered[f"{topic}/peers/{pid}/rx_bytes"] = p["rx_bytes"]
rendered[f"{topic}/peers/{pid}/tx_bytes"] = p["tx_bytes"]
rendered[f"{topic}/peers/{pid}/messages_offered"] = p["messages_offered"]
rendered[f"{topic}/peers/{pid}/messages_outgoing"] = p["messages_outgoing"]
rendered[f"{topic}/peers/{pid}/messages_incoming"] = p["messages_incoming"]
rendered[f"{topic}/peers/{pid}/messages_unhandled"] = p["messages_unhandled"]
for peer_id in v["peers"]:
p = v["peers"][peer_id]
pid = mqtt_desthash(peer_id)
rendered[f"{topic}/peers/{pid}/type"] = p["type"]
rendered[f"{topic}/peers/{pid}/state"] = p["state"]
rendered[f"{topic}/peers/{pid}/alive"] = p["alive"]
rendered[f"{topic}/peers/{pid}/last_heard"] = p["last_heard"]
rendered[f"{topic}/peers/{pid}/next_sync_attempt"] = p["next_sync_attempt"]
rendered[f"{topic}/peers/{pid}/last_sync_attempt"] = p["last_sync_attempt"]
rendered[f"{topic}/peers/{pid}/sync_backoff"] = p["sync_backoff"]
rendered[f"{topic}/peers/{pid}/peering_timebase"] = p["peering_timebase"]
rendered[f"{topic}/peers/{pid}/ler"] = p["ler"]
rendered[f"{topic}/peers/{pid}/str"] = p["str"]
rendered[f"{topic}/peers/{pid}/transfer_limit"] = p["transfer_limit"]
rendered[f"{topic}/peers/{pid}/network_distance"] = p["network_distance"]
rendered[f"{topic}/peers/{pid}/rx_bytes"] = p["rx_bytes"]
rendered[f"{topic}/peers/{pid}/tx_bytes"] = p["tx_bytes"]
rendered[f"{topic}/peers/{pid}/messages_offered"] = p["messages_offered"]
rendered[f"{topic}/peers/{pid}/messages_outgoing"] = p["messages_outgoing"]
rendered[f"{topic}/peers/{pid}/messages_incoming"] = p["messages_incoming"]
rendered[f"{topic}/peers/{pid}/messages_unhandled"] = p["messages_unhandled"]
else:
rendered = None
else:
rendered = None
return rendered
return rendered
except Exception as e:
RNS.log(f"Could not render lxmd telemetry data to MQTT format. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
return None
def mqtt_desthash(desthash):
if type(desthash) == bytes:
return RNS.prettyhexrep(desthash)
else:
return None
def mqtt_hash(ihash):
if type(ihash) == bytes:
return RNS.hexrep(ihash, delimit=False)
else:
return None