From 3b7d1b78929c8ee5dc5f6bba25597c3c41dca54a Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 5 Dec 2023 20:37:05 +0100 Subject: [PATCH] Fixed incorrect defaults for IFAC size after increase in default values in RNS --- sbapp/sideband/core.py | 14 +- sbapp/sideband/geo.py | 14 + sbapp/ui/objectdetails.py | 519 +++++++++++++++++++------------------- 3 files changed, 288 insertions(+), 259 deletions(-) diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index f36bf83..e7225fb 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -2810,6 +2810,11 @@ class SidebandCore(): else: ifac_netkey = self.config["connect_tcp_ifac_passphrase"] + if ifac_netname != None or ifac_netkey != None: + ifac_size = 16 + else: + ifac_size = None + tcpinterface = RNS.Interfaces.TCPInterface.TCPClientInterface( RNS.Transport, "TCPClientInterface", @@ -2834,7 +2839,7 @@ class SidebandCore(): else: if_mode = None - self.reticulum._add_interface(tcpinterface, mode=if_mode, ifac_netname=ifac_netname, ifac_netkey=ifac_netkey) + self.reticulum._add_interface(tcpinterface, mode=if_mode, ifac_netname=ifac_netname, ifac_netkey=ifac_netkey, ifac_size=ifac_size) self.interface_tcp = tcpinterface except Exception as e: @@ -2857,6 +2862,11 @@ class SidebandCore(): else: ifac_netkey = self.config["connect_i2p_ifac_passphrase"] + if ifac_netname != None or ifac_netkey != None: + ifac_size = 16 + else: + ifac_size = None + i2pinterface = RNS.Interfaces.I2PInterface.I2PInterface( RNS.Transport, "I2PInterface", @@ -2880,7 +2890,7 @@ class SidebandCore(): else: if_mode = None - self.reticulum._add_interface(i2pinterface, mode = if_mode, ifac_netname=ifac_netname, ifac_netkey=ifac_netkey) + self.reticulum._add_interface(i2pinterface, mode = if_mode, ifac_netname=ifac_netname, ifac_netkey=ifac_netkey, ifac_size=ifac_size) for si in RNS.Transport.interfaces: if type(si) == RNS.Interfaces.I2PInterface.I2PInterfacePeer: diff --git a/sbapp/sideband/geo.py b/sbapp/sideband/geo.py index 8c00a34..881db80 100644 --- a/sbapp/sideband/geo.py +++ b/sbapp/sideband/geo.py @@ -290,6 +290,20 @@ def shared_radio_horizon(c1, c2,): "antenna_distance": antenna_distance } +def ghtest(): + import pygeodesy + from pygeodesy.ellipsoidalKarney import LatLon + ginterpolator = pygeodesy.GeoidKarney("./assets/geoids/egm2008-5.pgm") + + # Make an example location + lat=51.416422 + lon=-116.217151 + + # Get the geoid height + single_position=LatLon(lat, lon) + h = ginterpolator(single_position) + print(h) + # def tests(): # import RNS # import numpy as np diff --git a/sbapp/ui/objectdetails.py b/sbapp/ui/objectdetails.py index 9424195..faa8a27 100644 --- a/sbapp/ui/objectdetails.py +++ b/sbapp/ui/objectdetails.py @@ -282,280 +282,285 @@ class RVDetails(MDRecycleView): self.data = [] def update_source(self, rendered_telemetry=None): - if not rendered_telemetry: - rendered_telemetry = [] - - sort = { - "Physical Link": 10, - "Location": 20, - "Ambient Light": 30, - "Ambient Temperature": 40, - "Relative Humidity": 50, - "Ambient Pressure": 60, - "Battery": 70, - "Timestamp": 80, - "Received": 90, - "Information": 5, - } - self.entries = [] - rendered_telemetry.sort(key=lambda s: sort[s["name"]] if s["name"] in sort else 1000) - for s in rendered_telemetry: - extra_entries = [] - def pass_job(sender=None): - pass - release_function = pass_job - formatted_values = None - name = s["name"] - if name == "Timestamp": - ts = s["values"]["UTC"] - if ts != None: - ts_str = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") - formatted_values = f"Recorded [b]{RNS.prettytime(time.time()-ts, compact=True)} ago[/b] ({ts_str})" - def copy_info(e=None): - Clipboard.copy(ts_str) - toast("Copied to clipboard") - release_function = copy_info - elif name == "Information": - info = s["values"]["contents"] - if info != None: - istr = str(info) - def copy_info(e=None): - Clipboard.copy(istr) - toast("Copied to clipboard") - release_function = copy_info - external_text = escape_markup(istr) - formatted_values = f"[b]Information[/b]: {external_text}" - elif name == "Received": - formatted_values = "" - by = s["values"]["by"]; - via = s["values"]["via"]; + try: + if not rendered_telemetry: + rendered_telemetry = [] + + sort = { + "Physical Link": 10, + "Location": 20, + "Ambient Light": 30, + "Ambient Temperature": 40, + "Relative Humidity": 50, + "Ambient Pressure": 60, + "Battery": 70, + "Timestamp": 80, + "Received": 90, + "Information": 5, + } + self.entries = [] + rendered_telemetry.sort(key=lambda s: sort[s["name"]] if s["name"] in sort else 1000) + for s in rendered_telemetry: + extra_entries = [] + def pass_job(sender=None): + pass + release_function = pass_job + formatted_values = None + name = s["name"] + if name == "Timestamp": + ts = s["values"]["UTC"] + if ts != None: + ts_str = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") + formatted_values = f"Recorded [b]{RNS.prettytime(time.time()-ts, compact=True)} ago[/b] ({ts_str})" + def copy_info(e=None): + Clipboard.copy(ts_str) + toast("Copied to clipboard") + release_function = copy_info + elif name == "Information": + info = s["values"]["contents"] + if info != None: + istr = str(info) + def copy_info(e=None): + Clipboard.copy(istr) + toast("Copied to clipboard") + release_function = copy_info + external_text = escape_markup(istr) + formatted_values = f"[b]Information[/b]: {external_text}" + elif name == "Received": + formatted_values = "" + by = s["values"]["by"]; + via = s["values"]["via"]; - if by == self.app.sideband.lxmf_destination.hash: - if via == self.delegate.object_hash: - formatted_values = "Collected directly by [b]this device[/b], directly [b]from emitter[/b]" + if by == self.app.sideband.lxmf_destination.hash: + if via == self.delegate.object_hash: + formatted_values = "Collected directly by [b]this device[/b], directly [b]from emitter[/b]" + else: + via_str = self.app.sideband.peer_display_name(via) + if via_str == None: + via_str = "an [b]unknown peer[/b]" + formatted_values = f"Collected directly by [b]this device[/b], via {via_str}" else: - via_str = self.app.sideband.peer_display_name(via) - if via_str == None: - via_str = "an [b]unknown peer[/b]" - formatted_values = f"Collected directly by [b]this device[/b], via {via_str}" - else: - if via != None and via == by: - vstr = self.app.sideband.peer_display_name(via) - formatted_values = f"Received from, and collected by [b]{vstr}[/b]" - - else: - if via != None: + if via != None and via == by: vstr = self.app.sideband.peer_display_name(via) - via_str = f"Received from [b]{vstr}[/b]" - else: - via_str = "Received from an [b]unknown peer[/b]" + formatted_values = f"Received from, and collected by [b]{vstr}[/b]" - if by != None: - dstr = self.app.sideband.peer_display_name(by) - by_str = f", collected by [b]{dstr}[/b]" else: - by_str = f", collected by an [b]unknown peer[/b]" - - formatted_values = f"{via_str}{by_str}" - - if formatted_values == "": - formatted_values = None - - if not by == self.app.sideband.lxmf_destination.hash and not self.app.sideband.is_trusted(by): - extra_entries.append({"icon": "alert", "text": "Collected by a [b]non-trusted[/b] peer"}) - - elif name == "Battery": - p = s["values"]["percent"] - cs = s["values"]["_meta"] - if cs != None: cs_str = f" ({cs})" - if p != None: formatted_values = f"{name} [b]{p}%[/b]"+cs_str - elif name == "Ambient Pressure": - p = s["values"]["mbar"] - if p != None: formatted_values = f"{name} [b]{p} mbar[/b]" - dt = "mbar" - if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: - d = s["deltas"][dt] - formatted_values += f" (Δ = {d} mbar)" - elif name == "Ambient Temperature": - c = s["values"]["c"] - if c != None: formatted_values = f"{name} [b]{c}° C[/b]" - dt = "c" - if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: - d = s["deltas"][dt] - formatted_values += f" (Δ = {d}° C)" - elif name == "Relative Humidity": - r = s["values"]["percent"] - if r != None: formatted_values = f"{name} [b]{r}%[/b]" - dt = "percent" - if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: - d = s["deltas"][dt] - formatted_values += f" (Δ = {d}%)" - elif name == "Physical Link": - rssi = s["values"]["rssi"]; rssi_str = None - snr = s["values"]["snr"]; snr_str = None - q = s["values"]["q"]; q_str = None - if q != None: q_str = f"Link Quality [b]{q}%[/b]" - if rssi != None: - rssi_str = f"RSSI [b]{rssi} dBm[/b]" - if q != None: rssi_str = ", "+rssi_str - if snr != None: - snr_str = f"SNR [b]{snr} dB[/b]" - if q != None or rssi != None: snr_str = ", "+snr_str - if q_str or rssi_str or snr_str: - formatted_values = q_str+rssi_str+snr_str - elif name == "Location": - lat = s["values"]["latitude"] - lon = s["values"]["longitude"] - alt = s["values"]["altitude"] - speed = s["values"]["speed"] - heading = s["values"]["heading"] - accuracy = s["values"]["accuracy"] - updated = s["values"]["updated"] - updated_str = f", logged [b]{RNS.prettytime(time.time()-updated, compact=True)} ago[/b]" - - coords = f"{lat}, {lon}" - fcoords = f"{round(lat,4)}, {round(lon,4)}" - self.delegate.coords = coords - if alt == 0: - alt_str = "0" - else: - alt_str = RNS.prettydistance(alt) - formatted_values = f"Coordinates [b]{fcoords}[/b], altitude [b]{alt_str}[/b]" - if speed != None: - if speed > 0.000001: - speed_formatted_values = f"Speed [b]{speed} Km/h[/b], heading [b]{heading}°[/b]" - else: - speed_formatted_values = f"Speed [b]0 Km/h[/b]" - else: - speed_formatted_values = None - extra_formatted_values = f"Uncertainty [b]{accuracy} meters[/b]"+updated_str - - data = {"icon": s["icon"], "text": f"{formatted_values}"} - - extra_entries.append({"icon": "map-marker-question", "text": extra_formatted_values}) - if speed_formatted_values != None: - extra_entries.append({"icon": "speedometer", "text": speed_formatted_values}) - - if "distance" in s: - if "orthodromic" in s["distance"]: - od = s["distance"]["orthodromic"] - if od != None: - od_text = f"Geodesic distance [b]{RNS.prettydistance(od)}[/b]" - extra_entries.append({"icon": "earth", "text": od_text}) - - if "euclidian" in s["distance"]: - ed = s["distance"]["euclidian"] - if ed != None: - ed_text = f"Euclidian distance [b]{RNS.prettydistance(ed)}[/b]" - extra_entries.append({"icon": "axis-arrow", "text": ed_text}) - - if "vertical" in s["distance"]: - vd = s["distance"]["vertical"] - if vd != None: - if vd < 0: - relstr = "lower" - vd = abs(vd) + if via != None: + vstr = self.app.sideband.peer_display_name(via) + via_str = f"Received from [b]{vstr}[/b]" else: - relstr = "greater" - vd_text = f"Altitude is [b]{RNS.prettydistance(vd)}[/b] {relstr} than this device" - extra_entries.append({"icon": "altimeter", "text": vd_text}) + via_str = "Received from an [b]unknown peer[/b]" + + if by != None: + dstr = self.app.sideband.peer_display_name(by) + by_str = f", collected by [b]{dstr}[/b]" + else: + by_str = f", collected by an [b]unknown peer[/b]" - if "angle_to_horizon" in s["values"]: - oath = s["values"]["angle_to_horizon"] - if oath != None: - if self.delegate.viewing_self: - oath_text = f"Local horizon is at [b]{round(oath,3)}°[/b]" - else: - oath_text = f"Object's horizon is at [b]{round(oath,3)}°[/b]" - extra_entries.append({"icon": "arrow-split-horizontal", "text": oath_text}) + formatted_values = f"{via_str}{by_str}" - if self.delegate.viewing_self and "radio_horizon" in s["values"]: - orh = s["values"]["radio_horizon"] - if orh != None: - range_text = RNS.prettydistance(orh) - rh_formatted_text = f"Radio horizon of [b]{range_text}[/b]" - extra_entries.append({"icon": "radio-tower", "text": rh_formatted_text}) + if formatted_values == "": + formatted_values = None - if "azalt" in s and "local_angle_to_horizon" in s["azalt"]: - lath = s["azalt"]["local_angle_to_horizon"] - if lath != None: - lath_text = f"Local horizon is at [b]{round(lath,3)}°[/b]" - extra_entries.append({"icon": "align-vertical-distribute", "text": lath_text}) - - if "azalt" in s: - azalt_formatted_text = "" - if "azimuth" in s["azalt"]: - az = s["azalt"]["azimuth"] - az_text = f"Azimuth [b]{round(az,3)}°[/b]" - azalt_formatted_text += az_text + if not by == self.app.sideband.lxmf_destination.hash and not self.app.sideband.is_trusted(by): + extra_entries.append({"icon": "alert", "text": "Collected by a [b]non-trusted[/b] peer"}) - if "altitude" in s["azalt"]: - al = s["azalt"]["altitude"] - al_text = f"altitude [b]{round(al,3)}°[/b]" - if len(azalt_formatted_text) != 0: azalt_formatted_text += ", " - azalt_formatted_text += al_text - - extra_entries.append({"icon": "compass-rose", "text": azalt_formatted_text}) - - if "above_horizon" in s["azalt"]: - astr = "above" if s["azalt"]["above_horizon"] == True else "below" - dstr = str(round(s["azalt"]["altitude_delta"], 3)) - ah_text = f"Object is [b]{astr}[/b] the horizon (Δ = {dstr}°)" - extra_entries.append({"icon": "angle-acute", "text": ah_text}) - - if not self.delegate.viewing_self and "radio_horizon" in s["values"]: - orh = s["values"]["radio_horizon"] - if orh != None: - range_text = RNS.prettydistance(orh) - rh_formatted_text = f"Object's radio horizon is [b]{range_text}[/b]" - extra_entries.append({"icon": "radio-tower", "text": rh_formatted_text}) - - if "radio_horizon" in s: - rh_icon = "circle-outline" - crange_text = RNS.prettydistance(s["radio_horizon"]["combined_range"]) - if s["radio_horizon"]["within_range"]: - rh_formatted_text = f"[b]Within[/b] shared radio horizon of [b]{crange_text}[/b]" - rh_icon = "set-none" - else: - rh_formatted_text = f"[b]Outside[/b] shared radio horizon of [b]{crange_text}[/b]" - - extra_entries.append({"icon": rh_icon, "text": rh_formatted_text}) - - def select(e=None): - geo_uri = f"geo:{lat},{lon}" - def lj(): - webbrowser.open(geo_uri) - threading.Thread(target=lj, daemon=True).start() - - release_function = select - else: - formatted_values = f"{name}" - for vn in s["values"]: - v = s["values"][vn] - formatted_values += f" [b]{v} {vn}[/b]" - - dt = vn + elif name == "Battery": + p = s["values"]["percent"] + cs = s["values"]["_meta"] + if cs != None: cs_str = f" ({cs})" + if p != None: formatted_values = f"{name} [b]{p}%[/b]"+cs_str + elif name == "Ambient Pressure": + p = s["values"]["mbar"] + if p != None: formatted_values = f"{name} [b]{p} mbar[/b]" + dt = "mbar" if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: d = s["deltas"][dt] - formatted_values += f" (Δ = {d} {vn})" - - data = None - if formatted_values != None: - if release_function: - data = {"icon": s["icon"], "text": f"{formatted_values}", "on_release": release_function} - else: + formatted_values += f" (Δ = {d} mbar)" + elif name == "Ambient Temperature": + c = s["values"]["c"] + if c != None: formatted_values = f"{name} [b]{c}° C[/b]" + dt = "c" + if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: + d = s["deltas"][dt] + formatted_values += f" (Δ = {d}° C)" + elif name == "Relative Humidity": + r = s["values"]["percent"] + if r != None: formatted_values = f"{name} [b]{r}%[/b]" + dt = "percent" + if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: + d = s["deltas"][dt] + formatted_values += f" (Δ = {d}%)" + elif name == "Physical Link": + rssi = s["values"]["rssi"]; rssi_str = None + snr = s["values"]["snr"]; snr_str = None + q = s["values"]["q"]; q_str = None + if q != None: q_str = f"Link Quality [b]{q}%[/b]" + if rssi != None: + rssi_str = f"RSSI [b]{rssi} dBm[/b]" + if q != None: rssi_str = ", "+rssi_str + if snr != None: + snr_str = f"SNR [b]{snr} dB[/b]" + if q != None or rssi != None: snr_str = ", "+snr_str + if q_str or rssi_str or snr_str: + formatted_values = q_str+rssi_str+snr_str + elif name == "Location": + lat = s["values"]["latitude"] + lon = s["values"]["longitude"] + alt = s["values"]["altitude"] + speed = s["values"]["speed"] + heading = s["values"]["heading"] + accuracy = s["values"]["accuracy"] + updated = s["values"]["updated"] + updated_str = f", logged [b]{RNS.prettytime(time.time()-updated, compact=True)} ago[/b]" + + coords = f"{lat}, {lon}" + fcoords = f"{round(lat,4)}, {round(lon,4)}" + self.delegate.coords = coords + if alt == 0: + alt_str = "0" + else: + alt_str = RNS.prettydistance(alt) + formatted_values = f"Coordinates [b]{fcoords}[/b], altitude [b]{alt_str}[/b]" + if speed != None: + if speed > 0.000001: + speed_formatted_values = f"Speed [b]{speed} Km/h[/b], heading [b]{heading}°[/b]" + else: + speed_formatted_values = f"Speed [b]0 Km/h[/b]" + else: + speed_formatted_values = None + extra_formatted_values = f"Uncertainty [b]{accuracy} meters[/b]"+updated_str + data = {"icon": s["icon"], "text": f"{formatted_values}"} - if data != None: - self.entries.append(data) - for extra in extra_entries: - self.entries.append(extra) + extra_entries.append({"icon": "map-marker-question", "text": extra_formatted_values}) + if speed_formatted_values != None: + extra_entries.append({"icon": "speedometer", "text": speed_formatted_values}) - if len(self.entries) == 0: - self.entries.append({"icon": "timeline-question-outline", "text": f"No telemetry available for this device"}) + if "distance" in s: + if "orthodromic" in s["distance"]: + od = s["distance"]["orthodromic"] + if od != None: + od_text = f"Geodesic distance [b]{RNS.prettydistance(od)}[/b]" + extra_entries.append({"icon": "earth", "text": od_text}) + + if "euclidian" in s["distance"]: + ed = s["distance"]["euclidian"] + if ed != None: + ed_text = f"Euclidian distance [b]{RNS.prettydistance(ed)}[/b]" + extra_entries.append({"icon": "axis-arrow", "text": ed_text}) + + if "vertical" in s["distance"]: + vd = s["distance"]["vertical"] + if vd != None: + if vd < 0: + relstr = "lower" + vd = abs(vd) + else: + relstr = "greater" + vd_text = f"Altitude is [b]{RNS.prettydistance(vd)}[/b] {relstr} than this device" + extra_entries.append({"icon": "altimeter", "text": vd_text}) - self.data = self.entries + if "angle_to_horizon" in s["values"]: + oath = s["values"]["angle_to_horizon"] + if oath != None: + if self.delegate.viewing_self: + oath_text = f"Local horizon is at [b]{round(oath,3)}°[/b]" + else: + oath_text = f"Object's horizon is at [b]{round(oath,3)}°[/b]" + extra_entries.append({"icon": "arrow-split-horizontal", "text": oath_text}) + if self.delegate.viewing_self and "radio_horizon" in s["values"]: + orh = s["values"]["radio_horizon"] + if orh != None: + range_text = RNS.prettydistance(orh) + rh_formatted_text = f"Radio horizon of [b]{range_text}[/b]" + extra_entries.append({"icon": "radio-tower", "text": rh_formatted_text}) + + if "azalt" in s and "local_angle_to_horizon" in s["azalt"]: + lath = s["azalt"]["local_angle_to_horizon"] + if lath != None: + lath_text = f"Local horizon is at [b]{round(lath,3)}°[/b]" + extra_entries.append({"icon": "align-vertical-distribute", "text": lath_text}) + + if "azalt" in s: + azalt_formatted_text = "" + if "azimuth" in s["azalt"]: + az = s["azalt"]["azimuth"] + az_text = f"Azimuth [b]{round(az,3)}°[/b]" + azalt_formatted_text += az_text + + if "altitude" in s["azalt"]: + al = s["azalt"]["altitude"] + al_text = f"altitude [b]{round(al,3)}°[/b]" + if len(azalt_formatted_text) != 0: azalt_formatted_text += ", " + azalt_formatted_text += al_text + + extra_entries.append({"icon": "compass-rose", "text": azalt_formatted_text}) + + if "above_horizon" in s["azalt"]: + astr = "above" if s["azalt"]["above_horizon"] == True else "below" + dstr = str(round(s["azalt"]["altitude_delta"], 3)) + ah_text = f"Object is [b]{astr}[/b] the horizon (Δ = {dstr}°)" + extra_entries.append({"icon": "angle-acute", "text": ah_text}) + + if not self.delegate.viewing_self and "radio_horizon" in s["values"]: + orh = s["values"]["radio_horizon"] + if orh != None: + range_text = RNS.prettydistance(orh) + rh_formatted_text = f"Object's radio horizon is [b]{range_text}[/b]" + extra_entries.append({"icon": "radio-tower", "text": rh_formatted_text}) + + if "radio_horizon" in s: + rh_icon = "circle-outline" + crange_text = RNS.prettydistance(s["radio_horizon"]["combined_range"]) + if s["radio_horizon"]["within_range"]: + rh_formatted_text = f"[b]Within[/b] shared radio horizon of [b]{crange_text}[/b]" + rh_icon = "set-none" + else: + rh_formatted_text = f"[b]Outside[/b] shared radio horizon of [b]{crange_text}[/b]" + + extra_entries.append({"icon": rh_icon, "text": rh_formatted_text}) + + def select(e=None): + geo_uri = f"geo:{lat},{lon}" + def lj(): + webbrowser.open(geo_uri) + threading.Thread(target=lj, daemon=True).start() + + release_function = select + else: + formatted_values = f"{name}" + for vn in s["values"]: + v = s["values"][vn] + formatted_values += f" [b]{v} {vn}[/b]" + + dt = vn + if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: + d = s["deltas"][dt] + formatted_values += f" (Δ = {d} {vn})" + + data = None + if formatted_values != None: + if release_function: + data = {"icon": s["icon"], "text": f"{formatted_values}", "on_release": release_function} + else: + data = {"icon": s["icon"], "text": f"{formatted_values}"} + + if data != None: + self.entries.append(data) + for extra in extra_entries: + self.entries.append(extra) + + if len(self.entries) == 0: + self.entries.append({"icon": "timeline-question-outline", "text": f"No telemetry available for this device"}) + + self.data = self.entries + except Exception as e: + import traceback + exception_info = "".join(traceback.TracebackException.from_exception(e).format()) + RNS.log(f"An {str(type(e))} occurred while updating service telemetry: {str(e)}", RNS.LOG_ERROR) + RNS.log(exception_info, RNS.LOG_ERROR) layout_object_details = """