mirror of
				https://github.com/liberatedsystems/Sideband_CE.git
				synced 2024-09-03 04:13:27 +02:00 
			
		
		
		
	Updated telemetry manager
This commit is contained in:
		
							parent
							
								
									601164f58c
								
							
						
					
					
						commit
						1b4754bca8
					
				@ -1,12 +1,89 @@
 | 
			
		||||
import RNS
 | 
			
		||||
import time
 | 
			
		||||
import struct
 | 
			
		||||
 | 
			
		||||
from RNS.vendor import umsgpack as umsgpack
 | 
			
		||||
 | 
			
		||||
class Telemeter():
 | 
			
		||||
  @staticmethod
 | 
			
		||||
  def from_packed(packed):
 | 
			
		||||
    try:
 | 
			
		||||
      p = umsgpack.unpackb(packed)
 | 
			
		||||
      t = Telemeter(from_packed=True)
 | 
			
		||||
      for sid in p:
 | 
			
		||||
        if sid in t.sids:
 | 
			
		||||
          name = None
 | 
			
		||||
          s = t.sids[sid]()
 | 
			
		||||
          for n in t.available:
 | 
			
		||||
            if t.available[n] == type(s):
 | 
			
		||||
              name = n
 | 
			
		||||
 | 
			
		||||
          if name != None:
 | 
			
		||||
            s.data = s.unpack(p[sid])
 | 
			
		||||
            s.active = True
 | 
			
		||||
            t.sensors[name] = s
 | 
			
		||||
 | 
			
		||||
      return t
 | 
			
		||||
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
      RNS.log("An error occurred while unpacking telemetry. The contained exception was: "+str(e), RNS.LOG_ERROR)
 | 
			
		||||
      return None
 | 
			
		||||
 | 
			
		||||
  def __init__(self, from_packed=False):
 | 
			
		||||
    self.sids = {Sensor.SID_BATTERY: Battery, Sensor.SID_BAROMETER: Barometer, Sensor.SID_LOCATION: Location}
 | 
			
		||||
    self.available = {"battery": Battery, "barometer": Barometer, "location": Location}
 | 
			
		||||
    self.from_packed = from_packed
 | 
			
		||||
    self.sensors = {}
 | 
			
		||||
 | 
			
		||||
  def enable(self, sensor):
 | 
			
		||||
    if not self.from_packed:
 | 
			
		||||
      if sensor in self.available:
 | 
			
		||||
        if not sensor in self.sensors:
 | 
			
		||||
          self.sensors[sensor] = self.available[sensor]()
 | 
			
		||||
        if not self.sensors[sensor].active:
 | 
			
		||||
          self.sensors[sensor].start()
 | 
			
		||||
  
 | 
			
		||||
  def disable(self, sensor):
 | 
			
		||||
    if not self.from_packed:
 | 
			
		||||
      if sensor in self.available:
 | 
			
		||||
        if sensor in self.sensors:
 | 
			
		||||
          if self.sensors[sensor].active:
 | 
			
		||||
            self.sensors[sensor].stop()
 | 
			
		||||
 | 
			
		||||
  def read(self, sensor):
 | 
			
		||||
    if not self.from_packed:
 | 
			
		||||
      if sensor in self.available:
 | 
			
		||||
        if sensor in self.sensors:
 | 
			
		||||
          return self.sensors[sensor].data
 | 
			
		||||
      return None
 | 
			
		||||
    else:
 | 
			
		||||
      if sensor in self.available:
 | 
			
		||||
        if sensor in self.sensors:
 | 
			
		||||
          return self.sensors[sensor]._data
 | 
			
		||||
 | 
			
		||||
  def read_all(self):
 | 
			
		||||
    readings = {}
 | 
			
		||||
    for sensor in self.sensors:
 | 
			
		||||
      if self.sensors[sensor].active:
 | 
			
		||||
        if not self.from_packed:
 | 
			
		||||
          readings[sensor] = self.sensors[sensor].data
 | 
			
		||||
        else:
 | 
			
		||||
          readings[sensor] = self.sensors[sensor]._data
 | 
			
		||||
 | 
			
		||||
    return readings
 | 
			
		||||
 | 
			
		||||
  def packed(self):
 | 
			
		||||
    packed = {}
 | 
			
		||||
    for sensor in self.sensors:
 | 
			
		||||
      if self.sensors[sensor].active:
 | 
			
		||||
        packed[self.sensors[sensor].sid] = self.sensors[sensor].pack()
 | 
			
		||||
    return umsgpack.packb(packed)
 | 
			
		||||
 | 
			
		||||
class Sensor():
 | 
			
		||||
  SID_NONE      = 0x00
 | 
			
		||||
  SID_BATTERY   = 0x01
 | 
			
		||||
  SID_BAROMETER = 0x02
 | 
			
		||||
  SID_LOCATION  = 0x03
 | 
			
		||||
 | 
			
		||||
  def __init__(self, sid = None, stale_time = None):
 | 
			
		||||
    self._sid = sid or Sensor.SID_NONE
 | 
			
		||||
