Added PTT playback queue and playback lock while recording. Added database operation thread locking.

This commit is contained in:
Mark Qvist 2024-09-02 13:05:32 +02:00
parent 08bf53c998
commit 19c54319a6
3 changed files with 707 additions and 673 deletions

View File

@ -1052,9 +1052,7 @@ class SidebandApp(MDApp):
elif keycode == 40:
self.msg_rec_a_save(None)
elif not self.rec_dialog_is_open:
if not self.messages_view.ids.message_text.focus:
if self.messages_view.ptt_enabled and keycode == 44:
elif not self.rec_dialog_is_open and not self.messages_view.ids.message_text.focus and self.messages_view.ptt_enabled and keycode == 44:
if not self.key_ptt_down:
self.key_ptt_down = True
self.message_ptt_down_action()
@ -1763,6 +1761,9 @@ class SidebandApp(MDApp):
RNS.trace_exception(e)
def message_ptt_down_action(self, sender=None):
if self.sideband.ui_recording:
return
self.sideband.ui_started_recording()
self.audio_msg_mode = LXMF.AM_CODEC2_2400
self.message_attach_action(attach_type="audio", nodialog=True)
@ -1782,7 +1783,9 @@ class SidebandApp(MDApp):
def message_ptt_up_action(self, sender=None):
self.sideband.ui_stopped_recording()
if not self.sideband.ui_recording:
return
self.rec_dialog.recording = False
el_button = self.messages_view.ids.message_ptt_button
el_icon = self.messages_view.ids.message_ptt_button.children[0].children[1]
@ -1792,10 +1795,16 @@ class SidebandApp(MDApp):
el_icon.theme_text_color="Custom"
el_icon.text_color=mdc("BlueGray","500")
def cb_s(dt):
try:
self.msg_audio.stop()
self.message_process_audio()
except Exception as e:
RNS.log("An error occurred while stopping recording: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
self.sideband.ui_stopped_recording()
if self.message_process_audio():
self.message_send_action()
Clock.schedule_once(cb_s, 0.25)
Clock.schedule_once(cb_s, 0.35)
def message_process_audio(self):
if self.audio_msg_mode == LXMF.AM_OPUS_OGG:
@ -1850,7 +1859,9 @@ class SidebandApp(MDApp):
os.unlink(self.msg_audio._file_path)
else:
self.display_codec2_error()
return
return False
return True
def message_init_rec_dialog(self):
ss = int(dp(18))

View File

@ -68,13 +68,21 @@ class AndroidAudio(Audio):
def _stop(self):
if self._recorder:
try:
self._recorder.stop()
self._recorder.release()
except Exception as e:
print("Could not stop recording: "+str(e))
self._recorder = None
if self._player:
try:
self._player.stop()
self._player.release()
except Exception as e:
print("Could not stop playback: "+str(e))
self._player = None
self.is_playing = False

View File

@ -1476,14 +1476,10 @@ class SidebandCore():
pass
else:
if self.is_service:
# TODO: Remove
RNS.log("Indicating recording in service: "+str(recording))
self.ui_recording = recording
return True
else:
try:
# TODO: Remove
RNS.log("Passing recording indication to service: "+str(recording))
if self.rpc_connection == None:
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"set_ui_recording": recording})
@ -1496,8 +1492,6 @@ class SidebandCore():
def getstate(self, prop, allow_cache=False):
with self.state_lock:
if not self.service_stopped:
# TODO: remove
# us = time.time()
if not RNS.vendor.platformutils.is_android():
if prop in self.state_db:
@ -1516,8 +1510,6 @@ class SidebandCore():
self.rpc_connection = multiprocessing.connection.Client(self.rpc_addr, authkey=self.rpc_key)
self.rpc_connection.send({"getstate": prop})
response = self.rpc_connection.recv()
# TODO: Remove
# RNS.log("RPC getstate result for "+str(prop)+"="+str(response)+" in "+RNS.prettytime(time.time()-us), RNS.LOG_WARNING)
return response
except Exception as e:
@ -1721,6 +1713,7 @@ class SidebandCore():
db.commit()
def _db_getpersistent(self, prop):
with self.db_lock:
try:
db = self.__db_connect()
dbc = db.cursor()
@ -1750,13 +1743,16 @@ class SidebandCore():
self.db = None
def _db_setpersistent(self, prop, val):
existing_prop = self._db_getpersistent(prop)
with self.db_lock:
try:
db = self.__db_connect()
dbc = db.cursor()
uprop = prop.encode("utf-8")
bval = msgpack.packb(val)
if self._db_getpersistent(prop) == None:
if existing_prop == None:
try:
query = "INSERT INTO persistent (property, value) values (?, ?)"
data = (uprop, bval)
@ -1780,6 +1776,7 @@ class SidebandCore():
self.db = None
def _db_conversation_update_txtime(self, context_dest, is_retry = False):
with self.db_lock:
try:
db = self.__db_connect()
dbc = db.cursor()
@ -1793,11 +1790,12 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating conversation TX time: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_conversation_update_txtime(context_dest, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_conversation_update_txtime(context_dest, is_retry=True)
def _db_conversation_set_unread(self, context_dest, unread, tx = False, is_retry = False):
with self.db_lock:
try:
db = self.__db_connect()
dbc = db.cursor()
@ -1819,11 +1817,12 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating conversation unread flag: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_conversation_set_unread(context_dest, unread, tx, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_conversation_set_unread(context_dest, unread, tx, is_retry=True)
def _db_telemetry(self, context_dest = None, after = None, before = None, limit = None):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -1878,6 +1877,7 @@ class SidebandCore():
return results
def _db_save_telemetry(self, context_dest, telemetry, physical_link = None, source_dest = None, via = None, is_retry = False):
with self.db_lock:
try:
remote_telemeter = Telemeter.from_packed(telemetry)
read_telemetry = remote_telemeter.read_all()
@ -1947,9 +1947,9 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while commiting telemetry to database: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_save_telemetry(context_dest, telemetry, physical_link, source_dest, via, is_retry = True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_save_telemetry(context_dest, telemetry, physical_link, source_dest, via, is_retry = True)
return
self.setstate("app.flags.last_telemetry", time.time())
@ -1973,6 +1973,7 @@ class SidebandCore():
self.setpersistent("temp.peer_appearance."+RNS.hexrep(context_dest, delimit=False), ae)
else:
with self.db_lock:
data_dict = conv["data"]
if data_dict == None:
data_dict = {}
@ -2052,6 +2053,7 @@ class SidebandCore():
data_dict["telemetry"] = send_telemetry
packed_dict = msgpack.packb(data_dict)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2065,9 +2067,9 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating conversation telemetry options: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_conversation_set_telemetry(context_dest, send_telemetry, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_conversation_set_telemetry(context_dest, send_telemetry, is_retry=True)
def _db_conversation_set_requests(self, context_dest, allow_requests=False, is_retry=False):
conv = self._db_conversation(context_dest)
@ -2078,6 +2080,7 @@ class SidebandCore():
data_dict["allow_requests"] = allow_requests
packed_dict = msgpack.packb(data_dict)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2104,6 +2107,7 @@ class SidebandCore():
data_dict["is_object"] = is_object
packed_dict = msgpack.packb(data_dict)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2117,9 +2121,9 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating conversation object option: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_conversation_set_object(context_dest, is_object, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_conversation_set_object(context_dest, is_object, is_retry=True)
def _db_conversation_set_ptt_enabled(self, context_dest, ptt_enabled=False):
conv = self._db_conversation(context_dest)
@ -2130,6 +2134,7 @@ class SidebandCore():
data_dict["ptt_enabled"] = ptt_enabled
packed_dict = msgpack.packb(data_dict)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2143,11 +2148,12 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating conversation PTT option: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_conversation_set_ptt_enabled(context_dest, ptt_enabled, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_conversation_set_ptt_enabled(context_dest, ptt_enabled, is_retry=True)
def _db_conversation_set_trusted(self, context_dest, trusted):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2161,11 +2167,12 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating conversation trusted option: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_conversation_set_trusted(context_dest, trusted, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_conversation_set_trusted(context_dest, trusted, is_retry=True)
def _db_conversation_set_name(self, context_dest, name):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2178,11 +2185,12 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating conversation name option: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_conversation_set_name(context_dest, name, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_conversation_set_name(context_dest, name, is_retry=True)
def _db_conversations(self, conversations=True, objects=False):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2227,6 +2235,7 @@ class SidebandCore():
return sorted(convs, key=lambda c: c["last_activity"], reverse=True)
def _db_announces(self):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2256,6 +2265,7 @@ class SidebandCore():
return announces
def _db_conversation(self, context_dest):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2281,6 +2291,7 @@ class SidebandCore():
def _db_clear_conversation(self, context_dest):
RNS.log("Clearing conversation with "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2290,6 +2301,7 @@ class SidebandCore():
def _db_clear_telemetry(self, context_dest):
RNS.log("Clearing telemetry for "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2301,6 +2313,7 @@ class SidebandCore():
def _db_delete_conversation(self, context_dest):
RNS.log("Deleting conversation with "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2311,6 +2324,7 @@ class SidebandCore():
def _db_delete_announce(self, context_dest):
RNS.log("Deleting announce with "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2320,6 +2334,7 @@ class SidebandCore():
def _db_create_conversation(self, context_dest, name = None, trust = False):
RNS.log("Creating conversation for "+RNS.prettyhexrep(context_dest), RNS.LOG_DEBUG)
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2340,6 +2355,7 @@ class SidebandCore():
def _db_delete_message(self, msg_hash):
RNS.log("Deleting message "+RNS.prettyhexrep(msg_hash))
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2349,6 +2365,7 @@ class SidebandCore():
def _db_clean_messages(self):
RNS.log("Purging stale messages... "+str(self.db_path))
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2357,6 +2374,7 @@ class SidebandCore():
db.commit()
def _db_message_set_state(self, lxm_hash, state, is_retry=False):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2370,11 +2388,12 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating message state: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_message_set_state(lxm_hash, state, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_message_set_state(lxm_hash, state, is_retry=True)
def _db_message_set_method(self, lxm_hash, method):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2388,14 +2407,15 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while updating message method: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_message_set_method(lxm_hash, method, is_retry=True)
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_message_set_method(lxm_hash, method, is_retry=True)
def message(self, msg_hash):
return self._db_message(msg_hash)
def _db_message(self, msg_hash):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2436,6 +2456,7 @@ class SidebandCore():
return message
def _db_message_count(self, context_dest):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2450,6 +2471,7 @@ class SidebandCore():
return result[0][0]
def _db_messages(self, context_dest, after = None, before = None, limit = None):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2551,6 +2573,7 @@ class SidebandCore():
RNS.log("Received telemetry stream field with no data: "+str(lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]), RNS.LOG_DEBUG)
if own_command or len(lxm.content) != 0 or len(lxm.title) != 0:
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -2596,14 +2619,15 @@ class SidebandCore():
except Exception as e:
RNS.log("An error occurred while saving message to database: "+str(e), RNS.LOG_ERROR)
self.__db_reconnect()
if not is_retry:
RNS.log("Retrying operation...", RNS.LOG_ERROR)
self._db_save_lxm(lxm, context_dest, originator = originator, own_command = own_command, is_retry = True)
return
# if not is_retry:
# RNS.log("Retrying operation...", RNS.LOG_ERROR)
# self._db_save_lxm(lxm, context_dest, originator = originator, own_command = own_command, is_retry = True)
# return
self.__event_conversation_changed(context_dest)
def _db_save_announce(self, destination_hash, app_data, dest_type="lxmf.delivery"):
with self.db_lock:
db = self.__db_connect()
dbc = db.cursor()
@ -3895,6 +3919,7 @@ class SidebandCore():
if not originator and LXMF.FIELD_AUDIO in message.fields and ptt_enabled:
self.ptt_event(message)
should_notify = False
if self.is_client:
should_notify = False
@ -3922,8 +3947,9 @@ class SidebandCore():
RNS.log("Could not post notification for received message: "+str(e), RNS.LOG_ERROR)
def ptt_playback(self, message):
while hasattr(self, "msg_sound") and self.msg_sound != None and self.msg_sound.playing():
RNS.log("Waiting for playback to stop")
ptt_timeout = 60
event_time = time.time()
while hasattr(self, "msg_sound") and self.msg_sound != None and self.msg_sound.playing() and time.time() < event_time+ptt_timeout:
time.sleep(0.1)
time.sleep(0.5)
@ -3979,7 +4005,6 @@ class SidebandCore():
if self.msg_sound != None:
RNS.log("Starting playback", RNS.LOG_DEBUG)
self.msg_sound.play()
should_notify = False
else:
RNS.log("Playback was requested, but no audio data was loaded for playback", RNS.LOG_ERROR)
@ -3990,32 +4015,22 @@ class SidebandCore():
def ptt_event(self, message):
def ptt_job():
try:
# TODO: Remove logs
RNS.log("Taking lock")
self.ptt_playback_lock.acquire()
while self.ui_recording:
RNS.log("Waiting for UI recording to finish")
time.sleep(0.5)
RNS.log("Starting playback")
self.ptt_playback(message)
except Exception as e:
RNS.log("Error while starting playback for PTT-enabled conversation: "+str(e), RNS.LOG_ERROR)
finally:
RNS.log("Releasing lock")
self.ptt_playback_lock.release()
threading.Thread(target=ptt_job, daemon=True).start()
def ui_started_recording(self):
# TODO: Remove
RNS.log("Indicating recording started")
self.ui_recording = True
self.service_rpc_set_ui_recording(True)
def ui_stopped_recording(self):
# TODO: Remove
RNS.log("Indicating recording stopped")
self.ui_recording = False
self.service_rpc_set_ui_recording(False)