Implemented automated and manual telemetry updates to collector

This commit is contained in:
Mark Qvist 2023-10-29 23:03:51 +01:00
parent 92d8b4305b
commit aa99ef058a
3 changed files with 95 additions and 33 deletions

View File

@ -120,6 +120,11 @@ class SidebandCore():
self.telemetry_running = False
self.latest_telemetry = None
self.telemetry_changes = 0
self.pending_telemetry_send = False
self.pending_telemetry_send_try = 0
self.pending_telemetry_send_maxtries = 2
self.telemetry_send_blocked_until = 0
self.pending_telemetry_request = False
self.state_db = {}
self.rpc_connection = None
@ -868,24 +873,29 @@ class SidebandCore():
self.message_router.handle_outbound(message)
else:
if message.state == LXMF.LXMessage.DELIVERED:
self.setpersistent("telemetry.collector_last_send_success_timebase", message.telemetry_timebase)
self.telemetry_update_sending = False
self.setpersistent(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.last_send_success_timebase", message.telemetry_timebase)
self.setstate(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.update_sending", False)
if message.destination_hash == self.config["telemetry_collector"]:
self.pending_telemetry_send = False
self.pending_telemetry_send_try = 0
self.telemetry_send_blocked_until = 0
else:
self.telemetry_update_sending = False
self.setstate(f"telemetry.{RNS.hexrep(message.destination_hash, delimit=False)}.update_sending", False)
def request_latest_telemetry(self, from_addr=None):
pass
def send_latest_telemetry(self, to_addr=None):
if hasattr(self, "telemetry_update_sending") and self.telemetry_update_sending == True:
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_VERBOSE)
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
if to_addr != None and self.latest_packed_telemetry != None and self.latest_telemetry != None:
dest_identity = RNS.Identity.recall(to_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_VERBOSE)
RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(to_addr)
return "destination_unknown"
@ -902,7 +912,7 @@ class SidebandCore():
if lxm_fields != None and LXMF.FIELD_TELEMETRY in lxm_fields:
telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY])
telemetry_timebase = telemeter.read_all()["time"]["utc"]
if telemetry_timebase > (self.getpersistent("telemetry.collector_last_send_success_timebase") or 0):
if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0):
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields)
lxm.telemetry_timebase = telemetry_timebase
lxm.register_delivery_callback(self.outbound_telemetry_finished)
@ -912,16 +922,18 @@ class SidebandCore():
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_VERBOSE)
RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_DEBUG)
self.setpersistent("telemetry.collector_last_send_attempt", time.time())
self.telemetry_update_sending = True
self.setpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_VERBOSE)
RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_DEBUG)
return "already_sent"
else:
return "not_sent"
else:
RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING)
@ -2339,7 +2351,53 @@ class SidebandCore():
if self.config["telemetry_enabled"]:
if self.config["telemetry_send_to_collector"]:
pass
if self.config["telemetry_collector"] != None:
try:
now = time.time()
collector_address = self.config["telemetry_collector"]
last_send_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(collector_address, delimit=False)}.last_send_success_timebase") or 0
send_interval = self.config["telemetry_send_interval"]
next_send = last_send_timebase+send_interval
scheduled = next_send-now; blocked = self.telemetry_send_blocked_until-now
next_send_in = max(scheduled, blocked)
RNS.log("Last telemetry send was "+RNS.prettytime(now-last_send_timebase)+" ago", RNS.LOG_DEBUG)
RNS.log("Next telemetry send is "+("in "+RNS.prettytime(next_send_in) if next_send_in > 0 else "now"), RNS.LOG_DEBUG)
if now > last_send_timebase+send_interval and now > self.telemetry_send_blocked_until:
RNS.log("Initiating telemetry send to collector", RNS.LOG_DEBUG)
if not self.pending_telemetry_send_try >= self.pending_telemetry_send_maxtries:
self.pending_telemetry_send = True
self.pending_telemetry_send_try += 1
self.send_latest_telemetry(to_addr=collector_address)
else:
if self.telemetry_send_blocked_until < now:
next_slot = now+send_interval
self.telemetry_send_blocked_until = next_slot
RNS.log(f"Sending telemetry to collector failed after {self.pending_telemetry_send_try} tries.", RNS.LOG_WARNING)
RNS.log(f"Not scheduling further retries until next send slot in {RNS.prettytime(next_slot-now)}.", RNS.LOG_WARNING)
self.pending_telemetry_send_try = 0
except Exception as e:
RNS.log("An error occurred while sending scheduled telemetry to collector: "+str(e), RNS.LOG_ERROR)
if self.config["telemetry_request_from_collector"]:
if self.config["telemetry_collector"] != None:
try:
now = time.time()
collector_address = self.config["telemetry_collector"]
last_request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(collector_address, delimit=False)}.last_request_success_timebase") or 0
request_interval = self.config["telemetry_request_interval"]
next_request = last_request_timebase+request_interval
RNS.log("Last telemetry request was "+RNS.prettytime(now-last_request_timebase)+" ago", RNS.LOG_DEBUG)
RNS.log("Next telemetry request is "+("in "+RNS.prettytime(next_request-now) if next_request-now > 0 else "now"), RNS.LOG_DEBUG)
if now > last_request_timebase+request_interval:
RNS.log("Initiating telemetry request to collector", RNS.LOG_DEBUG)
except Exception as e:
RNS.log("An error occurred while requesting scheduled telemetry from collector: "+str(e), RNS.LOG_ERROR)
def __start_jobs_deferred(self):
if self.is_service:

