From 4c9de8b1849c5d997ba7d15f240772da647afcc4 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 1 Jun 2024 11:11:29 +0200 Subject: [PATCH] Added CPU, storage and RAM sensors. Added custom sensor. --- sbapp/sideband/sense.py | 358 ++++++++++++++++++- sbapp/ui/objectdetails.py | 709 +++++++++++++++++++++----------------- 2 files changed, 744 insertions(+), 323 deletions(-) diff --git a/sbapp/sideband/sense.py b/sbapp/sideband/sense.py index ff7f1ae..ba76347 100644 --- a/sbapp/sideband/sense.py +++ b/sbapp/sideband/sense.py @@ -53,6 +53,7 @@ class Telemeter(): Sensor.SID_PROXIMITY: Proximity, Sensor.SID_POWER_CONSUMPTION: PowerConsumption, Sensor.SID_POWER_PRODUCTION: PowerProduction, Sensor.SID_PROCESSOR: Processor, Sensor.SID_RAM: RandomAccessMemory, Sensor.SID_NVM: NonVolatileMemory, + Sensor.SID_CUSTOM: Custom, } self.available = { "time": Sensor.SID_TIME, @@ -65,6 +66,7 @@ class Telemeter(): "acceleration": Sensor.SID_ACCELERATION, "proximity": Sensor.SID_PROXIMITY, "power_consumption": Sensor.SID_POWER_CONSUMPTION, "power_production": Sensor.SID_POWER_PRODUCTION, "processor": Sensor.SID_PROCESSOR, "ram": Sensor.SID_RAM, "nvm": Sensor.SID_NVM, + "custom": Sensor.SID_CUSTOM } self.from_packed = from_packed self.sensors = {} @@ -191,6 +193,7 @@ class Sensor(): SID_PROCESSOR = 0x13 SID_RAM = 0x14 SID_NVM = 0x15 + SID_CUSTOM = 0xff def __init__(self, sid = None, stale_time = None): self._telemeter = None @@ -1339,7 +1342,7 @@ class PowerConsumption(Sensor): def teardown_sensor(self): self.data = None - def update_consumer(self, power, type_label=None): + def update_consumer(self, power, type_label=None, custom_icon=None): if type_label == None: type_label = 0x00 elif type(type_label) != str: @@ -1348,7 +1351,7 @@ class PowerConsumption(Sensor): if self.data == None: self.data = {} - self.data[type_label] = power + self.data[type_label] = [power, custom_icon] return True def remove_consumer(self, type_label=None): @@ -1397,7 +1400,7 @@ class PowerConsumption(Sensor): label = "Power consumption" else: label = type_label - consumers.append({"label": label, "w": self.data[type_label]}) + consumers.append({"label": label, "w": self.data[type_label][0], "custom_icon": self.data[type_label][1]}) rendered = { "icon": "power-plug-outline", @@ -1420,7 +1423,7 @@ class PowerProduction(Sensor): def teardown_sensor(self): self.data = None - def update_producer(self, power, type_label=None): + def update_producer(self, power, type_label=None, custom_icon=None): if type_label == None: type_label = 0x00 elif type(type_label) != str: @@ -1429,7 +1432,7 @@ class PowerProduction(Sensor): if self.data == None: self.data = {} - self.data[type_label] = power + self.data[type_label] = [power, custom_icon] return True def remove_producer(self, type_label=None): @@ -1478,7 +1481,7 @@ class PowerProduction(Sensor): label = "Power Production" else: label = type_label - producers.append({"label": label, "w": self.data[type_label]}) + producers.append({"label": label, "w": self.data[type_label][0], "custom_icon": self.data[type_label][1]}) rendered = { "icon": "lightning-bolt", @@ -1488,12 +1491,347 @@ class PowerProduction(Sensor): return rendered -# TODO: Implement class Processor(Sensor): - pass + SID = Sensor.SID_PROCESSOR + STALE_TIME = 5 + + def __init__(self): + super().__init__(type(self).SID, type(self).STALE_TIME) + + def setup_sensor(self): + self.update_data() + + def teardown_sensor(self): + self.data = None + + def update_entry(self, current_load=0, load_avgs=None, clock=None, type_label=None): + if type_label == None: + type_label = 0x00 + elif type(type_label) != str: + return False + + if self.data == None: + self.data = {} + + self.data[type_label] = [current_load, load_avgs, clock] + return True + + def remove_entry(self, type_label=None): + if type_label == None: + type_label = 0x00 + + if type_label in self.data: + self.data.pop(type_label) + return True + + return False + + def update_data(self): + pass + + def pack(self): + d = self.data + if d == None: + return None + else: + packed = [] + for type_label in self.data: + packed.append([type_label, self.data[type_label]]) + return packed + + def unpack(self, packed): + try: + if packed == None: + return None + else: + unpacked = {} + for entry in packed: + unpacked[entry[0]] = entry[1] + return unpacked + + except: + return None + + def render(self, relative_to=None): + if self.data == None: + return None + + entries = [] + for type_label in self.data: + if type_label == 0x00: + label = "Storage" + else: + label = type_label + entries.append({ + "label": label, + "current_load": self.data[type_label][0], + "load_avgs": self.data[type_label][1], + "clock": self.data[type_label][2], + }) + + rendered = { + "icon": "chip", + "name": "Processor", + "values": entries, + } + + return rendered class RandomAccessMemory(Sensor): - pass + SID = Sensor.SID_RAM + STALE_TIME = 5 + + def __init__(self): + super().__init__(type(self).SID, type(self).STALE_TIME) + + def setup_sensor(self): + self.update_data() + + def teardown_sensor(self): + self.data = None + + def update_entry(self, capacity=0, used=0, type_label=None): + if type_label == None: + type_label = 0x00 + elif type(type_label) != str: + return False + + if self.data == None: + self.data = {} + + self.data[type_label] = [capacity, used] + return True + + def remove_entry(self, type_label=None): + if type_label == None: + type_label = 0x00 + + if type_label in self.data: + self.data.pop(type_label) + return True + + return False + + def update_data(self): + pass + + def pack(self): + d = self.data + if d == None: + return None + else: + packed = [] + for type_label in self.data: + packed.append([type_label, self.data[type_label]]) + return packed + + def unpack(self, packed): + try: + if packed == None: + return None + else: + unpacked = {} + for entry in packed: + unpacked[entry[0]] = entry[1] + return unpacked + + except: + return None + + def render(self, relative_to=None): + if self.data == None: + return None + + entries = [] + for type_label in self.data: + if type_label == 0x00: + label = "Storage" + else: + label = type_label + entries.append({ + "label": label, + "capacity": self.data[type_label][0], + "used": self.data[type_label][1], + "free": self.data[type_label][0]-self.data[type_label][1], + "percent": (self.data[type_label][1]/self.data[type_label][0])*100, + }) + + rendered = { + "icon": "memory", + "name": "Random Access Memory", + "values": entries, + } + + return rendered class NonVolatileMemory(Sensor): - pass \ No newline at end of file + SID = Sensor.SID_NVM + STALE_TIME = 5 + + def __init__(self): + super().__init__(type(self).SID, type(self).STALE_TIME) + + def setup_sensor(self): + self.update_data() + + def teardown_sensor(self): + self.data = None + + def update_entry(self, capacity=0, used=0, type_label=None): + if type_label == None: + type_label = 0x00 + elif type(type_label) != str: + return False + + if self.data == None: + self.data = {} + + self.data[type_label] = [capacity, used] + return True + + def remove_entry(self, type_label=None): + if type_label == None: + type_label = 0x00 + + if type_label in self.data: + self.data.pop(type_label) + return True + + return False + + def update_data(self): + pass + + def pack(self): + d = self.data + if d == None: + return None + else: + packed = [] + for type_label in self.data: + packed.append([type_label, self.data[type_label]]) + return packed + + def unpack(self, packed): + try: + if packed == None: + return None + else: + unpacked = {} + for entry in packed: + unpacked[entry[0]] = entry[1] + return unpacked + + except: + return None + + def render(self, relative_to=None): + if self.data == None: + return None + + entries = [] + for type_label in self.data: + if type_label == 0x00: + label = "Storage" + else: + label = type_label + entries.append({ + "label": label, + "capacity": self.data[type_label][0], + "used": self.data[type_label][1], + "free": self.data[type_label][0]-self.data[type_label][1], + "percent": (self.data[type_label][1]/self.data[type_label][0])*100, + }) + + rendered = { + "icon": "harddisk", + "name": "Non-Volatile Memory", + "values": entries, + } + + return rendered + +class Custom(Sensor): + SID = Sensor.SID_CUSTOM + STALE_TIME = 5 + + def __init__(self): + super().__init__(type(self).SID, type(self).STALE_TIME) + + def setup_sensor(self): + self.update_data() + + def teardown_sensor(self): + self.data = None + + def update_entry(self, value=None, type_label=None, custom_icon=None): + if type_label == None: + type_label = 0x00 + elif type(type_label) != str: + return False + + if self.data == None: + self.data = {} + + self.data[type_label] = [value, custom_icon] + return True + + def remove_entry(self, type_label=None): + if type_label == None: + type_label = 0x00 + + if type_label in self.data: + self.data.pop(type_label) + return True + + return False + + def update_data(self): + pass + + def pack(self): + d = self.data + if d == None: + return None + else: + packed = [] + for type_label in self.data: + packed.append([type_label, self.data[type_label]]) + return packed + + def unpack(self, packed): + try: + if packed == None: + return None + else: + unpacked = {} + for entry in packed: + unpacked[entry[0]] = entry[1] + return unpacked + + except: + return None + + def render(self, relative_to=None): + if self.data == None: + return None + + entries = [] + for type_label in self.data: + if type_label == 0x00: + label = "Custom" + else: + label = type_label + entries.append({ + "label": label, + "value": self.data[type_label][0], + "custom_icon": self.data[type_label][1], + }) + + rendered = { + "icon": "ruler", + "name": "Custom", + "values": entries, + } + + return rendered diff --git a/sbapp/ui/objectdetails.py b/sbapp/ui/objectdetails.py index 2950239..0168101 100644 --- a/sbapp/ui/objectdetails.py +++ b/sbapp/ui/objectdetails.py @@ -305,6 +305,7 @@ class RVDetails(MDRecycleView): rendered_telemetry = [] sort = { + "Information": 5, "Physical Link": 10, "Location": 20, "Ambient Light": 30, @@ -312,9 +313,12 @@ class RVDetails(MDRecycleView): "Relative Humidity": 50, "Ambient Pressure": 60, "Battery": 70, + "Processor": 72, + "Random Access Memory": 74, + "Non-Volatile Memory": 76, "Timestamp": 80, + "Custom": 85, "Received": 90, - "Information": 5, } def pass_job(sender=None): @@ -323,328 +327,407 @@ class RVDetails(MDRecycleView): self.entries = [] rendered_telemetry.sort(key=lambda s: sort[s["name"]] if s["name"] in sort else 1000) for s in rendered_telemetry: - extra_entries = [] - release_function = pass_job - formatted_values = None - name = s["name"] - if name == "Timestamp": - ts = s["values"]["UTC"] - if ts != None: - ts_str = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") - formatted_values = f"Recorded [b]{RNS.prettytime(time.time()-ts, compact=True)} ago[/b] ({ts_str})" - def copy_info(e=None): - Clipboard.copy(ts_str) - toast("Copied to clipboard") - release_function = copy_info - elif name == "Information": - info = s["values"]["contents"] - if info != None: - istr = str(info) - def copy_info(e=None): - Clipboard.copy(istr) - toast("Copied to clipboard") - release_function = copy_info - external_text = multilingual_markup(escape_markup(istr).encode("utf-8")).decode("utf-8") - formatted_values = f"[b]Information[/b]: {external_text}" - elif name == "Received": - formatted_values = "" - by = s["values"]["by"]; - via = s["values"]["via"]; - - if by == self.app.sideband.lxmf_destination.hash: - if via == self.delegate.object_hash: - formatted_values = "Collected directly by [b]this device[/b], directly [b]from emitter[/b]" - else: - via_str = self.app.sideband.peer_display_name(via) - if via_str == None: - via_str = "an [b]unknown peer[/b]" - formatted_values = f"Collected directly by [b]this device[/b], via {via_str}" - else: - if via != None and via == by: - vstr = self.app.sideband.peer_display_name(via) - formatted_values = f"Received from, and collected by [b]{vstr}[/b]" - - else: - if via != None: - vstr = self.app.sideband.peer_display_name(via) - via_str = f"Received from [b]{vstr}[/b]" - else: - via_str = "Received from an [b]unknown peer[/b]" - - if by != None: - dstr = self.app.sideband.peer_display_name(by) - by_str = f", collected by [b]{dstr}[/b]" - else: - by_str = f", collected by an [b]unknown peer[/b]" - - formatted_values = f"{via_str}{by_str}" - - if formatted_values == "": - formatted_values = None - - if not by == self.app.sideband.lxmf_destination.hash and not self.app.sideband.is_trusted(by): - extra_entries.append({"icon": "alert", "text": "Collected by a [b]non-trusted[/b] peer"}) + try: + extra_entries = [] + release_function = pass_job + formatted_values = None + name = s["name"] - elif name == "Battery": - p = s["values"]["percent"] - cs = s["values"]["_meta"] - if cs != None: cs_str = f" ({cs})" - if p != None: formatted_values = f"{name} [b]{p}%[/b]"+cs_str - elif name == "Ambient Pressure": - p = s["values"]["mbar"] - if p != None: formatted_values = f"{name} [b]{p} mbar[/b]" - dt = "mbar" - if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: - d = s["deltas"][dt] - formatted_values += f" (Δ = {d} mbar)" - elif name == "Ambient Temperature": - c = s["values"]["c"] - if c != None: formatted_values = f"{name} [b]{c}° C[/b]" - dt = "c" - if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: - d = s["deltas"][dt] - formatted_values += f" (Δ = {d}° C)" - elif name == "Relative Humidity": - r = s["values"]["percent"] - if r != None: formatted_values = f"{name} [b]{r}%[/b]" - dt = "percent" - if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: - d = s["deltas"][dt] - formatted_values += f" (Δ = {d}%)" - elif name == "Physical Link": - rssi = s["values"]["rssi"]; rssi_str = None - snr = s["values"]["snr"]; snr_str = None - q = s["values"]["q"]; q_str = None - if q != None: q_str = f"Link Quality [b]{q}%[/b]" - if rssi != None: - rssi_str = f"RSSI [b]{rssi} dBm[/b]" - if q != None: rssi_str = ", "+rssi_str - if snr != None: - snr_str = f"SNR [b]{snr} dB[/b]" - if q != None or rssi != None: snr_str = ", "+snr_str - if q_str or rssi_str or snr_str: - formatted_values = q_str+rssi_str+snr_str - elif name == "Power Consumption": - cs = s["values"] - if cs != None: - for c in cs: - label = c["label"] - watts = c["w"] - prefix = "" - if watts < 1/1e6: - watts *= 1e9 - prefix = "n" - elif watts < 1/1e3: - watts *= 1e6 - prefix = "µ" - elif watts < 1: - watts *= 1e3 - prefix = "m" - elif watts >= 1e15: - watts /= 1e15 - prefix = "E" - elif watts >= 1e12: - watts /= 1e12 - prefix = "T" - elif watts >= 1e9: - watts /= 1e9 - prefix = "G" - elif watts >= 1e6: - watts /= 1e6 - prefix = "M" - elif watts >= 1e3: - watts /= 1e3 - prefix = "K" + if name == "Timestamp": + ts = s["values"]["UTC"] + if ts != None: + ts_str = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") + formatted_values = f"Recorded [b]{RNS.prettytime(time.time()-ts, compact=True)} ago[/b] ({ts_str})" + def copy_info(e=None): + Clipboard.copy(ts_str) + toast("Copied to clipboard") + release_function = copy_info + + elif name == "Information": + info = s["values"]["contents"] + if info != None: + istr = str(info) + def copy_info(e=None): + Clipboard.copy(istr) + toast("Copied to clipboard") + release_function = copy_info + external_text = multilingual_markup(escape_markup(istr).encode("utf-8")).decode("utf-8") + formatted_values = f"[b]Information[/b]: {external_text}" + + elif name == "Received": + formatted_values = "" + by = s["values"]["by"]; + via = s["values"]["via"]; - watts = round(watts, 2) - p_text = f"{label} [b]{watts} {prefix}W[/b]" - extra_entries.append({"icon": s["icon"], "text": p_text}) - - elif name == "Power Production": - cs = s["values"] - if cs != None: - for c in cs: - label = c["label"] - watts = c["w"] - prefix = "" - if watts < 1/1e6: - watts *= 1e9 - prefix = "n" - elif watts < 1/1e3: - watts *= 1e6 - prefix = "µ" - elif watts < 1: - watts *= 1e3 - prefix = "m" - elif watts >= 1e15: - watts /= 1e15 - prefix = "E" - elif watts >= 1e12: - watts /= 1e12 - prefix = "T" - elif watts >= 1e9: - watts /= 1e9 - prefix = "G" - elif watts >= 1e6: - watts /= 1e6 - prefix = "M" - elif watts >= 1e3: - watts /= 1e3 - prefix = "K" - - watts = round(watts, 2) - p_text = f"{label} [b]{watts} {prefix}W[/b]" - extra_entries.append({"icon": s["icon"], "text": p_text}) - - elif name == "Location": - lat = s["values"]["latitude"] - lon = s["values"]["longitude"] - alt = s["values"]["altitude"] - speed = s["values"]["speed"] - heading = s["values"]["heading"] - accuracy = s["values"]["accuracy"] - updated = s["values"]["updated"] - updated_str = f", logged [b]{RNS.prettytime(time.time()-updated, compact=True)} ago[/b]" - - coords = f"{lat}, {lon}" - fcoords = f"{round(lat,4)}, {round(lon,4)}" - self.delegate.coords = coords - if alt == 0: - alt_str = "0" - else: - alt_str = RNS.prettydistance(alt) - formatted_values = f"Coordinates [b]{fcoords}[/b], altitude [b]{alt_str}[/b]" - if speed != None: - if speed > 0.02: - speed_formatted_values = f"Speed [b]{speed} Km/h[/b], heading [b]{heading}°[/b]" - else: - # speed_formatted_values = f"Speed [b]0 Km/h[/b]" - speed_formatted_values = f"Object is [b]stationary[/b]" - else: - speed_formatted_values = None - extra_formatted_values = f"Uncertainty [b]{accuracy} meters[/b]"+updated_str - - data = {"icon": s["icon"], "text": f"{formatted_values}"} - - extra_entries.append({"icon": "map-marker-question", "text": extra_formatted_values}) - if speed_formatted_values != None: - extra_entries.append({"icon": "speedometer", "text": speed_formatted_values}) - - if "distance" in s: - if "orthodromic" in s["distance"]: - od = s["distance"]["orthodromic"] - if od != None: - od_text = f"Geodesic distance [b]{RNS.prettydistance(od)}[/b]" - extra_entries.append({"icon": "earth", "text": od_text}) - - if "euclidian" in s["distance"]: - ed = s["distance"]["euclidian"] - if ed != None: - ed_text = f"Euclidian distance [b]{RNS.prettydistance(ed)}[/b]" - extra_entries.append({"icon": "axis-arrow", "text": ed_text}) - - if "vertical" in s["distance"]: - vd = s["distance"]["vertical"] - if vd != None: - if vd < 0: - relstr = "lower" - vd = abs(vd) - else: - relstr = "greater" - vd_text = f"Altitude is [b]{RNS.prettydistance(vd)}[/b] {relstr} than this device" - extra_entries.append({"icon": "altimeter", "text": vd_text}) - - if "angle_to_horizon" in s["values"]: - oath = s["values"]["angle_to_horizon"] - if oath != None: - if self.delegate.viewing_self: - oath_text = f"Local horizon is at [b]{round(oath,3)}°[/b]" + if by == self.app.sideband.lxmf_destination.hash: + if via == self.delegate.object_hash: + formatted_values = "Collected directly by [b]this device[/b], directly [b]from emitter[/b]" else: - oath_text = f"Object's horizon is at [b]{round(oath,3)}°[/b]" - extra_entries.append({"icon": "arrow-split-horizontal", "text": oath_text}) - - if self.delegate.viewing_self and "radio_horizon" in s["values"]: - orh = s["values"]["radio_horizon"] - if orh != None: - range_text = RNS.prettydistance(orh) - rh_formatted_text = f"Radio horizon of [b]{range_text}[/b]" - extra_entries.append({"icon": "radio-tower", "text": rh_formatted_text}) - - if "azalt" in s and "local_angle_to_horizon" in s["azalt"]: - lath = s["azalt"]["local_angle_to_horizon"] - if lath != None: - lath_text = f"Local horizon is at [b]{round(lath,3)}°[/b]" - extra_entries.append({"icon": "align-vertical-distribute", "text": lath_text}) - - if "azalt" in s: - azalt_formatted_text = "" - if "azimuth" in s["azalt"]: - az = s["azalt"]["azimuth"] - az_text = f"Azimuth [b]{round(az,3)}°[/b]" - azalt_formatted_text += az_text - - if "altitude" in s["azalt"]: - al = s["azalt"]["altitude"] - al_text = f"altitude [b]{round(al,3)}°[/b]" - if len(azalt_formatted_text) != 0: azalt_formatted_text += ", " - azalt_formatted_text += al_text - - extra_entries.append({"icon": "compass-rose", "text": azalt_formatted_text}) - - if "above_horizon" in s["azalt"]: - astr = "above" if s["azalt"]["above_horizon"] == True else "below" - dstr = str(round(s["azalt"]["altitude_delta"], 3)) - ah_text = f"Object is [b]{astr}[/b] the horizon (Δ = {dstr}°)" - extra_entries.append({"icon": "angle-acute", "text": ah_text}) - - if not self.delegate.viewing_self and "radio_horizon" in s["values"]: - orh = s["values"]["radio_horizon"] - if orh != None: - range_text = RNS.prettydistance(orh) - rh_formatted_text = f"Object's radio horizon is [b]{range_text}[/b]" - extra_entries.append({"icon": "radio-tower", "text": rh_formatted_text}) - - if "radio_horizon" in s: - rh_icon = "circle-outline" - crange_text = RNS.prettydistance(s["radio_horizon"]["combined_range"]) - if s["radio_horizon"]["within_range"]: - rh_formatted_text = f"[b]Within[/b] shared radio horizon of [b]{crange_text}[/b]" - rh_icon = "set-none" + via_str = self.app.sideband.peer_display_name(via) + if via_str == None: + via_str = "an [b]unknown peer[/b]" + formatted_values = f"Collected directly by [b]this device[/b], via {via_str}" else: - rh_formatted_text = f"[b]Outside[/b] shared radio horizon of [b]{crange_text}[/b]" + if via != None and via == by: + vstr = self.app.sideband.peer_display_name(via) + formatted_values = f"Received from, and collected by [b]{vstr}[/b]" + + else: + if via != None: + vstr = self.app.sideband.peer_display_name(via) + via_str = f"Received from [b]{vstr}[/b]" + else: + via_str = "Received from an [b]unknown peer[/b]" + + if by != None: + dstr = self.app.sideband.peer_display_name(by) + by_str = f", collected by [b]{dstr}[/b]" + else: + by_str = f", collected by an [b]unknown peer[/b]" + + formatted_values = f"{via_str}{by_str}" + + if formatted_values == "": + formatted_values = None + + if not by == self.app.sideband.lxmf_destination.hash and not self.app.sideband.is_trusted(by): + extra_entries.append({"icon": "alert", "text": "Collected by a [b]non-trusted[/b] peer"}) - extra_entries.append({"icon": rh_icon, "text": rh_formatted_text}) - - def select(e=None): - geo_uri = f"geo:{lat},{lon}" - def lj(): - webbrowser.open(geo_uri) - threading.Thread(target=lj, daemon=True).start() - - release_function = select - else: - formatted_values = f"{name}" - for vn in s["values"]: - v = s["values"][vn] - formatted_values += f" [b]{v} {vn}[/b]" - - dt = vn + elif name == "Battery": + p = s["values"]["percent"] + cs = s["values"]["_meta"] + if cs != None: cs_str = f" ({cs})" + if p != None: formatted_values = f"{name} [b]{p}%[/b]"+cs_str + + elif name == "Ambient Pressure": + p = s["values"]["mbar"] + if p != None: formatted_values = f"{name} [b]{p} mbar[/b]" + dt = "mbar" if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: d = s["deltas"][dt] - formatted_values += f" (Δ = {d} {vn})" - - data = None - if formatted_values != None: - if release_function: - data = {"icon": s["icon"], "text": f"{formatted_values}", "on_release": release_function} + formatted_values += f" (Δ = {d} mbar)" + + elif name == "Ambient Temperature": + c = s["values"]["c"] + if c != None: formatted_values = f"{name} [b]{c}° C[/b]" + dt = "c" + if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: + d = s["deltas"][dt] + formatted_values += f" (Δ = {d}° C)" + + elif name == "Relative Humidity": + r = s["values"]["percent"] + if r != None: formatted_values = f"{name} [b]{r}%[/b]" + dt = "percent" + if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: + d = s["deltas"][dt] + formatted_values += f" (Δ = {d}%)" + + elif name == "Physical Link": + rssi = s["values"]["rssi"]; rssi_str = None + snr = s["values"]["snr"]; snr_str = None + q = s["values"]["q"]; q_str = None + if q != None: q_str = f"Link Quality [b]{q}%[/b]" + if rssi != None: + rssi_str = f"RSSI [b]{rssi} dBm[/b]" + if q != None: rssi_str = ", "+rssi_str + if snr != None: + snr_str = f"SNR [b]{snr} dB[/b]" + if q != None or rssi != None: snr_str = ", "+snr_str + if q_str or rssi_str or snr_str: + formatted_values = q_str+rssi_str+snr_str + + elif name == "Power Consumption": + cs = s["values"] + if cs != None: + for c in cs: + label = c["label"] + cicon = c["custom_icon"] + watts = c["w"] + prefix = "" + if watts < 1/1e6: + watts *= 1e9 + prefix = "n" + elif watts < 1/1e3: + watts *= 1e6 + prefix = "µ" + elif watts < 1: + watts *= 1e3 + prefix = "m" + elif watts >= 1e15: + watts /= 1e15 + prefix = "E" + elif watts >= 1e12: + watts /= 1e12 + prefix = "T" + elif watts >= 1e9: + watts /= 1e9 + prefix = "G" + elif watts >= 1e6: + watts /= 1e6 + prefix = "M" + elif watts >= 1e3: + watts /= 1e3 + prefix = "K" + + if cicon: + set_icon = cicon + else: + set_icon = s["icon"] + + watts = round(watts, 2) + p_text = f"{label} [b]{watts} {prefix}W[/b]" + extra_entries.append({"icon": set_icon, "text": p_text}) + + elif name == "Power Production": + cs = s["values"] + if cs != None: + for c in cs: + label = c["label"] + cicon = c["custom_icon"] + watts = c["w"] + prefix = "" + if watts < 1/1e6: + watts *= 1e9 + prefix = "n" + elif watts < 1/1e3: + watts *= 1e6 + prefix = "µ" + elif watts < 1: + watts *= 1e3 + prefix = "m" + elif watts >= 1e15: + watts /= 1e15 + prefix = "E" + elif watts >= 1e12: + watts /= 1e12 + prefix = "T" + elif watts >= 1e9: + watts /= 1e9 + prefix = "G" + elif watts >= 1e6: + watts /= 1e6 + prefix = "M" + elif watts >= 1e3: + watts /= 1e3 + prefix = "K" + + if cicon: + set_icon = cicon + else: + set_icon = s["icon"] + + watts = round(watts, 2) + p_text = f"{label} [b]{watts} {prefix}W[/b]" + extra_entries.append({"icon": set_icon, "text": p_text}) + + elif name == "Custom": + cs = s["values"] + if cs != None: + for c in cs: + label = c["label"] + cicon = c["custom_icon"] + value = str(c["value"]) + set_icon = cicon if cicon else s["icon"] + e_text = f"{label} [b]{value}[/b]" + extra_entries.append({"icon": set_icon, "text": e_text}) + + elif name == "Processor": + cs = s["values"] + if cs != None: + for c in cs: + label = c["label"] + load = c["current_load"] + avgs = c["load_avgs"] + clock = c["clock"] + pct = round(load*100, 1) + + avgs_str = f", averages are [b]{round(avgs[0],2)}[/b], [b]{round(avgs[1],2)}[/b], [b]{round(avgs[2],2)}[/b]" if avgs != None and len(avgs) == 3 else "" + clock_str = " at [b]"+RNS.prettyfrequency(clock)+"[/b]" if clock != None else "" + + e_text = f"Using [b]{pct}%[/b] of {label}{clock_str}{avgs_str}" + e_text = f"{label} use is [b]{pct}%[/b]{clock_str}{avgs_str}" + extra_entries.append({"icon": s["icon"], "text": e_text}) + + elif name == "Non-Volatile Memory": + cs = s["values"] + if cs != None: + for c in cs: + label = c["label"] + cap = RNS.prettysize(c["capacity"]) + use = RNS.prettysize(c["used"]) + free = RNS.prettysize(c["free"]) + pct = round(c["percent"], 1) + + e_text = f"{label} use is [b]{use}[/b] ([b]{pct}%[/b]) of [b]{cap}[/b], with [b]{free}[/b] free" + extra_entries.append({"icon": s["icon"], "text": e_text}) + + elif name == "Random Access Memory": + cs = s["values"] + if cs != None: + for c in cs: + label = c["label"] + cap = RNS.prettysize(c["capacity"]) + use = RNS.prettysize(c["used"]) + free = RNS.prettysize(c["free"]) + pct = round(c["percent"], 1) + + e_text = f"{label} use is [b]{use}[/b] ([b]{pct}%[/b]) of [b]{cap}[/b], with [b]{free}[/b] free" + extra_entries.append({"icon": s["icon"], "text": e_text}) + + elif name == "Location": + lat = s["values"]["latitude"] + lon = s["values"]["longitude"] + alt = s["values"]["altitude"] + speed = s["values"]["speed"] + heading = s["values"]["heading"] + accuracy = s["values"]["accuracy"] + updated = s["values"]["updated"] + updated_str = f", logged [b]{RNS.prettytime(time.time()-updated, compact=True)} ago[/b]" + + coords = f"{lat}, {lon}" + fcoords = f"{round(lat,4)}, {round(lon,4)}" + self.delegate.coords = coords + if alt == 0: + alt_str = "0" + else: + alt_str = RNS.prettydistance(alt) + formatted_values = f"Coordinates [b]{fcoords}[/b], altitude [b]{alt_str}[/b]" + if speed != None: + if speed > 0.02: + speed_formatted_values = f"Speed [b]{speed} Km/h[/b], heading [b]{heading}°[/b]" + else: + # speed_formatted_values = f"Speed [b]0 Km/h[/b]" + speed_formatted_values = f"Object is [b]stationary[/b]" + else: + speed_formatted_values = None + extra_formatted_values = f"Uncertainty [b]{accuracy} meters[/b]"+updated_str + + data = {"icon": s["icon"], "text": f"{formatted_values}"} + + extra_entries.append({"icon": "map-marker-question", "text": extra_formatted_values}) + if speed_formatted_values != None: + extra_entries.append({"icon": "speedometer", "text": speed_formatted_values}) + + if "distance" in s: + if "orthodromic" in s["distance"]: + od = s["distance"]["orthodromic"] + if od != None: + od_text = f"Geodesic distance [b]{RNS.prettydistance(od)}[/b]" + extra_entries.append({"icon": "earth", "text": od_text}) + + if "euclidian" in s["distance"]: + ed = s["distance"]["euclidian"] + if ed != None: + ed_text = f"Euclidian distance [b]{RNS.prettydistance(ed)}[/b]" + extra_entries.append({"icon": "axis-arrow", "text": ed_text}) + + if "vertical" in s["distance"]: + vd = s["distance"]["vertical"] + if vd != None: + if vd < 0: + relstr = "lower" + vd = abs(vd) + else: + relstr = "greater" + vd_text = f"Altitude is [b]{RNS.prettydistance(vd)}[/b] {relstr} than this device" + extra_entries.append({"icon": "altimeter", "text": vd_text}) + + if "angle_to_horizon" in s["values"]: + oath = s["values"]["angle_to_horizon"] + if oath != None: + if self.delegate.viewing_self: + oath_text = f"Local horizon is at [b]{round(oath,3)}°[/b]" + else: + oath_text = f"Object's horizon is at [b]{round(oath,3)}°[/b]" + extra_entries.append({"icon": "arrow-split-horizontal", "text": oath_text}) + + if self.delegate.viewing_self and "radio_horizon" in s["values"]: + orh = s["values"]["radio_horizon"] + if orh != None: + range_text = RNS.prettydistance(orh) + rh_formatted_text = f"Radio horizon of [b]{range_text}[/b]" + extra_entries.append({"icon": "radio-tower", "text": rh_formatted_text}) + + if "azalt" in s and "local_angle_to_horizon" in s["azalt"]: + lath = s["azalt"]["local_angle_to_horizon"] + if lath != None: + lath_text = f"Local horizon is at [b]{round(lath,3)}°[/b]" + extra_entries.append({"icon": "align-vertical-distribute", "text": lath_text}) + + if "azalt" in s: + azalt_formatted_text = "" + if "azimuth" in s["azalt"]: + az = s["azalt"]["azimuth"] + az_text = f"Azimuth [b]{round(az,3)}°[/b]" + azalt_formatted_text += az_text + + if "altitude" in s["azalt"]: + al = s["azalt"]["altitude"] + al_text = f"altitude [b]{round(al,3)}°[/b]" + if len(azalt_formatted_text) != 0: azalt_formatted_text += ", " + azalt_formatted_text += al_text + + extra_entries.append({"icon": "compass-rose", "text": azalt_formatted_text}) + + if "above_horizon" in s["azalt"]: + astr = "above" if s["azalt"]["above_horizon"] == True else "below" + dstr = str(round(s["azalt"]["altitude_delta"], 3)) + ah_text = f"Object is [b]{astr}[/b] the horizon (Δ = {dstr}°)" + extra_entries.append({"icon": "angle-acute", "text": ah_text}) + + if not self.delegate.viewing_self and "radio_horizon" in s["values"]: + orh = s["values"]["radio_horizon"] + if orh != None: + range_text = RNS.prettydistance(orh) + rh_formatted_text = f"Object's radio horizon is [b]{range_text}[/b]" + extra_entries.append({"icon": "radio-tower", "text": rh_formatted_text}) + + if "radio_horizon" in s: + rh_icon = "circle-outline" + crange_text = RNS.prettydistance(s["radio_horizon"]["combined_range"]) + if s["radio_horizon"]["within_range"]: + rh_formatted_text = f"[b]Within[/b] shared radio horizon of [b]{crange_text}[/b]" + rh_icon = "set-none" + else: + rh_formatted_text = f"[b]Outside[/b] shared radio horizon of [b]{crange_text}[/b]" + + extra_entries.append({"icon": rh_icon, "text": rh_formatted_text}) + + def select(e=None): + geo_uri = f"geo:{lat},{lon}" + def lj(): + webbrowser.open(geo_uri) + threading.Thread(target=lj, daemon=True).start() + + release_function = select else: - data = {"icon": s["icon"], "text": f"{formatted_values}", "on_release": pass_job} + formatted_values = f"{name}" + for vn in s["values"]: + v = s["values"][vn] + formatted_values += f" [b]{v} {vn}[/b]" - if data != None: - self.entries.append(data) - for extra in extra_entries: - self.entries.append(extra) + dt = vn + if "deltas" in s and dt in s["deltas"] and s["deltas"][dt] != None: + d = s["deltas"][dt] + formatted_values += f" (Δ = {d} {vn})" + + data = None + if formatted_values != None: + if release_function: + data = {"icon": s["icon"], "text": f"{formatted_values}", "on_release": release_function} + else: + data = {"icon": s["icon"], "text": f"{formatted_values}", "on_release": pass_job} + if data != None: + self.entries.append(data) + for extra in extra_entries: + self.entries.append(extra) + + except Exception as e: + RNS.log("An error ocurred while displaying telemetry for object", RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.trace_exception(e) try: nh = RNS.Transport.hops_to(self.delegate.object_hash)