Implemented telemetry collector requests and responses

This commit is contained in:
Mark Qvist 2023-10-30 03:09:57 +01:00
parent 7076fae5cc
commit 8f5a980bf1

View File

@ -533,6 +533,8 @@ class SidebandCore():
self.config["telemetry_use_propagation_only"] = False self.config["telemetry_use_propagation_only"] = False
if not "telemetry_try_propagation_on_fail" in self.config: if not "telemetry_try_propagation_on_fail" in self.config:
self.config["telemetry_try_propagation_on_fail"] = True self.config["telemetry_try_propagation_on_fail"] = True
if not "telemetry_requests_only_send_latest" in self.config:
self.config["telemetry_requests_only_send_latest"] = True
if not "telemetry_s_location" in self.config: if not "telemetry_s_location" in self.config:
self.config["telemetry_s_location"] = False self.config["telemetry_s_location"] = False
@ -952,7 +954,7 @@ class SidebandCore():
return "not_sent" return "not_sent"
def send_latest_telemetry(self, to_addr=None): def send_latest_telemetry(self, to_addr=None, stream=None):
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True: if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG) RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress" return "in_progress"
@ -975,9 +977,19 @@ class SidebandCore():
desired_method = LXMF.LXMessage.DIRECT desired_method = LXMF.LXMessage.DIRECT
lxm_fields = self.get_message_fields(to_addr) lxm_fields = self.get_message_fields(to_addr)
if lxm_fields != None and LXMF.FIELD_TELEMETRY in lxm_fields: if stream != None and len(stream) > 0:
lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream
if lxm_fields != None and (LXMF.FIELD_TELEMETRY in lxm_fields or LXMF.FIELD_TELEMETRY_STREAM in lxm_fields):
if LXMF.FIELD_TELEMETRY in lxm_fields:
telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY]) telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY])
telemetry_timebase = telemeter.read_all()["time"]["utc"] telemetry_timebase = telemeter.read_all()["time"]["utc"]
elif LXMF.FIELD_TELEMETRY_STREAM in lxm_fields:
telemetry_timebase = 0
for te in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]:
ts = te[1]
telemetry_timebase = max(telemetry_timebase, ts)
if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0): if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0):
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields) lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields)
lxm.telemetry_timebase = telemetry_timebase lxm.telemetry_timebase = telemetry_timebase
@ -1471,7 +1483,6 @@ class SidebandCore():
db = self.__db_connect() db = self.__db_connect()
dbc = db.cursor() dbc = db.cursor()
# TODO: Implement limit
limit_part = "" limit_part = ""
if limit: if limit:
limit_part = " LIMIT "+str(int(limit)) limit_part = " LIMIT "+str(int(limit))
@ -1996,7 +2007,7 @@ class SidebandCore():
if LXMF.FIELD_TELEMETRY_STREAM in lxm.fields: if LXMF.FIELD_TELEMETRY_STREAM in lxm.fields:
for telemetry_entry in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]: for telemetry_entry in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]:
# TODO: Implement # TODO: Implement
pass RNS.log("TODO: Save this telemetry stream entry: "+str(telemetry_entry), RNS.LOG_WARNING)
if own_command or len(lxm.content) != 0 or len(lxm.title) != 0: if own_command or len(lxm.content) != 0 or len(lxm.title) != 0:
db = self.__db_connect() db = self.__db_connect()
@ -2934,11 +2945,10 @@ class SidebandCore():
self.lxm_ingest(message, originator=True) self.lxm_ingest(message, originator=True)
def get_message_fields(self, context_dest, telemetry_update=False): def get_message_fields(self, context_dest, telemetry_update=False):
fields = None fields = {}
send_telemetry = (telemetry_update == True) or self.should_send_telemetry(context_dest) send_telemetry = (telemetry_update == True) or self.should_send_telemetry(context_dest)
send_appearance = self.config["telemetry_send_appearance"] or send_telemetry send_appearance = self.config["telemetry_send_appearance"] or send_telemetry
if send_telemetry or send_appearance: if send_telemetry or send_appearance:
fields = {}
if send_appearance: if send_appearance:
def fth(c): def fth(c):
r = c[0]; g = c[1]; b = c[2] r = c[0]; g = c[1]; b = c[2]
@ -3307,11 +3317,12 @@ class SidebandCore():
def handle_commands(self, commands, message): def handle_commands(self, commands, message):
try: try:
context_dest = message.source_hash context_dest = message.source_hash
RNS.log("Handling commands from "+str(context_dest), RNS.LOG_DEBUG) RNS.log("Handling commands from "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG)
for command in commands: for command in commands:
if Commands.TELEMETRY_REQUEST in command: if Commands.TELEMETRY_REQUEST in command:
timebase = int(command[Commands.TELEMETRY_REQUEST]) timebase = int(command[Commands.TELEMETRY_REQUEST])
RNS.log("Handling telemetry request with timebase "+str(timebase), RNS.LOG_DEBUG) RNS.log("Handling telemetry request with timebase "+str(timebase), RNS.LOG_DEBUG)
self.create_telemetry_response(to_addr=context_dest, timebase=timebase)
elif Commands.ECHO in command: elif Commands.ECHO in command:
msg_content = "Echo reply: "+command[Commands.ECHO].decode("utf-8") msg_content = "Echo reply: "+command[Commands.ECHO].decode("utf-8")
@ -3337,6 +3348,26 @@ class SidebandCore():
except Exception as e: except Exception as e:
RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR) RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR)
def create_telemetry_response(self, to_addr, timebase):
sources = {}
sources = self.list_telemetry(after=timebase)
only_latest = self.config["telemetry_requests_only_send_latest"]
telemetry_stream = []
for source in sources:
if source != to_addr:
for entry in sources[source]:
timestamp = entry[0]; packed_telemetry = entry[1]
te = [source, timestamp, packed_telemetry]
if only_latest:
if not source in sources:
sources[source] = True
telemetry_stream.append(te)
else:
telemetry_stream.append(te)
self.send_latest_telemetry(to_addr=to_addr, stream=telemetry_stream)
def get_display_name_bytes(self): def get_display_name_bytes(self):
return self.config["display_name"].encode("utf-8") return self.config["display_name"].encode("utf-8")