Implemented telemetry update sending

This commit is contained in:
Mark Qvist 2023-10-29 20:59:27 +01:00
parent 0b2a86a394
commit 92d8b4305b
5 changed files with 320 additions and 134 deletions

View File

@ -3153,11 +3153,42 @@ class SidebandApp(MDApp):
self.open_conversation(context_dest) self.open_conversation(context_dest)
def telemetry_send_update(self, sender=None): def telemetry_send_update(self, sender=None):
# TODO: Implement if not hasattr(self, "telemetry_info_dialog") or self.telemetry_info_dialog == None:
ok_button = MDRectangleFlatButton(text="OK",font_size=dp(18))
self.telemetry_info_dialog = MDDialog(
title="Info",
text="",
buttons=[ ok_button ],
)
def dl_ok(s):
self.telemetry_info_dialog.dismiss()
ok_button.bind(on_release=dl_ok)
result = self.sideband.send_latest_telemetry(to_addr=self.sideband.config["telemetry_collector"])
if result == "destination_unknown":
title_str = "Unknown Destination"
info_str = "No keys known for the destination. Connected reticules have been queried for the keys."
elif result == "in_progress":
title_str = "Transfer In Progress"
info_str = "There is already an outbound telemetry transfer in progress to the collector."
elif result == "already_sent":
title_str = "Already Delivered"
info_str = "The current telemetry data was already sent and delivered to the collector or propagation network."
elif result == "sent":
title_str = "Update Sent"
info_str = "A telemetry update was sent to the collector."
else:
title_str = "Unknown Status"
info_str = "The status of the telemetry update is unknown."
self.telemetry_info_dialog.title = title_str
self.telemetry_info_dialog.text = info_str
self.telemetry_info_dialog.open()
pass pass
def telemetry_request_action(self, sender=None): def telemetry_request_action(self, sender=None):
# TODO: Implement self.sideband.request_latest_telemetry(from_addr=self.sideband.config["telemetry_collector"])
pass pass
### Map Screen ### Map Screen

View File

