diff --git a/sbapp/main.py b/sbapp/main.py index 46d351d..1028867 100644 --- a/sbapp/main.py +++ b/sbapp/main.py @@ -3207,7 +3207,14 @@ class SidebandApp(MDApp): self.telemetry_info_dialog.dismiss() ok_button.bind(on_release=dl_ok) - result = self.sideband.send_latest_telemetry(to_addr=self.sideband.config["telemetry_collector"]) + collector_address = self.sideband.config["telemetry_collector"] + + if self.sideband.config["telemetry_send_all_to_collector"]: + last_timebase = (self.sideband.getpersistent(f"telemetry.{RNS.hexrep(collector_address, delimit=False)}.last_send_success_timebase") or 0) + result = self.sideband.create_telemetry_collector_response(to_addr=collector_address, timebase=last_timebase, is_authorized_telemetry_request=True) + else: + result = self.sideband.send_latest_telemetry(to_addr=collector_address) + if result == "no_address": title_str = "Invalid Address" info_str = "You must specify a valid LXMF address for the collector you want to sent data to." @@ -3223,9 +3230,15 @@ class SidebandApp(MDApp): elif result == "sent": title_str = "Update Sent" info_str = "A telemetry update was sent to the collector." + elif result == "not_sent": + title_str = "Not Sent" + info_str = "The telemetry update could not be sent." + elif result == "nothing_to_send": + title_str = "Nothing to Send" + info_str = "There was no new data to send." else: title_str = "Unknown Status" - info_str = "The status of the telemetry update is unknown." + info_str = "The status of the telemetry update is unknown: "+str(result) self.telemetry_info_dialog.title = title_str self.telemetry_info_dialog.text = info_str @@ -3263,7 +3276,7 @@ class SidebandApp(MDApp): info_str = "A telemetry request could not be sent." else: title_str = "Unknown Status" - info_str = "The status of the telemetry request is unknown." + info_str = "The status of the telemetry request is unknown: "+str(result) self.telemetry_info_dialog.title = title_str self.telemetry_info_dialog.text = info_str diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index 627f40d..0d7b0a8 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -747,21 +747,25 @@ class SidebandCore(): RNS.log("Error while checking telemetry sending for "+RNS.prettyhexrep(context_dest)+": "+str(e), RNS.LOG_ERROR) return False - def requests_allowed_from(self, context_dest): + def allow_request_from(self, context_dest): try: if self.config["telemetry_allow_requests_from_anyone"] == True: return True + if self.config["telemetry_allow_requests_from_trusted"] == True: + existing_conv = self._db_conversation(context_dest) + return existing_conv["trust"] == 1 + + return self.requests_allowed_from(context_dest) + + except Exception as e: + RNS.log("Error while checking request permissions for "+RNS.prettyhexrep(context_dest)+": "+str(e), RNS.LOG_ERROR) + return False + + def requests_allowed_from(self, context_dest): + try: existing_conv = self._db_conversation(context_dest) if existing_conv != None: - if existing_conv["trust"] == 1: - trusted = True - else: - trusted = False - - if self.config["telemetry_allow_requests_from_trusted"] == True: - return trusted - cd = existing_conv["data"] if cd != None and "allow_requests" in cd and cd["allow_requests"] == True: return True @@ -977,15 +981,18 @@ class SidebandCore(): return "not_sent" - def send_latest_telemetry(self, to_addr=None, stream=None): + def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False): if to_addr == None or to_addr == self.lxmf_destination.hash: return "no_address" else: + if to_addr == self.config["telemetry_collector"]: + is_authorized_telemetry_request = True + if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True: RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG) return "in_progress" - if self.latest_packed_telemetry != None and self.latest_telemetry != None: + if (self.latest_packed_telemetry != None and self.latest_telemetry != None) or stream != None: dest_identity = RNS.Identity.recall(to_addr) if dest_identity == None: @@ -1002,7 +1009,7 @@ class SidebandCore(): else: desired_method = LXMF.LXMessage.DIRECT - lxm_fields = self.get_message_fields(to_addr) + lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request) if stream != None and len(stream) > 0: lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream @@ -1041,7 +1048,7 @@ class SidebandCore(): else: RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING) - return "not_sent" + return "nothing_to_send" def list_telemetry(self, context_dest = None, after = None, before = None, limit = None): @@ -1559,10 +1566,11 @@ class SidebandCore(): return results - def _db_save_telemetry(self, context_dest, telemetry, physical_link = None, source_dest = None): + def _db_save_telemetry(self, context_dest, telemetry, physical_link = None, source_dest = None, via = None): try: remote_telemeter = Telemeter.from_packed(telemetry) - telemetry_timestamp = remote_telemeter.read_all()["time"]["utc"] + read_telemetry = remote_telemeter.read_all() + telemetry_timestamp = read_telemetry["time"]["utc"] db = self.__db_connect() dbc = db.cursor() @@ -1572,7 +1580,8 @@ class SidebandCore(): result = dbc.fetchall() if len(result) != 0: - return + RNS.log("Telemetry entry with source "+RNS.prettyhexrep(context_dest)+" and timestamp "+str(telemetry_timestamp)+" already exists, skipping save", RNS.LOG_DEBUG) + return None if physical_link != None and len(physical_link) != 0: remote_telemeter.synthesize("physical_link") @@ -1602,6 +1611,20 @@ class SidebandCore(): remote_telemeter.sensors["received"].update_data() telemetry = remote_telemeter.packed() + + if via != None: + if not "received" in remote_telemeter.sensors: + remote_telemeter.synthesize("received") + + if "by" in remote_telemeter.sensors["received"].data: + remote_telemeter.sensors["received"].by = remote_telemeter.sensors["received"].data["by"] + if "distance" in remote_telemeter.sensors["received"].data: + remote_telemeter.sensors["received"].geodesic_distance = remote_telemeter.sensors["received"].data["distance"]["geodesic"] + remote_telemeter.sensors["received"].euclidian_distance = remote_telemeter.sensors["received"].data["distance"]["euclidian"] + + remote_telemeter.sensors["received"].via = via + remote_telemeter.sensors["received"].update_data() + telemetry = remote_telemeter.packed() query = "INSERT INTO telemetry (dest_context, ts, data) values (?, ?, ?)" data = (context_dest, telemetry_timestamp, telemetry) @@ -1617,35 +1640,52 @@ class SidebandCore(): def _db_update_appearance(self, context_dest, timestamp, appearance): conv = self._db_conversation(context_dest) - data_dict = conv["data"] - if data_dict == None: - data_dict = {} - if not "appearance" in data_dict: - data_dict["appearance"] = None + if conv == None: + ae = [appearance, int(time.time())] + # TODO: Clean out these temporary values at some interval. + # Probably expire after 14 days or so. + self.setpersistent("temp.peer_appearance."+RNS.hexrep(context_dest, delimit=False), ae) + else: + data_dict = conv["data"] + if data_dict == None: + data_dict = {} - if data_dict["appearance"] != appearance: - data_dict["appearance"] = appearance - packed_dict = msgpack.packb(data_dict) - - db = self.__db_connect() - dbc = db.cursor() - - query = "UPDATE conv set data = ? where dest_context = ?" - data = (packed_dict, context_dest) - dbc.execute(query, data) - result = dbc.fetchall() - db.commit() + if not "appearance" in data_dict: + data_dict["appearance"] = None + + if data_dict["appearance"] != appearance: + data_dict["appearance"] = appearance + packed_dict = msgpack.packb(data_dict) + + db = self.__db_connect() + dbc = db.cursor() + + query = "UPDATE conv set data = ? where dest_context = ?" + data = (packed_dict, context_dest) + dbc.execute(query, data) + result = dbc.fetchall() + db.commit() def _db_get_appearance(self, context_dest, conv = None): if context_dest == self.lxmf_destination.hash: return [self.config["telemetry_icon"], self.config["telemetry_fg"], self.config["telemetry_bg"]] else: - if conv == None: - conv = self._db_conversation(context_dest) - - if conv != None and "data" in conv: + data_dict = None + if conv != None: data_dict = conv["data"] + + else: + conv = self._db_conversation(context_dest) + if conv != None: + data_dict = conv["data"] + else: + data_dict = {} + apd = self.getpersistent("temp.peer_appearance."+RNS.hexrep(context_dest, delimit=False)) + if apd != None: + data_dict["appearance"] = apd + + if data_dict != None: try: if data_dict != None and "appearance" in data_dict: def htf(cbytes): @@ -2029,7 +2069,9 @@ class SidebandCore(): if not originator and lxm.fields != None: if self.config["telemetry_receive_trusted_only"] == False or (self.config["telemetry_receive_trusted_only"] == True and self.is_trusted(context_dest)): if LXMF.FIELD_ICON_APPEARANCE in lxm.fields: - self._db_update_appearance(context_dest, lxm.timestamp, lxm.fields[LXMF.FIELD_ICON_APPEARANCE]) + peer_appearance = lxm.fields[LXMF.FIELD_ICON_APPEARANCE] + if peer_appearance != None and len(peer_appearance) > 0 and len(peer_appearance) < 96: + self._db_update_appearance(context_dest, lxm.timestamp, peer_appearance) if LXMF.FIELD_TELEMETRY in lxm.fields: physical_link = {} @@ -2040,9 +2082,16 @@ class SidebandCore(): packed_telemetry = self._db_save_telemetry(context_dest, lxm.fields[LXMF.FIELD_TELEMETRY], physical_link=physical_link, source_dest=context_dest) if LXMF.FIELD_TELEMETRY_STREAM in lxm.fields: - for telemetry_entry in lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]: - # TODO: Implement - RNS.log("TODO: Save this telemetry stream entry: "+str(telemetry_entry), RNS.LOG_WARNING) + if lxm.fields[LXMF.FIELD_TELEMETRY_STREAM] != None and len(lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]) > 0: + for telemetry_entry in lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]: + tsource = telemetry_entry[0] + ttstemp = telemetry_entry[1] + tpacked = telemetry_entry[2] + if self._db_save_telemetry(tsource, tpacked, via = context_dest): + RNS.log("Saved telemetry stream entry from "+RNS.prettyhexrep(tsource), RNS.LOG_WARNING) + + else: + RNS.log("Received telemetry stream field with no data: "+str(lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]), RNS.LOG_DEBUG) if own_command or len(lxm.content) != 0 or len(lxm.title) != 0: db = self.__db_connect() @@ -2482,7 +2531,8 @@ class SidebandCore(): self.pending_telemetry_send = True self.pending_telemetry_send_try += 1 if self.config["telemetry_send_all_to_collector"]: - self.create_telemetry_collector_response(to_addr=collector_address) + last_timebase = (self.getpersistent(f"telemetry.{RNS.hexrep(collector_address, delimit=False)}.last_send_success_timebase") or 0) + self.create_telemetry_collector_response(to_addr=collector_address, timebase=last_timebase, is_authorized_telemetry_request=True) else: self.send_latest_telemetry(to_addr=collector_address) else: @@ -2996,9 +3046,9 @@ class SidebandCore(): except Exception as e: RNS.log("Error while setting last successul telemetry timebase for "+RNS.prettyhexrep(message.destination_hash), RNS.LOG_DEBUG) - def get_message_fields(self, context_dest, telemetry_update=False): + def get_message_fields(self, context_dest, telemetry_update=False, is_authorized_telemetry_request=False): fields = {} - send_telemetry = (telemetry_update == True) or self.should_send_telemetry(context_dest) + send_telemetry = (telemetry_update == True) or (self.should_send_telemetry(context_dest) or is_authorized_telemetry_request) send_appearance = self.config["telemetry_send_appearance"] or send_telemetry if send_telemetry and self.latest_packed_telemetry != None: @@ -3379,7 +3429,7 @@ class SidebandCore(): return if message.signature_validated and LXMF.FIELD_COMMANDS in message.fields: - if self.requests_allowed_from(context_dest): + if self.allow_request_from(context_dest): commands = message.fields[LXMF.FIELD_COMMANDS] self.handle_commands(commands, message) else: @@ -3391,7 +3441,7 @@ class SidebandCore(): self.lxm_ingest(message) except Exception as e: - RNS.log("Error while ingesting LXMF message "+RNS.prettyhexrep(message.hash)+" to database: "+str(e)) + RNS.log("Error while ingesting LXMF message "+RNS.prettyhexrep(message.hash)+" to database: "+str(e), RNS.LOG_ERROR) def handle_commands(self, commands, message): try: @@ -3403,7 +3453,7 @@ class SidebandCore(): RNS.log("Handling telemetry request with timebase "+str(timebase), RNS.LOG_DEBUG) if self.config["telemetry_collector_enabled"]: RNS.log(f"Collector requests enabled, returning complete telemetry response for all known objects since {timebase}", RNS.LOG_DEBUG) - self.create_telemetry_collector_response(to_addr=context_dest, timebase=timebase) + self.create_telemetry_collector_response(to_addr=context_dest, timebase=timebase, is_authorized_telemetry_request=True) else: RNS.log("Responding with own latest telemetry", RNS.LOG_DEBUG) self.send_latest_telemetry(to_addr=context_dest) @@ -3436,25 +3486,33 @@ class SidebandCore(): except Exception as e: RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR) - def create_telemetry_collector_response(self, to_addr, timebase): - sources = {} + def create_telemetry_collector_response(self, to_addr, timebase, is_authorized_telemetry_request=False): + added_sources = {} sources = self.list_telemetry(after=timebase) only_latest = self.config["telemetry_requests_only_send_latest"] + elements = 0; added = 0 telemetry_stream = [] for source in sources: if source != to_addr: for entry in sources[source]: + elements += 1 timestamp = entry[0]; packed_telemetry = entry[1] te = [source, timestamp, packed_telemetry] if only_latest: - if not source in sources: - sources[source] = True + if not source in added_sources: + added_sources[source] = True telemetry_stream.append(te) + added += 1 else: telemetry_stream.append(te) + added += 1 - self.send_latest_telemetry(to_addr=to_addr, stream=telemetry_stream) + return self.send_latest_telemetry( + to_addr=to_addr, + stream=telemetry_stream, + is_authorized_telemetry_request=is_authorized_telemetry_request + ) def get_display_name_bytes(self): diff --git a/sbapp/ui/conversations.py b/sbapp/ui/conversations.py index 376be2d..9c689ed 100644 --- a/sbapp/ui/conversations.py +++ b/sbapp/ui/conversations.py @@ -136,8 +136,6 @@ class Conversations(): icon_color=fg, md_bg_color=bg, on_release=self.app.conversation_action) - RNS.log("ICON "+str(iconl.md_bg_color)) - iconl._default_icon_pad = dp(ic_p) iconl.icon_size = dp(ic_s) diff --git a/sbapp/ui/objectdetails.py b/sbapp/ui/objectdetails.py index 3737503..2248e85 100644 --- a/sbapp/ui/objectdetails.py +++ b/sbapp/ui/objectdetails.py @@ -140,6 +140,12 @@ class ObjectDetails(): pds = self.app.sideband.peer_display_name(source_dest) appearance = self.app.sideband.peer_appearance(source_dest) self.screen.ids.name_label.text = pds + + if source_dest == own_address: + self.screen.ids.name_label.text = pds+" (this device)" + elif source_dest == self.app.sideband.config["telemetry_collector"]: + self.screen.ids.name_label.text = pds+" (collector)" + self.screen.ids.coordinates_button.disabled = True self.screen.ids.object_appearance.icon = appearance[0] self.screen.ids.object_appearance.icon_color = appearance[1] @@ -170,8 +176,6 @@ class ObjectDetails(): relative_to = None if source_dest != own_address: relative_to = self.app.sideband.telemeter - else: - self.screen.ids.name_label.text = pds+" (this device)" rendered_telemetry = telemeter.render(relative_to=relative_to) if "location" in telemeter.sensors: @@ -291,7 +295,7 @@ class RVDetails(MDRecycleView): "Battery": 70, "Timestamp": 80, "Received": 90, - "Information": 100, + "Information": 5, } self.entries = [] rendered_telemetry.sort(key=lambda s: sort[s["name"]] if s["name"] in sort else 1000) @@ -323,29 +327,43 @@ class RVDetails(MDRecycleView): formatted_values = f"[b]Information[/b]: {external_text}" elif name == "Received": formatted_values = "" - by = s["values"]["by"]; by_str = "" - if by != None: - if by == self.app.sideband.lxmf_destination.hash: - by_str = "Directly by [b]this device[/b]" - else: - dstr = self.app.sideband.peer_display_name(by) - by_str = f"By [b]{dstr}[/b]" - formatted_values+=by_str + by = s["values"]["by"]; + via = s["values"]["via"]; - via = s["values"]["via"]; via_str = "" - if via != None: + if by == self.app.sideband.lxmf_destination.hash: if via == self.delegate.object_hash: - via_str = "directly [b]from emitter[/b]" + formatted_values = "Collected directly by [b]this device[/b], directly [b]from emitter[/b]" else: - dstr = self.app.sideband.peer_display_name(by) - via_str = f"via [b]{dstr}[/b]" - if len(formatted_values) != 0: formatted_values += ", " - formatted_values += via_str - - if formatted_values != "": - formatted_values = f"Collected {formatted_values}" + via_str = self.app.sideband.peer_display_name(via) + if via_str == None: + via_str = "an [b]unknown peer[/b]" + formatted_values = f"Collected directly by [b]this device[/b], via {via_str}" else: + if via != None and via == by: + vstr = self.app.sideband.peer_display_name(via) + formatted_values = f"Received from and collected by [b]{vstr}[/b]" + + else: + if via != None: + vstr = self.app.sideband.peer_display_name(via) + via_str = f"Received from [b]{vstr}[/b]" + else: + via_str = "Received from an [b]unknown peer[/b]" + + if by != None: + dstr = self.app.sideband.peer_display_name(by) + by_str = f", collected by [b]{dstr}[/b]" + else: + by_str = f", collected by an [b]unknown peer[/b]" + + formatted_values = f"{via_str}{by_str}" + + if formatted_values == "": formatted_values = None + + if not by == self.app.sideband.lxmf_destination.hash and not self.app.sideband.is_trusted(by): + extra_entries.append({"icon": "alert", "text": "Collected by a [b]non-trusted[/b] peer"}) + elif name == "Battery": p = s["values"]["percent"] cs = s["values"]["_meta"]