@ -155,40 +232,135 @@ class Barometer(Sensor):
 | 
			
		||||
    except:
 | 
			
		||||
      return None
 | 
			
		||||
 | 
			
		||||
class Telemeter():
 | 
			
		||||
class Location(Sensor):
 | 
			
		||||
  SID = Sensor.SID_LOCATION
 | 
			
		||||
  
 | 
			
		||||
  STALE_TIME = 10
 | 
			
		||||
  MIN_DISTANCE = 5
 | 
			
		||||
  ACCURACY_TARGET = 250
 | 
			
		||||
 | 
			
		||||
  def __init__(self):
 | 
			
		||||
    self.available = {"battery": Battery, "barometer": Barometer}
 | 
			
		||||
    self.sensors = {}
 | 
			
		||||
    super().__init__(type(self).SID, type(self).STALE_TIME)
 | 
			
		||||
 | 
			
		||||
  def enable(self, sensor):
 | 
			
		||||
    if sensor in self.available:
 | 
			
		||||
      if not sensor in self.sensors:
 | 
			
		||||
        self.sensors[sensor] = self.available[sensor]()
 | 
			
		||||
      if not self.sensors[sensor].active:
 | 
			
		||||
        self.sensors[sensor].start()
 | 
			
		||||
    self._raw = None
 | 
			
		||||
    self._min_distance = Location.MIN_DISTANCE
 | 
			
		||||
    self._accuracy_target = Location.ACCURACY_TARGET
 | 
			
		||||
 | 
			
		||||
  def disable(self, sensor):
 | 
			
		||||
    if sensor in self.available:
 | 
			
		||||
      if sensor in self.sensors:
 | 
			
		||||
        if self.sensors[sensor].active:
 | 
			
		||||
          self.sensors[sensor].stop()
 | 
			
		||||
    self.latitude = None
 | 
			
		||||
    self.longtitude = None
 | 
			
		||||
    self.altitude = None
 | 
			
		||||
    self.speed = None
 | 
			
		||||
    self.bearing = None
 | 
			
		||||
    self.accuracy = None
 | 
			
		||||
 | 
			
		||||
  def read(self, sensor):
 | 
			
		||||
    if sensor in self.available:
 | 
			
		||||
      if sensor in self.sensors:
 | 
			
		||||
        return self.sensors[sensor].data
 | 
			
		||||
    return None
 | 
			
		||||
    if RNS.vendor.platformutils.is_android():
 | 
			
		||||
      from plyer import gps
 | 
			
		||||
      self.gps = gps
 | 
			
		||||
 | 
			
		||||
  def read_all(self):
 | 
			
		||||
    readings = {}
 | 
			
		||||
    for sensor in self.sensors:
 | 
			
		||||
      if self.sensors[sensor].active:
 | 
			
		||||
        readings[sensor] = self.sensors[sensor].data
 | 
			
		||||
    return readings
 | 
			
		||||
  def set_min_distance(self, distance):
 | 
			
		||||
    try:
 | 
			
		||||
      d = float(distance)
 | 
			
		||||
      if d >= 0:
 | 
			
		||||
        self._min_distance = d
 | 
			
		||||
        self.teardown_sensor()
 | 
			
		||||
        self.setup_sensor()
 | 
			
		||||
    except:
 | 
			
		||||
      pass
 | 
			
		||||
 | 
			
		||||
  def set_accuracy_target(self, target):
 | 
			
		||||
    try:
 | 
			
		||||
      t = float(target)
 | 
			
		||||
      if t >= 0:
 | 
			
		||||
        self._accuracy_target = t
 | 
			
		||||
    except:
 | 
			
		||||
      pass
 | 
			
		||||
 | 
			
		||||
  def setup_sensor(self):
 | 
			
		||||
    if RNS.vendor.platformutils.is_android():
 | 
			
		||||
      from android.permissions import request_permissions, check_permission
 | 
			
		||||
      if not check_permission("android.permission.ACCESS_COARSE_LOCATION") or not check_permission("android.permission.ACCESS_FINE_LOCATION"):
 | 
			
		||||
          RNS.log("Requesting location permission", RNS.LOG_DEBUG)
 | 
			
		||||
          request_permissions(["android.permission.ACCESS_COARSE_LOCATION", "android.permission.ACCESS_FINE_LOCATION"])
 | 
			
		||||
 | 
			
		||||
      self.gps.configure(on_location=self.android_location_callback)
 | 
			
		||||
      self.gps.start(minTime=self._stale_time, minDistance=self._min_distance)
 | 
			
		||||
      self.update_data()
 | 
			
		||||
 | 
			
		||||
  def teardown_sensor(self):
 | 
			
		||||
    if RNS.vendor.platformutils.is_android():
 | 
			
		||||
      self.gps.stop()
 | 
			
		||||
      self.data = None
 | 
			
		||||
 | 
			
		||||
  def android_location_callback(self, **kwargs):
 | 
			
		||||
    self._raw = kwargs
 | 
			
		||||
    self._last_update = time.time()
 | 
			
		||||
 | 
			
		||||
  def update_data(self):
 | 
			
		||||
    try:
 | 
			
		||||
      if RNS.vendor.platformutils.is_android():
 | 
			
		||||
        if "lat" in self._raw:
 | 
			
		||||
          self.latitude   = self._raw["lat"]
 | 
			
		||||
        if "lon" in self._raw:
 | 
			
		||||
          self.longtitude = self._raw["lon"]
 | 
			
		||||
        if "altitude" in self._raw:
 | 
			
		||||
          self.altitude   = self._raw["altitude"]
 | 
			
		||||
        if "speed" in self._raw:
 | 
			
		||||
          self.speed      = self._raw["speed"]
 | 
			
		||||
          if self.speed < 0:
 | 
			
		||||
            self.speed = 0
 | 
			
		||||
        if "bearing" in self._raw:
 | 
			
		||||
          self.bearing    = self._raw["bearing"]
 | 
			
		||||
        if "accuracy" in self._raw:
 | 
			
		||||
          self.accuracy   = self._raw["accuracy"]
 | 
			
		||||
          if self.accuracy < 0:
 | 
			
		||||
            self.accuracy = 0
 | 
			
		||||
 | 
			
		||||
        if self.accuracy != None and self.accuracy <= self._accuracy_target:        
 | 
			
		||||
          self.data = {
 | 
			
		||||
            "latitude": round(self.latitude, 6),
 | 
			
		||||
            "longtitude": round(self.longtitude, 6),
 | 
			
		||||
            "altitude": round(self.altitude, 2),
 | 
			
		||||
            "speed": round(self.speed, 2),
 | 
			
		||||
            "bearing": round(self.bearing, 2),
 | 
			
		||||
            "accuracy": round(self.accuracy, 2),
 | 
			
		||||
            "last_update": int(self._last_update),
 | 
			
		||||
            }
 | 
			
		||||
    
 | 
			
		||||
    except:
 | 
			
		||||
      self.data = None
 | 
			
		||||
 | 
			
		||||
  def pack(self):
 | 
			
		||||
    packed = {}
 | 
			
		||||
    for sensor in self.sensors:
 | 
			
		||||
      if self.sensors[sensor].active:
 | 
			
		||||
        packed[self.sensors[sensor].sid] = self.sensors[sensor].pack()
 | 
			
		||||
    return umsgpack.packb(packed)
 | 
			
		||||
    d = self.data
 | 
			
		||||
    if d == None:
 | 
			
		||||
      return None
 | 
			
		||||
    else:
 | 
			
		||||
      try:
 | 
			
		||||
        return [
 | 
			
		||||
          struct.pack("!i", int(round(d["latitude"], 6)*1e6)),
 | 
			
		||||
          struct.pack("!i", int(round(d["longtitude"], 6)*1e6)),
 | 
			
		||||
          struct.pack("!I", int(round(d["altitude"], 2)*1e2)),
 | 
			
		||||
          struct.pack("!I", int(round(d["speed"], 2)*1e2)),
 | 
			
		||||
          struct.pack("!I", int(round(d["bearing"], 2)*1e2)),
 | 
			
		||||
          struct.pack("!H", int(round(d["accuracy"], 2)*1e2)),
 | 
			
		||||
          d["last_update"],
 | 
			
		||||
        ]
 | 
			
		||||
      except Exception as e:
 | 
			
		||||
        RNS.log("An error occurred while packing location sensor data. The contained exception was: "+str(e), RNS.LOG_ERROR)
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
  def unpack(self, packed):
 | 
			
		||||
    try:
 | 
			
		||||
      if packed == None:
 | 
			
		||||
        return None
 | 
			
		||||
      else:
 | 
			
		||||
        return {
 | 
			
		||||
          "latitude": struct.unpack("!i", packed[0])[0]/1e6,
 | 
			
		||||
          "longtitude": struct.unpack("!i", packed[1])[0]/1e6,
 | 
			
		||||
          "altitude": struct.unpack("!I", packed[2])[0]/1e2,
 | 
			
		||||
          "speed": struct.unpack("!I", packed[3])[0]/1e2,
 | 
			
		||||
          "bearing": struct.unpack("!I", packed[4])[0]/1e2,
 | 
			
		||||
          "accuracy": struct.unpack("!H", packed[5])[0]/1e2,
 | 
			
		||||
          "last_update": packed[6],
 | 
			
		||||
        }
 | 
			
		||||
    except:
 | 
			
		||||
      return None
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user