@ -523,6 +523,10 @@ class SidebandCore():
self.config["telemetry_receive_trusted_only"] = False self.config["telemetry_receive_trusted_only"] = False
if not "telemetry_send_all_to_collector" in self.config: if not "telemetry_send_all_to_collector" in self.config:
self.config["telemetry_send_all_to_collector"] = False self.config["telemetry_send_all_to_collector"] = False
if not "telemetry_use_propagation_only" in self.config:
self.config["telemetry_use_propagation_only"] = False
if not "telemetry_try_propagation_on_fail" in self.config:
self.config["telemetry_try_propagation_on_fail"] = True
if not "telemetry_s_location" in self.config: if not "telemetry_s_location" in self.config:
self.config["telemetry_s_location"] = False self.config["telemetry_s_location"] = False
@ -853,6 +857,77 @@ class SidebandCore():
else: else:
return None return None
def outbound_telemetry_finished(self, message):
if message.state == LXMF.LXMessage.FAILED and hasattr(message, "try_propagation_on_fail") and message.try_propagation_on_fail:
RNS.log("Direct delivery of telemetry update "+str(message)+" failed. Retrying as propagated message.", RNS.LOG_VERBOSE)
message.try_propagation_on_fail = None
message.delivery_attempts = 0
del message.next_delivery_attempt
message.packed = None
message.desired_method = LXMF.LXMessage.PROPAGATED
self.message_router.handle_outbound(message)
else:
if message.state == LXMF.LXMessage.DELIVERED:
self.setpersistent("telemetry.collector_last_send_success_timebase", message.telemetry_timebase)
self.telemetry_update_sending = False
else:
self.telemetry_update_sending = False
def request_latest_telemetry(self, from_addr=None):
pass
def send_latest_telemetry(self, to_addr=None):
if hasattr(self, "telemetry_update_sending") and self.telemetry_update_sending == True:
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_VERBOSE)
return "in_progress"
if to_addr != None and self.latest_packed_telemetry != None and self.latest_telemetry != None:
dest_identity = RNS.Identity.recall(to_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_VERBOSE)
RNS.Transport.request_path(to_addr)
return "destination_unknown"
else:
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
lxm_fields = self.get_message_fields(to_addr)
if lxm_fields != None and LXMF.FIELD_TELEMETRY in lxm_fields:
telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY])
telemetry_timebase = telemeter.read_all()["time"]["utc"]
if telemetry_timebase > (self.getpersistent("telemetry.collector_last_send_success_timebase") or 0):
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields)
lxm.telemetry_timebase = telemetry_timebase
lxm.register_delivery_callback(self.outbound_telemetry_finished)
lxm.register_failed_callback(self.outbound_telemetry_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_VERBOSE)
self.setpersistent("telemetry.collector_last_send_attempt", time.time())
self.telemetry_update_sending = True
self.message_router.handle_outbound(lxm)
return "sent"
else:
RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_VERBOSE)
return "already_sent"
else:
RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING)
return "not_sent"
def list_telemetry(self, context_dest = None, after = None, before = None, limit = None): def list_telemetry(self, context_dest = None, after = None, before = None, limit = None):
return self._db_telemetry(context_dest = context_dest, after = after, before = before, limit = limit) or [] return self._db_telemetry(context_dest = context_dest, after = after, before = before, limit = limit) or []
@ -1840,48 +1915,54 @@ class SidebandCore():
physical_link["q"] = lxm.q physical_link["q"] = lxm.q
packed_telemetry = self._db_save_telemetry(context_dest, lxm.fields[LXMF.FIELD_TELEMETRY], physical_link=physical_link, source_dest=context_dest) packed_telemetry = self._db_save_telemetry(context_dest, lxm.fields[LXMF.FIELD_TELEMETRY], physical_link=physical_link, source_dest=context_dest)
db = self.__db_connect() if LXMF.FIELD_TELEMETRY_STREAM in lxm.fields:
dbc = db.cursor() for telemetry_entry in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]:
# TODO: Implement
pass
if not lxm.packed: if len(lxm.content) != 0 or len(lxm.title) != 0:
lxm.pack() db = self.__db_connect()
dbc = db.cursor()
if lxm.method == LXMF.LXMessage.PAPER: if not lxm.packed:
packed_lxm = msgpack.packb([lxm.packed, lxm.paper_packed]) lxm.pack()
else:
packed_lxm = lxm.packed
extras = {} if lxm.method == LXMF.LXMessage.PAPER:
if lxm.rssi or lxm.snr or lxm.q: packed_lxm = msgpack.packb([lxm.packed, lxm.paper_packed])
extras["rssi"] = lxm.rssi else:
extras["snr"] = lxm.snr packed_lxm = lxm.packed
extras["q"] = lxm.q
if packed_telemetry != None: extras = {}
extras["packed_telemetry"] = packed_telemetry if lxm.rssi or lxm.snr or lxm.q:
extras["rssi"] = lxm.rssi
extras["snr"] = lxm.snr
extras["q"] = lxm.q
extras = msgpack.packb(extras) if packed_telemetry != None:
extras["packed_telemetry"] = packed_telemetry
query = "INSERT INTO lxm (lxm_hash, dest, source, title, tx_ts, rx_ts, state, method, t_encrypted, t_encryption, data, extra) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" extras = msgpack.packb(extras)
data = (
lxm.hash,
lxm.destination_hash,
lxm.source_hash,
lxm.title,
lxm.timestamp,
time.time(),
state,
lxm.method,
lxm.transport_encrypted,
lxm.transport_encryption,
packed_lxm,
extras
)
dbc.execute(query, data) query = "INSERT INTO lxm (lxm_hash, dest, source, title, tx_ts, rx_ts, state, method, t_encrypted, t_encryption, data, extra) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
db.commit() data = (
lxm.hash,
lxm.destination_hash,
lxm.source_hash,
lxm.title,
lxm.timestamp,
time.time(),
state,
lxm.method,
lxm.transport_encrypted,
lxm.transport_encryption,
packed_lxm,
extras
)
self.__event_conversation_changed(context_dest) dbc.execute(query, data)
db.commit()
self.__event_conversation_changed(context_dest)
def _db_save_announce(self, destination_hash, app_data, dest_type="lxmf.delivery"): def _db_save_announce(self, destination_hash, app_data, dest_type="lxmf.delivery"):
db = self.__db_connect() db = self.__db_connect()
@ -2256,6 +2337,10 @@ class SidebandCore():
self.setpersistent("lxmf.lastsync", time.time()) self.setpersistent("lxmf.lastsync", time.time())
self.setpersistent("lxmf.syncretrying", False) self.setpersistent("lxmf.syncretrying", False)
if self.config["telemetry_enabled"]:
if self.config["telemetry_send_to_collector"]:
pass
def __start_jobs_deferred(self): def __start_jobs_deferred(self):
if self.is_service: if self.is_service:
self.service_thread = threading.Thread(target=self._service_jobs, daemon=True) self.service_thread = threading.Thread(target=self._service_jobs, daemon=True)
@ -2724,9 +2809,9 @@ class SidebandCore():
else: else:
self.lxm_ingest(message, originator=True) self.lxm_ingest(message, originator=True)
def get_message_fields(self, context_dest): def get_message_fields(self, context_dest, telemetry_update=False):
fields = None fields = None
send_telemetry = self.should_send_telemetry(context_dest) send_telemetry = (telemetry_update == True) or self.should_send_telemetry(context_dest)
send_appearance = self.config["telemetry_send_appearance"] or send_telemetry send_appearance = self.config["telemetry_send_appearance"] or send_telemetry
if send_telemetry or send_appearance: if send_telemetry or send_appearance:
fields = {} fields = {}
@ -2850,6 +2935,7 @@ class SidebandCore():
def lxm_ingest(self, message, originator = False): def lxm_ingest(self, message, originator = False):
should_notify = False should_notify = False
is_trusted = False is_trusted = False
telemetry_only = False
unread_reason_tx = False unread_reason_tx = False
if originator: if originator:
@ -2869,31 +2955,40 @@ class SidebandCore():
if is_trusted: if is_trusted:
should_notify = True should_notify = True
if self._db_conversation(context_dest) == None: if len(message.content) == 0 and len(message.title) == 0:
self._db_create_conversation(context_dest) if (LXMF.FIELD_TELEMETRY in message.fields or LXMF.FIELD_TELEMETRY_STREAM in message.fields):
self.setstate("app.flags.new_conversations", True) RNS.log("Squelching notification due to telemetry-only message", RNS.LOG_DEBUG)
telemetry_only = True
if self.gui_display() == "messages_screen": if not telemetry_only:
if self.gui_conversation() != context_dest: if self._db_conversation(context_dest) == None:
self._db_create_conversation(context_dest)
self.setstate("app.flags.new_conversations", True)
if self.gui_display() == "messages_screen":
if self.gui_conversation() != context_dest:
self.unread_conversation(context_dest, tx=unread_reason_tx)
self.setstate("app.flags.unread_conversations", True)
else:
self.txtime_conversation(context_dest)
self.setstate("wants.viewupdate.conversations", True)
if self.gui_foreground():
RNS.log("Squelching notification since GUI is in foreground", RNS.LOG_DEBUG)
should_notify = False
else:
self.unread_conversation(context_dest, tx=unread_reason_tx) self.unread_conversation(context_dest, tx=unread_reason_tx)
self.setstate("app.flags.unread_conversations", True) self.setstate("app.flags.unread_conversations", True)
else:
self.txtime_conversation(context_dest)
self.setstate("wants.viewupdate.conversations", True)
if self.gui_foreground():
RNS.log("Squelching notification since GUI is in foreground", RNS.LOG_DEBUG)
should_notify = False
else:
self.unread_conversation(context_dest, tx=unread_reason_tx)
self.setstate("app.flags.unread_conversations", True)
if RNS.vendor.platformutils.is_android(): if RNS.vendor.platformutils.is_android():
if self.gui_display() == "conversations_screen" and self.gui_foreground(): if self.gui_display() == "conversations_screen" and self.gui_foreground():
should_notify = False should_notify = False
if self.is_client: if self.is_client:
should_notify = False should_notify = False
if telemetry_only:
should_notify = False
if should_notify: if should_notify:
nlen = 128 nlen = 128
text = message.content.decode("utf-8") text = message.content.decode("utf-8")