View File

@ -119,6 +119,7 @@ class ObjectDetails():
def set_source(self, source_dest, from_conv=False, from_telemetry=False, prefetched=None):
self.object_hash = source_dest
own_address = self.app.sideband.lxmf_destination.hash
telemetry_allowed = self.app.sideband.should_send_telemetry(source_dest)
if source_dest == own_address:
self.viewing_self = True
else:
@ -143,16 +144,16 @@ class ObjectDetails():
self.screen.ids.object_appearance.icon = appearance[0]
self.screen.ids.object_appearance.icon_color = appearance[1]
self.screen.ids.object_appearance.md_bg_color = appearance[2]
# self.screen.ids.delete_button.line_color = self.app.color_reject
# self.screen.ids.delete_button.text_color = self.app.color_reject
# self.screen.ids.delete_button.icon_color = self.app.color_reject
def djob(dt):
if self.viewing_self:
self.screen.ids.request_button.disabled = True
self.screen.ids.send_button.disabled = True
else:
self.screen.ids.request_button.disabled = False
self.screen.ids.send_button.disabled = False
if telemetry_allowed:
self.screen.ids.send_button.disabled = False
else:
self.screen.ids.send_button.disabled = True
if prefetched != None:
latest_telemetry = prefetched
@ -210,6 +211,9 @@ class ObjectDetails():
elif result == "sent":
title_str = "Update Sent"
info_str = "A telemetry update was sent to the peer."
elif result == "not_sent":
title_str = "Not Sent"
info_str = "A telemetry update could not be sent."
else:
title_str = "Unknown Status"
info_str = "The status of the telemetry update is unknown."

View File

@ -532,7 +532,22 @@ MDScreen:
height: dp(48)
MDLabel:
text: "Only display from trusted"
text: "Send display style to everyone"
font_style: "H6"
MDSwitch:
id: telemetry_send_appearance
pos_hint: {"center_y": 0.3}
active: False
MDBoxLayout:
orientation: "horizontal"
size_hint_y: None
padding: [0,0,dp(24),dp(0)]
height: dp(48)
MDLabel:
text: "Only display trusted on map"
font_style: "H6"
MDSwitch:
@ -562,7 +577,7 @@ MDScreen:
height: dp(48)
MDLabel:
text: "Send to all trusted peers"
text: "Send telemetry to all trusted"
font_style: "H6"
MDSwitch:
@ -570,21 +585,6 @@ MDScreen:
pos_hint: {"center_y": 0.3}
active: False
MDBoxLayout:
orientation: "horizontal"
size_hint_y: None
padding: [0,0,dp(24),dp(0)]
height: dp(48)
MDLabel:
text: "Send display style to everyone"
font_style: "H6"
MDSwitch:
id: telemetry_send_appearance
pos_hint: {"center_y": 0.3}
active: False
MDBoxLayout:
orientation: "horizontal"
size_hint_y: None