import os import io import math import time import struct import numpy as np import RNS import LXMF if RNS.vendor.platformutils.is_android(): import pyogg from pydub import AudioSegment else: if RNS.vendor.platformutils.is_linux(): from sbapp.pyogg import OpusFile, OpusBufferedEncoder, OggOpusWriter else: from pyogg import OpusFile, OpusBufferedEncoder, OggOpusWriter from sbapp.pydub import AudioSegment codec2_modes = { # LXMF.AM_CODEC2_450PWB: ???, # No bindings # LXMF.AM_CODEC2_450: ???, # No bindings LXMF.AM_CODEC2_700C: 700, LXMF.AM_CODEC2_1200: 1200, LXMF.AM_CODEC2_1300: 1300, LXMF.AM_CODEC2_1400: 1400, LXMF.AM_CODEC2_1600: 1600, LXMF.AM_CODEC2_2400: 2400, LXMF.AM_CODEC2_3200: 3200, } def samples_from_ogg(file_path=None): if file_path != None and os.path.isfile(file_path): opus_file = OpusFile(file_path) audio = AudioSegment( bytes(opus_file.as_array()), frame_rate=opus_file.frequency, sample_width=opus_file.bytes_per_sample, channels=opus_file.channels) audio = audio.split_to_mono()[0] audio = audio.apply_gain(-audio.max_dBFS) audio = audio.set_frame_rate(8000) audio = audio.set_sample_width(2) return audio.get_array_of_samples() def samples_to_ogg(samples=None, file_path=None): try: if file_path != None and samples != None: pcm_data = io.BytesIO(samples) RNS.log(f"Samples: {len(samples)}") RNS.log(f"Type : {type(samples)}") channels = 1; samples_per_second = 8000; bytes_per_sample = 2 opus_buffered_encoder = OpusBufferedEncoder() opus_buffered_encoder.set_application("audio") opus_buffered_encoder.set_sampling_frequency(samples_per_second) opus_buffered_encoder.set_channels(channels) opus_buffered_encoder.set_frame_size(20) # milliseconds ogg_opus_writer = OggOpusWriter(file_path, opus_buffered_encoder) frame_duration = 0.020 frame_size = int(frame_duration * samples_per_second) bytes_per_frame = frame_size*bytes_per_sample read_bytes = 0 written_bytes = 0 while True: pcm = pcm_data.read(bytes_per_frame) if len(pcm) == 0: break else: read_bytes += len(pcm) ogg_opus_writer.write(memoryview(bytearray(pcm))) written_bytes += len(pcm) ogg_opus_writer.close() RNS.log(f"Read {read_bytes} bytes") RNS.log(f"Wrote {written_bytes} bytes") return True except Exception as e: RNS.trace_exception(e) return False def samples_to_wav(samples=None, file_path=None): if samples != None and file_path != None: import wave with wave.open(file_path, "wb") as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(8000) wf.writeframes(samples) return True # Samples must be 8KHz, 16-bit, 1 channel def encode_codec2(samples, mode): ap_start = time.time() import pycodec2 if not mode in codec2_modes: return None c2 = pycodec2.Codec2(codec2_modes[mode]) SPF = c2.samples_per_frame() PACKET_SIZE = SPF * 2 # 16-bit samples STRUCT_FORMAT = '{}h'.format(SPF) F_FRAMES = len(samples)/SPF N_FRAMES = math.floor(len(samples)/SPF) # TODO: Add padding to align to whole frames frames = np.array(samples[0:N_FRAMES*SPF], dtype=np.int16) encoded = b"" for pi in range(0, N_FRAMES): pstart = pi*SPF pend = (pi+1)*SPF frame = frames[pstart:pend] encoded_packet = c2.encode(frame) encoded += encoded_packet ap_duration = time.time() - ap_start RNS.log("Codec2 encoding complete in "+RNS.prettytime(ap_duration)+", bytes out: "+str(len(encoded)), RNS.LOG_DEBUG) return encoded def decode_codec2(encoded_bytes, mode): ap_start = time.time() import pycodec2 if not mode in codec2_modes: return None c2 = pycodec2.Codec2(codec2_modes[mode]) SPF = c2.samples_per_frame() BPF = c2.bytes_per_frame() STRUCT_FORMAT = '{}h'.format(SPF) N_FRAMES = math.floor(len(encoded_bytes)/BPF) decoded = b"" for pi in range(0, N_FRAMES): pstart = pi*BPF pend = (pi+1)*BPF encoded_packet = encoded_bytes[pstart:pend] decoded_frame = c2.decode(encoded_packet) decoded += struct.pack(STRUCT_FORMAT, *decoded_frame) ap_duration = time.time() - ap_start RNS.log("Codec2 decoding complete in "+RNS.prettytime(ap_duration)+", samples out: "+str(len(decoded)), RNS.LOG_DEBUG) return decoded