View File

@ -77,16 +77,26 @@ class Conversations():
def trust_icon(self, context_dest, unread): def trust_icon(self, context_dest, unread):
trust_icon = "account-question" trust_icon = "account-question"
if self.app.sideband.is_trusted(context_dest): is_trusted = self.app.sideband.is_trusted(context_dest)
if self.app.sideband.requests_allowed_from(context_dest):
if unread: if unread:
trust_icon = "email-seal" if is_trusted:
trust_icon = "email-seal"
else:
trust_icon = "email"
else: else:
trust_icon = "account-check" trust_icon = "account-lock-open"
else: else:
if unread: if is_trusted:
trust_icon = "email" if unread:
trust_icon = "email-seal"
else:
trust_icon = "account-check"
else: else:
trust_icon = "account-question" if unread:
trust_icon = "email"
else:
trust_icon = "account-question"
return trust_icon return trust_icon

View File

@ -52,6 +52,17 @@ class ObjectDetails():
self.telemetry_list.app = self.app self.telemetry_list.app = self.app
self.screen.ids.object_details_container.add_widget(self.telemetry_list) self.screen.ids.object_details_container.add_widget(self.telemetry_list)
ok_button = MDRectangleFlatButton(text="OK",font_size=dp(18))
self.info_dialog = MDDialog(
title="Info",
text="",
buttons=[ ok_button ],
)
def dl_ok(s):
self.info_dialog.dismiss()
ok_button.bind(on_release=dl_ok)
Clock.schedule_interval(self.reload_job, 2) Clock.schedule_interval(self.reload_job, 2)
def reload_job(self, dt=None): def reload_job(self, dt=None):
@ -107,6 +118,12 @@ class ObjectDetails():
def set_source(self, source_dest, from_conv=False, from_telemetry=False, prefetched=None): def set_source(self, source_dest, from_conv=False, from_telemetry=False, prefetched=None):
self.object_hash = source_dest self.object_hash = source_dest
own_address = self.app.sideband.lxmf_destination.hash
if source_dest == own_address:
self.viewing_self = True
else:
self.viewing_self = False
if from_telemetry: if from_telemetry:
self.from_telemetry = True self.from_telemetry = True
@ -136,7 +153,6 @@ class ObjectDetails():
else: else:
self.screen.ids.request_button.disabled = False self.screen.ids.request_button.disabled = False
self.screen.ids.send_button.disabled = False self.screen.ids.send_button.disabled = False
Clock.schedule_once(djob, 0.0)
if prefetched != None: if prefetched != None:
latest_telemetry = prefetched latest_telemetry = prefetched
@ -147,16 +163,13 @@ class ObjectDetails():
telemetry_timestamp = latest_telemetry[0][0] telemetry_timestamp = latest_telemetry[0][0]
self.lastest_timestamp = telemetry_timestamp self.lastest_timestamp = telemetry_timestamp
own_address = self.app.sideband.lxmf_destination.hash
telemeter = Telemeter.from_packed(latest_telemetry[0][1]) telemeter = Telemeter.from_packed(latest_telemetry[0][1])
self.raw_telemetry = telemeter.read_all() self.raw_telemetry = telemeter.read_all()
relative_to = None relative_to = None
if source_dest != own_address: if source_dest != own_address:
relative_to = self.app.sideband.telemeter relative_to = self.app.sideband.telemeter
self.viewing_self = False
else: else:
self.viewing_self = True
self.screen.ids.name_label.text = pds+" (this device)" self.screen.ids.name_label.text = pds+" (this device)"
rendered_telemetry = telemeter.render(relative_to=relative_to) rendered_telemetry = telemeter.render(relative_to=relative_to)
@ -176,11 +189,39 @@ class ObjectDetails():
self.telemetry_list.update_source(None) self.telemetry_list.update_source(None)
self.telemetry_list.effect_cls = ScrollEffect self.telemetry_list.effect_cls = ScrollEffect
Clock.schedule_once(djob, 0.1)
def reload(self): def reload(self):
self.clear_widget() self.clear_widget()
self.update() self.update()
def send_update(self):
if not self.viewing_self:
result = self.app.sideband.send_latest_telemetry(to_addr=self.object_hash)
if result == "destination_unknown":
title_str = "Unknown Destination"
info_str = "No keys known for the destination. Connected reticules have been queried for the keys."
elif result == "in_progress":
title_str = "Transfer In Progress"
info_str = "There is already an outbound telemetry transfer in progress for this peer."
elif result == "already_sent":
title_str = "Already Delivered"
info_str = "The current telemetry data was already sent and delivered to the peer or propagation network."
elif result == "sent":
title_str = "Update Sent"
info_str = "A telemetry update was sent to the peer."
else:
title_str = "Unknown Status"
info_str = "The status of the telemetry update is unknown."
self.info_dialog.title = title_str
self.info_dialog.text = info_str
self.info_dialog.open()
def request_update(self):
if not self.viewing_self:
result = self.app.sideband.request_latest_telemetry(from_addr=self.object_hash)
def clear_widget(self): def clear_widget(self):
pass pass
@ -589,7 +630,7 @@ MDScreen:
icon_size: dp(24) icon_size: dp(24)
font_size: dp(16) font_size: dp(16)
size_hint: [1.0, None] size_hint: [1.0, None]
on_release: root.delegate.copy_telemetry(self) on_release: root.delegate.request_update()
disabled: False disabled: False
MDRectangleFlatIconButton: MDRectangleFlatIconButton:
@ -600,7 +641,7 @@ MDScreen:
icon_size: dp(24) icon_size: dp(24)
font_size: dp(16) font_size: dp(16)
size_hint: [1.0, None] size_hint: [1.0, None]
on_release: root.delegate.copy_coordinates(self) on_release: root.delegate.send_update()
disabled: False disabled: False
# MDBoxLayout: # MDBoxLayout:

