From 2cf7d8ad84dc78ae2df25d5a50e060b98f29adfe Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 1 Oct 2024 16:22:47 +0200 Subject: [PATCH] Added thread lock to service RPC calls --- sbapp/sideband/core.py | 99 +++++++++++++----------------------------- 1 file changed, 31 insertions(+), 68 deletions(-) diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index 1055643..0f68a29 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -153,6 +153,7 @@ class SidebandCore(): self.state_lock = Lock() self.message_router = None self.rpc_connection = None + self.rpc_lock = Lock() self.service_stopped = False self.service_context = service_context self.owner_service = owner_service @@ -1223,13 +1224,8 @@ class SidebandCore(): else: if self.is_client: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - - self.rpc_connection.send({"request_latest_telemetry": {"from_addr": from_addr}}) - response = self.rpc_connection.recv() - return response - + return self.service_rpc_request({"request_latest_telemetry": {"from_addr": from_addr}}) + except Exception as e: RNS.log("Error while requesting latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG) RNS.trace_exception(e) @@ -1302,17 +1298,12 @@ class SidebandCore(): else: if self.is_client: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - - self.rpc_connection.send({"send_latest_telemetry": { + return self.service_rpc_request({"send_latest_telemetry": { "to_addr": to_addr, "stream": stream, "is_authorized_telemetry_request": is_authorized_telemetry_request} }) - response = self.rpc_connection.recv() - return response - + except Exception as e: RNS.log("Error while sending latest telemetry over RPC: "+str(e), RNS.LOG_DEBUG) RNS.trace_exception(e) @@ -1520,11 +1511,7 @@ class SidebandCore(): return True else: def set(): - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - self.rpc_connection.send({"setstate": (prop, val)}) - response = self.rpc_connection.recv() - return response + return self.service_rpc_request({"setstate": (prop, val)}) try: set() @@ -1546,11 +1533,7 @@ class SidebandCore(): return True else: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - self.rpc_connection.send({"latest_telemetry": (latest_telemetry, latest_packed_telemetry)}) - response = self.rpc_connection.recv() - return response + return self.service_rpc_request({"latest_telemetry": (latest_telemetry, latest_packed_telemetry)}) except Exception as e: RNS.log("Error while setting telemetry over RPC: "+str(e), RNS.LOG_DEBUG) return False @@ -1567,11 +1550,7 @@ class SidebandCore(): return True else: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - self.rpc_connection.send({"set_debug": debug}) - response = self.rpc_connection.recv() - return response + return self.service_rpc_request({"set_debug": debug}) except Exception as e: RNS.log("Error while setting log level over RPC: "+str(e), RNS.LOG_DEBUG) return False @@ -1585,15 +1564,25 @@ class SidebandCore(): return True else: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - self.rpc_connection.send({"set_ui_recording": recording}) - response = self.rpc_connection.recv() - return response + return self.service_rpc_request({"set_ui_recording": recording}) except Exception as e: RNS.log("Error while setting UI recording status over RPC: "+str(e), RNS.LOG_DEBUG) return False + def service_rpc_request(self, request): + # RNS.log("Running service RPC call: "+str(request), RNS.LOG_DEBUG) + try: + with self.rpc_lock: + if self.rpc_connection == None: + self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) + self.rpc_connection.send(request) + response = self.rpc_connection.recv() + return response + + except Exception as e: + RNS.log(f"An error occurred while executing the service RPC request: {request}", RNS.LOG_ERROR) + RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR) + def getstate(self, prop, allow_cache=False): with self.state_lock: if not self.service_stopped: @@ -1611,11 +1600,7 @@ class SidebandCore(): return None else: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - self.rpc_connection.send({"getstate": prop}) - response = self.rpc_connection.recv() - return response + return self.service_rpc_request({"getstate": prop}) except Exception as e: RNS.log("Error while retrieving state "+str(prop)+" over RPC: "+str(e), RNS.LOG_DEBUG) @@ -1650,11 +1635,7 @@ class SidebandCore(): return self._get_plugins_info() else: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - self.rpc_connection.send({"get_plugins_info": True}) - response = self.rpc_connection.recv() - return response + return self.service_rpc_request({"get_plugins_info": True}) except Exception as e: ed = "Error while getting plugins info over RPC: "+str(e) RNS.log(ed, RNS.LOG_DEBUG) @@ -1689,11 +1670,7 @@ class SidebandCore(): return self._get_destination_establishment_rate(destination_hash) else: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - self.rpc_connection.send({"get_destination_establishment_rate": destination_hash}) - response = self.rpc_connection.recv() - return response + return self.service_rpc_request({"get_destination_establishment_rate": destination_hash}) except Exception as e: ed = "Error while getting destination link etablishment rate over RPC: "+str(e) RNS.log(ed, RNS.LOG_DEBUG) @@ -1782,6 +1759,7 @@ class SidebandCore(): except Exception as e: RNS.log("Error on client RPC connection: "+str(e), RNS.LOG_ERROR) + RNS.trace_exception(e) try: connection.close() except: @@ -4001,12 +3979,7 @@ class SidebandCore(): else: if self.is_client: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - - self.rpc_connection.send({"get_lxm_progress": {"lxm_hash": lxm_hash}}) - response = self.rpc_connection.recv() - return response + return self.service_rpc_request({"get_lxm_progress": {"lxm_hash": lxm_hash}}) except Exception as e: RNS.log("Error while getting LXM progress over RPC: "+str(e), RNS.LOG_DEBUG) @@ -4040,10 +4013,7 @@ class SidebandCore(): else: if self.is_client: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - - self.rpc_connection.send({"send_message": { + return self.service_rpc_request({"send_message": { "content": content, "destination_hash": destination_hash, "propagation": propagation, @@ -4053,8 +4023,6 @@ class SidebandCore(): "image": image, "audio": audio} }) - response = self.rpc_connection.recv() - return response except Exception as e: RNS.log("Error while sending message over RPC: "+str(e), RNS.LOG_DEBUG) @@ -4069,17 +4037,12 @@ class SidebandCore(): else: if self.is_client: try: - if self.rpc_connection == None: - self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key) - - self.rpc_connection.send({"send_command": { + return self.service_rpc_request({"send_command": { "content": content, "destination_hash": destination_hash, "propagation": propagation} }) - response = self.rpc_connection.recv() - return response - + except Exception as e: RNS.log("Error while sending command over RPC: "+str(e), RNS.LOG_DEBUG) RNS.trace_exception(e)