From 1b4754bca843d98e1386f9bc57a5e2cba001741f Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 20 Oct 2023 12:58:55 +0200 Subject: [PATCH] Updated telemetry manager --- sbapp/sideband/sense.py | 236 ++++++++++++++++++++++++++++++++++------ 1 file changed, 204 insertions(+), 32 deletions(-) diff --git a/sbapp/sideband/sense.py b/sbapp/sideband/sense.py index 285b416..05117c7 100644 --- a/sbapp/sideband/sense.py +++ b/sbapp/sideband/sense.py @@ -1,12 +1,89 @@ import RNS import time +import struct from RNS.vendor import umsgpack as umsgpack +class Telemeter(): + @staticmethod + def from_packed(packed): + try: + p = umsgpack.unpackb(packed) + t = Telemeter(from_packed=True) + for sid in p: + if sid in t.sids: + name = None + s = t.sids[sid]() + for n in t.available: + if t.available[n] == type(s): + name = n + + if name != None: + s.data = s.unpack(p[sid]) + s.active = True + t.sensors[name] = s + + return t + + except Exception as e: + RNS.log("An error occurred while unpacking telemetry. The contained exception was: "+str(e), RNS.LOG_ERROR) + return None + + def __init__(self, from_packed=False): + self.sids = {Sensor.SID_BATTERY: Battery, Sensor.SID_BAROMETER: Barometer, Sensor.SID_LOCATION: Location} + self.available = {"battery": Battery, "barometer": Barometer, "location": Location} + self.from_packed = from_packed + self.sensors = {} + + def enable(self, sensor): + if not self.from_packed: + if sensor in self.available: + if not sensor in self.sensors: + self.sensors[sensor] = self.available[sensor]() + if not self.sensors[sensor].active: + self.sensors[sensor].start() + + def disable(self, sensor): + if not self.from_packed: + if sensor in self.available: + if sensor in self.sensors: + if self.sensors[sensor].active: + self.sensors[sensor].stop() + + def read(self, sensor): + if not self.from_packed: + if sensor in self.available: + if sensor in self.sensors: + return self.sensors[sensor].data + return None + else: + if sensor in self.available: + if sensor in self.sensors: + return self.sensors[sensor]._data + + def read_all(self): + readings = {} + for sensor in self.sensors: + if self.sensors[sensor].active: + if not self.from_packed: + readings[sensor] = self.sensors[sensor].data + else: + readings[sensor] = self.sensors[sensor]._data + + return readings + + def packed(self): + packed = {} + for sensor in self.sensors: + if self.sensors[sensor].active: + packed[self.sensors[sensor].sid] = self.sensors[sensor].pack() + return umsgpack.packb(packed) + class Sensor(): SID_NONE = 0x00 SID_BATTERY = 0x01 SID_BAROMETER = 0x02 + SID_LOCATION = 0x03 def __init__(self, sid = None, stale_time = None): self._sid = sid or Sensor.SID_NONE @@ -155,40 +232,135 @@ class Barometer(Sensor): except: return None -class Telemeter(): - def __init__(self): - self.available = {"battery": Battery, "barometer": Barometer} - self.sensors = {} - - def enable(self, sensor): - if sensor in self.available: - if not sensor in self.sensors: - self.sensors[sensor] = self.available[sensor]() - if not self.sensors[sensor].active: - self.sensors[sensor].start() +class Location(Sensor): + SID = Sensor.SID_LOCATION - def disable(self, sensor): - if sensor in self.available: - if sensor in self.sensors: - if self.sensors[sensor].active: - self.sensors[sensor].stop() + STALE_TIME = 10 + MIN_DISTANCE = 5 + ACCURACY_TARGET = 250 - def read(self, sensor): - if sensor in self.available: - if sensor in self.sensors: - return self.sensors[sensor].data - return None + def __init__(self): + super().__init__(type(self).SID, type(self).STALE_TIME) - def read_all(self): - readings = {} - for sensor in self.sensors: - if self.sensors[sensor].active: - readings[sensor] = self.sensors[sensor].data - return readings + self._raw = None + self._min_distance = Location.MIN_DISTANCE + self._accuracy_target = Location.ACCURACY_TARGET + + self.latitude = None + self.longtitude = None + self.altitude = None + self.speed = None + self.bearing = None + self.accuracy = None + + if RNS.vendor.platformutils.is_android(): + from plyer import gps + self.gps = gps + + def set_min_distance(self, distance): + try: + d = float(distance) + if d >= 0: + self._min_distance = d + self.teardown_sensor() + self.setup_sensor() + except: + pass + + def set_accuracy_target(self, target): + try: + t = float(target) + if t >= 0: + self._accuracy_target = t + except: + pass + + def setup_sensor(self): + if RNS.vendor.platformutils.is_android(): + from android.permissions import request_permissions, check_permission + if not check_permission("android.permission.ACCESS_COARSE_LOCATION") or not check_permission("android.permission.ACCESS_FINE_LOCATION"): + RNS.log("Requesting location permission", RNS.LOG_DEBUG) + request_permissions(["android.permission.ACCESS_COARSE_LOCATION", "android.permission.ACCESS_FINE_LOCATION"]) + + self.gps.configure(on_location=self.android_location_callback) + self.gps.start(minTime=self._stale_time, minDistance=self._min_distance) + self.update_data() + + def teardown_sensor(self): + if RNS.vendor.platformutils.is_android(): + self.gps.stop() + self.data = None + + def android_location_callback(self, **kwargs): + self._raw = kwargs + self._last_update = time.time() + + def update_data(self): + try: + if RNS.vendor.platformutils.is_android(): + if "lat" in self._raw: + self.latitude = self._raw["lat"] + if "lon" in self._raw: + self.longtitude = self._raw["lon"] + if "altitude" in self._raw: + self.altitude = self._raw["altitude"] + if "speed" in self._raw: + self.speed = self._raw["speed"] + if self.speed < 0: + self.speed = 0 + if "bearing" in self._raw: + self.bearing = self._raw["bearing"] + if "accuracy" in self._raw: + self.accuracy = self._raw["accuracy"] + if self.accuracy < 0: + self.accuracy = 0 + + if self.accuracy != None and self.accuracy <= self._accuracy_target: + self.data = { + "latitude": round(self.latitude, 6), + "longtitude": round(self.longtitude, 6), + "altitude": round(self.altitude, 2), + "speed": round(self.speed, 2), + "bearing": round(self.bearing, 2), + "accuracy": round(self.accuracy, 2), + "last_update": int(self._last_update), + } + + except: + self.data = None def pack(self): - packed = {} - for sensor in self.sensors: - if self.sensors[sensor].active: - packed[self.sensors[sensor].sid] = self.sensors[sensor].pack() - return umsgpack.packb(packed) \ No newline at end of file + d = self.data + if d == None: + return None + else: + try: + return [ + struct.pack("!i", int(round(d["latitude"], 6)*1e6)), + struct.pack("!i", int(round(d["longtitude"], 6)*1e6)), + struct.pack("!I", int(round(d["altitude"], 2)*1e2)), + struct.pack("!I", int(round(d["speed"], 2)*1e2)), + struct.pack("!I", int(round(d["bearing"], 2)*1e2)), + struct.pack("!H", int(round(d["accuracy"], 2)*1e2)), + d["last_update"], + ] + except Exception as e: + RNS.log("An error occurred while packing location sensor data. The contained exception was: "+str(e), RNS.LOG_ERROR) + return None + + def unpack(self, packed): + try: + if packed == None: + return None + else: + return { + "latitude": struct.unpack("!i", packed[0])[0]/1e6, + "longtitude": struct.unpack("!i", packed[1])[0]/1e6, + "altitude": struct.unpack("!I", packed[2])[0]/1e2, + "speed": struct.unpack("!I", packed[3])[0]/1e2, + "bearing": struct.unpack("!I", packed[4])[0]/1e2, + "accuracy": struct.unpack("!H", packed[5])[0]/1e2, + "last_update": packed[6], + } + except: + return None \ No newline at end of file