View File

@ -509,19 +509,6 @@ MDScreen:
text_size: self.width, None text_size: self.width, None
height: self.texture_size[1] height: self.texture_size[1]
MDBoxLayout:
orientation: "vertical"
size_hint_y: None
height: self.minimum_height
padding: [0, dp(0), 0, dp(0)]
MDTextField:
id: telemetry_collector
max_text_length: 32
hint_text: "Telemetry Collector LXMF Address"
text: ""
font_size: dp(24)
MDBoxLayout: MDBoxLayout:
orientation: "horizontal" orientation: "horizontal"
padding: [0,0,dp(24),0] padding: [0,0,dp(24),0]
@ -684,60 +671,83 @@ MDScreen:
#height: dp(232) #height: dp(232)
height: self.minimum_height height: self.minimum_height
MDRectangleFlatIconButton: MDTextField:
id: telemetry_sensors_button id: telemetry_collector
icon: "sun-thermometer-outline" max_text_length: 32
text: "Configure Sensors" hint_text: "Telemetry Collector LXMF Address"
padding: [dp(0), dp(14), dp(0), dp(14)] text: ""
icon_size: dp(24) font_size: dp(24)
font_size: dp(16)
size_hint: [1.0, None]
on_release: root.delegate.sensors_action(self)
disabled: False
MDRectangleFlatIconButton:
id: telemetry_own_button # MDRectangleFlatIconButton:
icon: "database-eye-outline" # id: telemetry_copy_button
text: "Display Own Telemetry" # icon: "content-copy"
padding: [dp(0), dp(14), dp(0), dp(14)] # text: "Copy Own Telemetry"
icon_size: dp(24) # padding: [dp(0), dp(14), dp(0), dp(14)]
font_size: dp(16) # icon_size: dp(24)
size_hint: [1.0, None] # font_size: dp(16)
on_release: root.app.map_display_own_telemetry(self) # size_hint: [1.0, None]
disabled: False # on_release: root.delegate.telemetry_copy(self)
# disabled: False
MDRectangleFlatIconButton:
id: telemetry_copy_button
icon: "content-copy"
text: "Copy Telemetry Data To Clipboard"
padding: [dp(0), dp(14), dp(0), dp(14)]
icon_size: dp(24)
font_size: dp(16)
size_hint: [1.0, None]
on_release: root.delegate.telemetry_copy(self)
disabled: False
MDRectangleFlatIconButton: MDBoxLayout:
id: telemetry_send_update_button orientation: "horizontal"
icon: "upload-lock" size_hint_y: None
text: "Send Telemetry To Collector" height: self.minimum_height
padding: [dp(0), dp(14), dp(0), dp(14)] spacing: dp(24)
icon_size: dp(24)
font_size: dp(16)
size_hint: [1.0, None]
on_release: root.app.telemetry_send_update(self)
disabled: False
MDRectangleFlatIconButton: MDRectangleFlatIconButton:
id: telemetry_request_button id: telemetry_send_update_button
icon: "arrow-down-bold-hexagon-outline" icon: "upload-lock"
text: "Request Telemetry From Collector" text: "Send Now"
padding: [dp(0), dp(14), dp(0), dp(14)] padding: [dp(0), dp(14), dp(0), dp(14)]
icon_size: dp(24) icon_size: dp(24)
font_size: dp(16) font_size: dp(16)
size_hint: [1.0, None] size_hint: [1.0, None]
on_release: root.app.telemetry_request_action(self) on_release: root.app.telemetry_send_update(self)
disabled: False disabled: False
MDRectangleFlatIconButton:
id: telemetry_request_button
icon: "arrow-down-bold-hexagon-outline"
text: "Request Now"
padding: [dp(0), dp(14), dp(0), dp(14)]
icon_size: dp(24)
font_size: dp(16)
size_hint: [1.0, None]
on_release: root.app.telemetry_request_action(self)
disabled: False
MDBoxLayout:
orientation: "horizontal"
size_hint_y: None
height: self.minimum_height
spacing: dp(24)
MDRectangleFlatIconButton:
id: telemetry_sensors_button
icon: "sun-thermometer-outline"
text: "Configure Sensors"
padding: [dp(0), dp(14), dp(0), dp(14)]
icon_size: dp(24)
font_size: dp(16)
size_hint: [1.0, None]
on_release: root.delegate.sensors_action(self)
disabled: False
MDRectangleFlatIconButton:
id: telemetry_own_button
icon: "database-eye-outline"
text: "Display Own"
padding: [dp(0), dp(14), dp(0), dp(14)]
icon_size: dp(24)
font_size: dp(16)
size_hint: [1.0, None]
on_release: root.app.map_display_own_telemetry(self)
disabled: False
MDLabel: MDLabel:
text: "Display Options" text: "Display Options"
@ -788,14 +798,13 @@ MDScreen:
orientation: "vertical" orientation: "vertical"
size_hint_y: None size_hint_y: None
padding: [dp(0),dp(24),dp(0),dp(12)] padding: [dp(0),dp(24),dp(0),dp(12)]
height: dp(74) height: self.minimum_height
MDBoxLayout: MDBoxLayout:
orientation: "horizontal" orientation: "horizontal"
#size_hint_y: None size_hint_y: None
height: self.minimum_height
spacing: dp(24) spacing: dp(24)
# padding: [0,0,dp(24),dp(0)]
# height: dp(48)
MDRectangleFlatIconButton: MDRectangleFlatIconButton:
id: telemetry_icons_button id: telemetry_icons_button