From 94809b0ec4e79b6ecf4d0c82ce90340e893339b6 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 25 Jan 2025 14:23:03 +0100 Subject: [PATCH] Added RNS Transport sensor and MQTT renderers --- sbapp/sideband/sense.py | 317 ++++++++++++++++++++++++++++++++-------- 1 file changed, 259 insertions(+), 58 deletions(-) diff --git a/sbapp/sideband/sense.py b/sbapp/sideband/sense.py index 4b700fb..3d4d314 100644 --- a/sbapp/sideband/sense.py +++ b/sbapp/sideband/sense.py @@ -2504,7 +2504,196 @@ class Fuel(Sensor): return rendered class RNSTransport(Sensor): - pass + SID = Sensor.SID_RNS_TRANSPORT + STALE_TIME = 5 + + def __init__(self): + super().__init__(type(self).SID, type(self).STALE_TIME) + + def setup_sensor(self): + self.update_data() + + def teardown_sensor(self): + self.identity = None + self.data = None + + def update_data(self): + r = RNS.Reticulum.get_instance() + ifstats = r.get_interface_stats() + rss = None + if "rss" in ifstats: + rss = ifstats.pop("rss") + self.data = { + "transport_enabled": RNS.Reticulum.transport_enabled(), + "transport_identity": RNS.Transport.identity.hash, + "transport_uptime": time.time()-RNS.Transport.start_time if RNS.Reticulum.transport_enabled() else None, + "traffic_rxb": RNS.Transport.traffic_rxb, + "traffic_txb": RNS.Transport.traffic_txb, + "speed_rx": RNS.Transport.speed_rx, + "speed_tx": RNS.Transport.speed_tx, + "memory_used": rss, + "ifstats": ifstats, + "link_count": r.get_link_count(), + "path_table": sorted(r.get_path_table(max_hops=RNS.Transport.PATHFINDER_M-1), key=lambda e: (e["interface"], e["hops"]) ) + } + + def pack(self): + d = self.data + if d == None: + return None + else: + packed = self.data + return packed + + def unpack(self, packed): + try: + if packed == None: + return None + else: + return packed + + except: + return None + + def render(self, relative_to=None): + if self.data == None: + return None + + try: + d = self.data + ifs = {} + transport_nodes = {} + for ife in d["ifstats"]["interfaces"]: + ifi = ife.copy() + ifi["paths"] = {} + ifi["path_count"] = 0 + ifs[ifi.pop("name")] = ifi + + for path in d["path_table"]: + pifn = path["interface"] + if pifn in ifs: + pif = ifs[pifn] + via = path["via"] + if not via in transport_nodes: + transport_nodes[via] = {"on_interface": pifn} + if not via in pif["paths"]: + pif["paths"][via] = {} + p = path.copy() + p.pop("via") + pif["paths"][via][p.pop("hash")] = p + pif["path_count"] += 1 + + + values = { + "transport_enabled": d["transport_enabled"], + "transport_identity": d["transport_identity"], + "transport_uptime": d["transport_uptime"], + "traffic_rxb": d["traffic_rxb"], + "traffic_txb": d["traffic_txb"], + "speed_rx": d["speed_rx"], + "speed_tx": d["speed_tx"], + "memory_used": d["memory_used"], + "path_count": len(d["path_table"]), + "link_count": d["link_count"], + "interfaces": ifs, + "remote_transport_node_count": len(transport_nodes), + "remote_transport_nodes": transport_nodes, + "path_table": d["path_table"], + } + + rendered = { + "icon": "transit-connection-variant", + "name": "Reticulum Transport", + "values": values, + } + + return rendered + + except Exception as e: + RNS.log(f"Could not render RNS Transport telemetry data. The contained exception was: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + + return None + + def render_mqtt(self, relative_to=None): + try: + if self.data != None: + r = self.render(relative_to=relative_to) + v = r["values"] + tid = mqtt_desthash(v["transport_identity"]) + topic = f"{self.name()}/{tid}" + rendered = { + f"{topic}/name": r["name"], + f"{topic}/icon": r["icon"], + f"{topic}/transport_enabled": v["transport_enabled"], + f"{topic}/transport_identity": mqtt_desthash(v["transport_identity"]), + f"{topic}/transport_uptime": v["transport_uptime"], + f"{topic}/traffic_rxb": v["traffic_rxb"], + f"{topic}/traffic_txb": v["traffic_txb"], + f"{topic}/speed_rx": v["speed_rx"], + f"{topic}/speed_tx": v["speed_tx"], + f"{topic}/memory_used": v["memory_used"], + f"{topic}/path_count": v["path_count"], + f"{topic}/link_count": v["link_count"], + f"{topic}/remote_transport_node_count": v["remote_transport_node_count"], + } + + for if_name in v["interfaces"]: + i = v["interfaces"][if_name] + im = "unknown" + if i["mode"] == RNS.Interfaces.Interface.Interface.MODE_FULL: + im = "full" + elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_POINT_TO_POINT: + im = "point_to_point" + elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: + im = "access_point" + elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_ROAMING: + im = "roaming" + elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_BOUNDARY: + im = "boundary" + elif i["mode"] == RNS.Interfaces.Interface.Interface.MODE_GATEWAY: + im = "gateway" + + mif_name = mqtt_hash(i["hash"]) + rendered[f"{topic}/interfaces/{mif_name}/name"] = if_name + rendered[f"{topic}/interfaces/{mif_name}/short_name"] = i["short_name"] + rendered[f"{topic}/interfaces/{mif_name}/up"] = i["status"] + rendered[f"{topic}/interfaces/{mif_name}/mode"] = im + rendered[f"{topic}/interfaces/{mif_name}/type"] = i["type"] + rendered[f"{topic}/interfaces/{mif_name}/bitrate"] = i["bitrate"] + rendered[f"{topic}/interfaces/{mif_name}/rxs"] = i["rxs"] + rendered[f"{topic}/interfaces/{mif_name}/txs"] = i["txs"] + rendered[f"{topic}/interfaces/{mif_name}/rxb"] = i["rxb"] + rendered[f"{topic}/interfaces/{mif_name}/txb"] = i["txb"] + rendered[f"{topic}/interfaces/{mif_name}/ifac_signature"] = mqtt_hash(i["ifac_signature"]) + rendered[f"{topic}/interfaces/{mif_name}/ifac_size"] = i["ifac_size"] + rendered[f"{topic}/interfaces/{mif_name}/ifac_netname"] = i["ifac_netname"] + rendered[f"{topic}/interfaces/{mif_name}/incoming_announce_frequency"] = i["incoming_announce_frequency"] + rendered[f"{topic}/interfaces/{mif_name}/outgoing_announce_frequency"] = i["outgoing_announce_frequency"] + rendered[f"{topic}/interfaces/{mif_name}/held_announces"] = i["held_announces"] + rendered[f"{topic}/interfaces/{mif_name}/path_count"] = i["path_count"] + + for via in i["paths"]: + vh = mqtt_desthash(via) + + for desthash in i["paths"][via]: + dh = mqtt_desthash(desthash) + d = i["paths"][via][desthash] + lp = f"{topic}/interfaces/{mif_name}/paths/{vh}/{dh}" + rendered[f"{lp}/hops"] = d["hops"] + rendered[f"{lp}/timestamp"] = d["timestamp"] + rendered[f"{lp}/expires"] = d["expires"] + rendered[f"{lp}/interface"] = d["interface"] + + else: + rendered = None + + return rendered + + except Exception as e: + RNS.log(f"Could not render RNS Transport telemetry data to MQTT format. The contained exception was: {e}", RNS.LOG_ERROR) + + return None class LXMFPropagation(Sensor): SID = Sensor.SID_LXMF_PROPAGATION @@ -2548,12 +2737,11 @@ class LXMFPropagation(Sensor): RNS.log("Status response from lxmd was received, but contained no data", RNS.LOG_ERROR) elif status_response == self.ERROR_NO_IDENTITY: RNS.log("Updating telemetry from lxmd failed due to missing identification", RNS.LOG_ERROR) - elif status_response == self.ERROR_NO_IDENTITY: - RNS.log("Updating telemetry from lxmd failed due to missing identification", RNS.LOG_ERROR) - elif status_response == self.ERROR_NO_IDENTITY: - RNS.log("Updating telemetry from lxmd failed due to missing identification", RNS.LOG_ERROR) + elif status_response == self.ERROR_NO_ACCESS: + RNS.log("Access was denied while attempting to update lxmd telemetry", RNS.LOG_ERROR) + elif status_response == self.ERROR_TIMEOUT: + RNS.log("Updating telemetry from lxmd failed due to timeout", RNS.LOG_ERROR) else: - RNS.log("Received status response from lxmd", RNS.LOG_DEBUG) # TODO: Remove debug self.data = status_response def pack(self): @@ -2640,63 +2828,76 @@ class LXMFPropagation(Sensor): return None def render_mqtt(self, relative_to=None): - if self.data != None: - r = self.render(relative_to=relative_to) - v = r["values"] - nid = mqtt_desthash(v["destination_hash"]) - topic = f"{self.name()}/{nid}" - rendered = { - f"{topic}/name": r["name"], - f"{topic}/icon": r["icon"], - f"{topic}/identity_hash": mqtt_desthash(v["identity_hash"]), - f"{topic}/uptime": v["uptime"], - f"{topic}/delivery_limit": v["delivery_limit"], - f"{topic}/propagation_limit": v["propagation_limit"], - f"{topic}/autopeer_maxdepth": v["autopeer_maxdepth"], - f"{topic}/from_static_only": v["from_static_only"], - f"{topic}/messagestore_count": v["messagestore_count"], - f"{topic}/messagestore_bytes": v["messagestore_bytes"], - f"{topic}/messagestore_free": v["messagestore_free"], - f"{topic}/messagestore_limit": v["messagestore_limit"], - f"{topic}/messagestore_pct": v["messagestore_pct"], - f"{topic}/client_propagation_messages_received": v["client_propagation_messages_received"], - f"{topic}/client_propagation_messages_served": v["client_propagation_messages_served"], - f"{topic}/unpeered_propagation_incoming": v["unpeered_propagation_incoming"], - f"{topic}/unpeered_propagation_rx_bytes": v["unpeered_propagation_rx_bytes"], - f"{topic}/static_peers": v["static_peers"], - f"{topic}/total_peers": v["total_peers"], - f"{topic}/max_peers": v["max_peers"], - } + try: + if self.data != None: + r = self.render(relative_to=relative_to) + v = r["values"] + nid = mqtt_desthash(v["destination_hash"]) + topic = f"{self.name()}/{nid}" + rendered = { + f"{topic}/name": r["name"], + f"{topic}/icon": r["icon"], + f"{topic}/identity_hash": mqtt_desthash(v["identity_hash"]), + f"{topic}/uptime": v["uptime"], + f"{topic}/delivery_limit": v["delivery_limit"], + f"{topic}/propagation_limit": v["propagation_limit"], + f"{topic}/autopeer_maxdepth": v["autopeer_maxdepth"], + f"{topic}/from_static_only": v["from_static_only"], + f"{topic}/messagestore_count": v["messagestore_count"], + f"{topic}/messagestore_bytes": v["messagestore_bytes"], + f"{topic}/messagestore_free": v["messagestore_free"], + f"{topic}/messagestore_limit": v["messagestore_limit"], + f"{topic}/messagestore_pct": v["messagestore_pct"], + f"{topic}/client_propagation_messages_received": v["client_propagation_messages_received"], + f"{topic}/client_propagation_messages_served": v["client_propagation_messages_served"], + f"{topic}/unpeered_propagation_incoming": v["unpeered_propagation_incoming"], + f"{topic}/unpeered_propagation_rx_bytes": v["unpeered_propagation_rx_bytes"], + f"{topic}/static_peers": v["static_peers"], + f"{topic}/total_peers": v["total_peers"], + f"{topic}/max_peers": v["max_peers"], + } - for peer_id in v["peers"]: - p = v["peers"][peer_id] - pid = mqtt_desthash(peer_id) - rendered[f"{topic}/peers/{pid}/type"] = p["type"] - rendered[f"{topic}/peers/{pid}/state"] = p["state"] - rendered[f"{topic}/peers/{pid}/alive"] = p["alive"] - rendered[f"{topic}/peers/{pid}/last_heard"] = p["last_heard"] - rendered[f"{topic}/peers/{pid}/next_sync_attempt"] = p["next_sync_attempt"] - rendered[f"{topic}/peers/{pid}/last_sync_attempt"] = p["last_sync_attempt"] - rendered[f"{topic}/peers/{pid}/sync_backoff"] = p["sync_backoff"] - rendered[f"{topic}/peers/{pid}/peering_timebase"] = p["peering_timebase"] - rendered[f"{topic}/peers/{pid}/ler"] = p["ler"] - rendered[f"{topic}/peers/{pid}/str"] = p["str"] - rendered[f"{topic}/peers/{pid}/transfer_limit"] = p["transfer_limit"] - rendered[f"{topic}/peers/{pid}/network_distance"] = p["network_distance"] - rendered[f"{topic}/peers/{pid}/rx_bytes"] = p["rx_bytes"] - rendered[f"{topic}/peers/{pid}/tx_bytes"] = p["tx_bytes"] - rendered[f"{topic}/peers/{pid}/messages_offered"] = p["messages_offered"] - rendered[f"{topic}/peers/{pid}/messages_outgoing"] = p["messages_outgoing"] - rendered[f"{topic}/peers/{pid}/messages_incoming"] = p["messages_incoming"] - rendered[f"{topic}/peers/{pid}/messages_unhandled"] = p["messages_unhandled"] - - else: - rendered = None + for peer_id in v["peers"]: + p = v["peers"][peer_id] + pid = mqtt_desthash(peer_id) + rendered[f"{topic}/peers/{pid}/type"] = p["type"] + rendered[f"{topic}/peers/{pid}/state"] = p["state"] + rendered[f"{topic}/peers/{pid}/alive"] = p["alive"] + rendered[f"{topic}/peers/{pid}/last_heard"] = p["last_heard"] + rendered[f"{topic}/peers/{pid}/next_sync_attempt"] = p["next_sync_attempt"] + rendered[f"{topic}/peers/{pid}/last_sync_attempt"] = p["last_sync_attempt"] + rendered[f"{topic}/peers/{pid}/sync_backoff"] = p["sync_backoff"] + rendered[f"{topic}/peers/{pid}/peering_timebase"] = p["peering_timebase"] + rendered[f"{topic}/peers/{pid}/ler"] = p["ler"] + rendered[f"{topic}/peers/{pid}/str"] = p["str"] + rendered[f"{topic}/peers/{pid}/transfer_limit"] = p["transfer_limit"] + rendered[f"{topic}/peers/{pid}/network_distance"] = p["network_distance"] + rendered[f"{topic}/peers/{pid}/rx_bytes"] = p["rx_bytes"] + rendered[f"{topic}/peers/{pid}/tx_bytes"] = p["tx_bytes"] + rendered[f"{topic}/peers/{pid}/messages_offered"] = p["messages_offered"] + rendered[f"{topic}/peers/{pid}/messages_outgoing"] = p["messages_outgoing"] + rendered[f"{topic}/peers/{pid}/messages_incoming"] = p["messages_incoming"] + rendered[f"{topic}/peers/{pid}/messages_unhandled"] = p["messages_unhandled"] + + else: + rendered = None - return rendered + return rendered + + except Exception as e: + RNS.log(f"Could not render lxmd telemetry data to MQTT format. The contained exception was: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + + return None def mqtt_desthash(desthash): if type(desthash) == bytes: return RNS.prettyhexrep(desthash) + else: + return None + +def mqtt_hash(ihash): + if type(ihash) == bytes: + return RNS.hexrep(ihash, delimit=False) else: return None \ No newline at end of file