From a091389fa150f70e3e1ae5ca50763318f12edacf Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sun, 22 Oct 2023 20:16:41 +0200 Subject: [PATCH] Run IPC over multiprocessing lib instead of db --- sbapp/main.py | 6 +++ sbapp/mapview/view.py | 2 +- sbapp/sideband/core.py | 114 +++++++++++++++++++++++++++++------------ 3 files changed, 87 insertions(+), 35 deletions(-) diff --git a/sbapp/main.py b/sbapp/main.py index a1ea5b5..5f27c6e 100644 --- a/sbapp/main.py +++ b/sbapp/main.py @@ -199,6 +199,9 @@ class SidebandApp(MDApp): self.update_loading_text() self.init_announces_view() self.announces_view.update() + self.telemetry_init() + self.settings_init() + self.connectivity_init() # Wait a little extra for user to react to permissions prompt # if RNS.vendor.platformutils.get_platform() == "android": @@ -563,6 +566,9 @@ class SidebandApp(MDApp): return screen + def _state_jobs(self): + props = [] + def jobs(self, delta_time): if self.final_load_completed: if RNS.vendor.platformutils.is_android() and not self.sideband.service_available(): diff --git a/sbapp/mapview/view.py b/sbapp/mapview/view.py index 678c338..83fc5b4 100644 --- a/sbapp/mapview/view.py +++ b/sbapp/mapview/view.py @@ -169,7 +169,7 @@ class CustomMapMarker(ButtonBehavior, Image): lim = 0.5 lum = (bg[0]+bg[1]+bg[2])/3 if lum >= lim: - self.source = join(dirname(__file__), "icons", "marker_light.png") + self.source = join(dirname(__file__), "icons", "marker_dark.png") else: self.source = join(dirname(__file__), "icons", "marker_dark.png") diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index 61dcfe3..17d802a 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -11,6 +11,8 @@ import random import RNS.vendor.umsgpack as msgpack import RNS.Interfaces.Interface as Interface +import multiprocessing.connection + from .res import sideband_fb_data from .sense import Telemeter @@ -105,6 +107,8 @@ class SidebandCore(): self.telemetry_running = False self.latest_telemetry = None self.telemetry_changes = 0 + self.state_db = {} + self.rpc_connection = None self.app_dir = plyer.storagepath.get_home_dir()+"/.config/sideband" if self.app_dir.startswith("file://"): @@ -329,6 +333,9 @@ class SidebandCore(): RNS.log("Loading Sideband identity...", RNS.LOG_DEBUG) self.identity = RNS.Identity.from_file(self.identity_path) + self.rpc_addr = ("127.0.0.1", 48165) + self.rpc_key = RNS.Identity.full_hash(self.identity.get_private_key()) + RNS.log("Loading Sideband configuration... "+str(self.config_path), RNS.LOG_DEBUG) config_file = open(self.config_path, "rb") self.config = msgpack.unpackb(config_file.read()) @@ -796,48 +803,84 @@ class SidebandCore(): return self._db_getstate("app.active_conversation") def setstate(self, prop, val): - self.getstate_cache[prop] = val - self._db_setstate(prop, val) - # def cb(): - # self._db_setstate(prop, val) - # threading.Thread(target=cb, daemon=True).start() + if not RNS.vendor.platformutils.is_android(): + self.getstate_cache[prop] = val + self._db_setstate(prop, val) + else: + if self.is_service: + self.state_db[prop] = val + return True + else: + try: + if self.rpc_connection == None: + self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) + self.rpc_connection.send({"setstate": (prop, val)}) + response = self.rpc_connection.recv() + return response + except Exception as e: + RNS.log("Error while setting state over RPC: "+str(e), RNS.LOG_ERROR) + return False def getstate(self, prop, allow_cache=False): if not RNS.vendor.platformutils.is_android(): return self._db_getstate(prop) - else: - db_timeout = 0.060 - cached_value = None - has_cached_value = False - if prop in self.getstate_cache: - cached_value = self.getstate_cache[prop] - has_cached_value = True - - if not allow_cache or not has_cached_value: - self.getstate_cache[prop] = self._db_getstate(prop) - return self.getstate_cache[prop] - - else: - get_thread_running = True - def get_job(): - self.getstate_cache[prop] = self._db_getstate(prop) - get_thread_running = False - - get_thread = threading.Thread(target=get_job, daemon=True) - get_thread.timeout = time.time()+db_timeout - get_thread.start() - - while get_thread.is_alive() and time.time() < get_thread.timeout: - time.sleep(0.01) - - if get_thread.is_alive(): - RNS.log("GETSTATE TIMED OUT, STILL WAITING!", RNS.LOG_WARNING) - return self.getstate_cache[prop] + if self.is_service: + if prop in self.state_db: + return self.state_db[prop] else: - return self.getstate_cache[prop] + return None + else: + try: + if self.rpc_connection == None: + self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) + self.rpc_connection.send({"getstate": prop}) + response = self.rpc_connection.recv() + return response + except Exception as e: + RNS.log("Error while retrieving state "+str(prop)+" over RPC: "+str(e), RNS.LOG_DEBUG) + self.rpc_connection = None + return None + + def __start_rpc_listener(self): + try: + RNS.log("Starting RPC listener", RNS.LOG_DEBUG) + self.rpc_listener = multiprocessing.connection.Listener(self.rpc_addr, authkey=self.rpc_key) + thread = threading.Thread(target=self.__rpc_loop) + thread.daemon = True + thread.start() + except Exception as e: + RNS.log("Could not start RPC listener on "+str(self.rpc_addr)+". Terminating now. Clear up anything using the port and try again.", RNS.LOG_ERROR) + RNS.panic() + def __rpc_loop(self): + while True: + try: + RNS.log("Ready for next RPC client", RNS.LOG_DEBUG) + rpc_connection = self.rpc_listener.accept() + RNS.log("Accepted RPC client", RNS.LOG_DEBUG) + + def job_factory(connection): + def rpc_client_job(): + try: + while connection: + call = connection.recv() + if "getstate" in call: + prop = call["getstate"] + connection.send(self.getstate(prop)) + elif "setstate" in call: + prop, val = call["setstate"] + connection.send(self.setstate(prop, val)) + except Exception as e: + RNS.log("Error on client RPC connection: "+str(e), RNS.LOG_ERROR) + connection.close() + return rpc_client_job + + threading.Thread(target=job_factory(rpc_connection), daemon=True).start() + + except Exception as e: + RNS.log("An error ocurred while handling RPC call from local client: "+str(e), RNS.LOG_ERROR) def setpersistent(self, prop, val): @@ -1924,6 +1967,9 @@ class SidebandCore(): self.setstate("init.loadingstate", "Substantiating Reticulum") self.reticulum = RNS.Reticulum(configdir=self.rns_configdir, loglevel=selected_level) + if self.is_service: + self.__start_rpc_listener() + if RNS.vendor.platformutils.get_platform() == "android": # TODO: Just log to console for, but add option to export log # files at some point.