import importlib import math import struct import pyogg import os import numpy as np from scipy.signal import butter, filtfilt def float32_to_int16(data_float32): data_int16 = (data_float32 * 32767).astype(np.int16) return data_int16 def int16_to_float32(data_int16): data_float32 = data_int16.astype(np.float32) / 32767.0 return data_float32 class DualOpusEncoder: def __init__(self, app="restricted_lowdelay", samplerate=48000, version="stable"): """ ----------------------------- version-------------------------- hev2: libopus 1.5.1 (fre:ac) exper: libopus 1.5.1 stable: libopus 1.4 old: libopus 1.3.1 custom: custom opus path you can use "pyogg_win_libopus_custom_path" env to change opus version (windows only) ------------------------- App---------------------------------- Set the encoding mode. This must be one of 'voip', 'audio', or 'restricted_lowdelay'. 'voip': Gives best quality at a given bitrate for voice signals. It enhances the input signal by high-pass filtering and emphasizing formants and harmonics. Optionally it includes in-band forward error correction to protect against packet loss. Use this mode for typical VoIP applications. Because of the enhancement, even at high bitrates the output may sound different from the input. 'audio': Gives best quality at a given bitrate for most non-voice signals like music. Use this mode for music and mixed (music/voice) content, broadcast, and applications requiring less than 15 ms of coding delay. 'restricted_lowdelay': configures low-delay mode that disables the speech-optimized mode in exchange for slightly reduced delay. This mode can only be set on an newly initialized encoder because it changes the codec delay. """ self.version = version self.samplerate = samplerate self.stereomode = 1 #0 = mono, 1 = Stereo LR, 2 = Stereo Mid/Side, 3 = Stereo Intensity self.automonogate = -50 self.automono = False self.msmono = False self.overallbitrate = 0 self.secbitrate = 0 self.intensity = 1 self.bitratemode = 1 # 0 = CBR, 1 = CVBR, 2 = VBR os.environ["pyogg_win_libopus_version"] = version importlib.reload(pyogg.opus) self.Lencoder = pyogg.OpusBufferedEncoder() self.Rencoder = pyogg.OpusBufferedEncoder() self.Lencoder.set_application(app) self.Rencoder.set_application(app) self.Lencoder.set_sampling_frequency(samplerate) self.Rencoder.set_sampling_frequency(samplerate) self.Lencoder.set_channels(1) self.Rencoder.set_channels(1) self.set_frame_size() self.set_compression() self.set_feature() self.set_bitrate_mode() self.set_bitrates() self.set_bandwidth() self.set_packet_loss() def set_compression(self, level=10): """complex 0-10 low-hires""" self.Lencoder.set_compresion_complex(level) self.Rencoder.set_compresion_complex(level) def set_bitrates(self, bitrates=64000, samebitrate=False, balance_percent=None): """ input birate unit: bps balance_percent is working good with M/S stereo """ if bitrates <= 5000: bitrates = 5000 if balance_percent is None: if self.stereomode == 0: balance_percent = 100 elif self.stereomode == 2: balance_percent = 75 else: balance_percent = 50 self.overallbitrate = bitrates if samebitrate: self.Lencoder.set_bitrates(int(bitrates)) self.Rencoder.set_bitrates(int(bitrates)) else: percentage_decimal = balance_percent / 100 bitratech1 = round(bitrates * percentage_decimal) bitratech2 = bitrates - bitratech1 if bitratech1 < 2500: bitratech1 = 2500 if bitratech2 < 2500: self.msmono = True bitratech2 = 2500 else: self.msmono = False self.secbitrate = bitratech1 self.Lencoder.set_bitrates(int(bitratech1)) self.Rencoder.set_bitrates(int(bitratech2)) def set_bandwidth(self, bandwidth="fullband"): """ narrowband: Narrowband typically refers to a limited range of frequencies suitable for voice communication. mediumband (unsupported in libopus 1.3+): Mediumband extends the frequency range compared to narrowband, providing better audio quality. wideband: Wideband offers an even broader frequency range, resulting in higher audio fidelity compared to narrowband and mediumband. superwideband: Superwideband extends the frequency range beyond wideband, further enhancing audio quality. fullband (default): Fullband provides the widest frequency range among the listed options, offering the highest audio quality. auto: opus is working auto not force """ self.Lencoder.set_bandwidth(bandwidth) self.Rencoder.set_bandwidth(bandwidth) def set_stereo_mode(self, mode=1, automono=False, automonogate=-50, intensity=1, changebitratesbalance=False): """ 0 = mono (not recommend) 1 = stereo LR 2 = stereo Mid/Side 3 = Intensity """ if mode > 2: mode = 1 self.stereomode = mode self.automono = automono self.automonogate = automonogate self.intensity = intensity if changebitratesbalance: self.set_bitrates(self.overallbitrate) def set_frame_size(self, size=60, nocheck=False): """ Set the desired frame duration (in milliseconds). Valid options are 2.5, 5, 10, 20, 40, or 60ms. Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms. @return chunk size """ if self.version != "hev2" and size > 60: raise ValueError("non hev2 can't use framesize > 60") self.Lencoder.set_frame_size(size, nocheck) self.Rencoder.set_frame_size(size, nocheck) return int((size / 1000) * self.samplerate) def set_packet_loss(self, loss=0): """input: % percent""" if loss > 100: raise ValueError("percent must <=100") self.Lencoder.set_packets_loss(loss) self.Rencoder.set_packets_loss(loss) def set_bitrate_mode(self, mode="CVBR"): """VBR, CVBR, CBR VBR in 1.5.x replace by CVBR """ if mode.lower() == "cbr": self.bitratemode = 0 elif mode.lower() == "cvbr": self.bitratemode = 1 elif mode.lower() == "vbr": self.bitratemode = 2 else: raise ValueError(f"No {mode} bitrate mode option") self.Lencoder.set_bitrate_mode(mode) self.Rencoder.set_bitrate_mode(mode) def set_feature(self, prediction=False, phaseinvert=False, DTX=False): self.Lencoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction)) self.Lencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert)) self.Lencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) self.Rencoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction)) self.Rencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert)) self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) def enable_voice_mode(self, enable=True, auto=False): self.Lencoder.enable_voice_enhance(enable, auto) self.Rencoder.enable_voice_enhance(enable, auto) def encode(self, pcmbytes, directpcm=False): """input: pcm bytes accept float32/int16 only x74 is mono x75 is stereo LR x76 is stereo mid/side xnl is no side audio """ if directpcm: if pcmbytes.dtype == np.float32: pcm = (pcmbytes * 32767).astype(np.int16) elif pcmbytes.dtype == np.int16: pcm = pcmbytes.astype(np.int16) else: raise TypeError("accept only int16/float32") else: pcm = np.frombuffer(pcmbytes, dtype=np.int16) if self.stereomode == 0: # mono left_channel = pcm[::2] right_channel = pcm[1::2] mono = (left_channel + right_channel) / 2 intmono = float32_to_int16(mono) midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(intmono)), flush=True)[0][0].tobytes() dual_encoded_packet = (midencoded_packet + b'\\x64\\x74') elif self.stereomode == 2: # stereo mid/side (Joint encoding) # convert to float32 pcm = int16_to_float32(pcm) left_channel = pcm[::2] right_channel = pcm[1::2] mid = (left_channel + right_channel) / 2 side = (left_channel - right_channel) / 2 # convert back to int16 mid = float32_to_int16(mid) intside = float32_to_int16(side) # check if side is no audio or loudness <= -50 DBFS try: loudnessside = 20 * math.log10(np.sqrt(np.mean(np.square(side)))) except: loudnessside = 0 if (loudnessside) <= self.automonogate and self.automono or self.msmono: sideencoded_packet = b"\\xnl" if self.bitratemode == 0: # CBR self.Lencoder.set_bitrates(int(self.overallbitrate - 300)) else: self.Lencoder.set_bitrates(int(self.secbitrate)) sideencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(intside)), flush=True)[0][0].tobytes() midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(mid)), flush=True)[0][0].tobytes() dual_encoded_packet = (midencoded_packet + b'\\x64\\x76' + sideencoded_packet) elif self.stereomode == 3: # stereo intensity (Joint encoding) left_channel = pcm[:, 0] right_channel = pcm[:, 1] IRChannel = left_channel + self.intensity * (right_channel - left_channel) Lencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() IRencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(IRChannel)), flush=True)[0][0].tobytes() dual_encoded_packet = (Lencoded_packet + b'\\x64\\x77' + IRencoded_packet) else: # stereo LR left_channel = pcm[::2] right_channel = pcm[1::2] Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][0].tobytes() dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet) return dual_encoded_packet class PSOpusEncoder: def __init__(self, app="restricted_lowdelay", samplerate=48000, version="stable"): """ This version is xHE-Opus v2 (Parametric Stereo) ----------------------------- version-------------------------- hev2: libopus 1.5.1 (fre:ac) exper: libopus 1.5.1 stable: libopus 1.4 old: libopus 1.3.1 custom: custom opus path you can use "pyogg_win_libopus_custom_path" env to change opus version (windows only) ------------------------- App---------------------------------- Set the encoding mode. This must be one of 'voip', 'audio', or 'restricted_lowdelay'. 'voip': Gives best quality at a given bitrate for voice signals. It enhances the input signal by high-pass filtering and emphasizing formants and harmonics. Optionally it includes in-band forward error correction to protect against packet loss. Use this mode for typical VoIP applications. Because of the enhancement, even at high bitrates the output may sound different from the input. 'audio': Gives best quality at a given bitrate for most non-voice signals like music. Use this mode for music and mixed (music/voice) content, broadcast, and applications requiring less than 15 ms of coding delay. 'restricted_lowdelay': configures low-delay mode that disables the speech-optimized mode in exchange for slightly reduced delay. This mode can only be set on an newly initialized encoder because it changes the codec delay. """ self.version = version self.samplerate = samplerate os.environ["pyogg_win_libopus_version"] = version importlib.reload(pyogg.opus) self.encoder = pyogg.OpusBufferedEncoder() self.encoder.set_application(app) self.encoder.set_sampling_frequency(samplerate) self.encoder.set_channels(1) self.set_frame_size() self.set_compression() self.set_feature() self.set_bitrate_mode() self.set_bitrates() self.set_bandwidth() self.set_packet_loss() def set_compression(self, level=10): """complex 0-10 low-hires""" self.encoder.set_compresion_complex(level) def set_bitrates(self, bitrates=64000): """input birate unit: bps""" if bitrates <= 2500: bitrates = 2500 self.encoder.set_bitrates(bitrates) def set_bandwidth(self, bandwidth="fullband"): """ narrowband: Narrowband typically refers to a limited range of frequencies suitable for voice communication. mediumband (unsupported in libopus 1.3+): Mediumband extends the frequency range compared to narrowband, providing better audio quality. wideband: Wideband offers an even broader frequency range, resulting in higher audio fidelity compared to narrowband and mediumband. superwideband: Superwideband extends the frequency range beyond wideband, further enhancing audio quality. fullband (default): Fullband provides the widest frequency range among the listed options, offering the highest audio quality. auto: opus is working auto not force """ self.encoder.set_bandwidth(bandwidth) def set_frame_size(self, size=60): """ Set the desired frame duration (in milliseconds). Valid options are 2.5, 5, 10, 20, 40, or 60ms. Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms. @return chunk size """ if self.version != "hev2" and size > 60: raise ValueError("non hev2 can't use framesize > 60") self.encoder.set_frame_size(size) return int((size / 1000) * self.samplerate) def set_packet_loss(self, loss=0): """input: % percent""" if loss > 100: raise ValueError("percent must <=100") self.encoder.set_packets_loss(loss) def set_bitrate_mode(self, mode="CVBR"): """VBR, CVBR, CBR VBR in 1.5.x replace by CVBR """ self.encoder.set_bitrate_mode(mode) def set_feature(self, prediction=False, phaseinvert=False, DTX=False): self.encoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction)) self.encoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert)) self.encoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) def enable_voice_mode(self, enable=True, auto=False): self.encoder.enable_voice_enhance(enable, auto) def __parameterization(self, stereo_signal): # Convert int16 to float32 for processing stereo_signal = stereo_signal.astype(np.float32) / 32768.0 # Reshape stereo_signal into a 2D array with two channels stereo_signal = stereo_signal.reshape((-1, 2)) # Calculate the magnitude spectrogram for each channel mag_left = np.abs(np.fft.fft(stereo_signal[:, 0])) mag_right = np.abs(np.fft.fft(stereo_signal[:, 1])) # Calculate the phase difference between the left and right channels phase_diff = np.angle(stereo_signal[:, 0]) - np.angle(stereo_signal[:, 1]) # Compute other spatial features # Calculate stereo width stereo_width = np.mean(np.correlate(mag_left, mag_right, mode='full')) # Calculate phase coherence phase_coherence = np.mean(np.cos(phase_diff)) # Calculate stereo panning stereo_panning_left = np.mean(mag_left / (mag_left + mag_right)) stereo_panning_right = np.mean(mag_right / (mag_left + mag_right)) pan = stereo_panning_right - stereo_panning_left # Return the derived parameters return (int(stereo_width), phase_coherence, pan) def encode(self, pcmbytes, directpcm=False): """input: pcm bytes accept float32/int16 only x74 is mono x75 is stereo LR x76 is stereo mid/side xnl is no side audio """ if directpcm: if pcmbytes.dtype == np.float32: pcm = (pcmbytes * 32767).astype(np.int16) elif pcmbytes.dtype == np.int16: pcm = pcmbytes.astype(np.int16) else: raise TypeError("accept only int16/float32") else: pcm = np.frombuffer(pcmbytes, dtype=np.int16) pcmreshaped = pcm.reshape(-1, 2) mono_data = np.mean(pcmreshaped * 0.5, axis=1, dtype=np.int16) stereodata = self.__parameterization(pcmreshaped) packedstereodata = struct.pack('iff', *stereodata) encoded_packet = self.encoder.buffered_encode(memoryview(bytearray(mono_data)), flush=True)[0][0].tobytes() encoded_packet = (encoded_packet + b'\\x21\\x75' + packedstereodata) return encoded_packet class xOpusDecoder: def __init__(self, sample_rate=48000): self.Ldecoder = pyogg.OpusDecoder() self.Rdecoder = pyogg.OpusDecoder() self.Ldecoder.set_channels(1) self.Rdecoder.set_channels(1) self.Ldecoder.set_sampling_frequency(sample_rate) self.Rdecoder.set_sampling_frequency(sample_rate) self.__prev_pan = 0.0 self.__prev_max_amplitude = 0.0 def __smooth(self, value, prev_value, alpha=0.1): return alpha * value + (1 - alpha) * prev_value def __expand_and_pan(self, input_signal, pan_value, expansion_factor, gain): """ Apply stereo expansion and panning to an input audio signal. Parameters: - input_signal: Input audio signal (numpy array of int16). - expansion_factor: Factor to expand the stereo width (0 to 1). - pan_value: Pan value (-1 to 1, where -1 is full left, 1 is full right). - gain: Gain factor to adjust the volume. Returns: - output_signal: Processed audio signal (stereo, numpy array of int16). """ # Convert int16 to float32 for processing input_signal_float = input_signal.astype(np.float32) / 32768.0 # Separate the channels left_channel = input_signal_float[:, 0] right_channel = input_signal_float[:, 1] # Apply panning pan_left = (1 - pan_value) / 2 pan_right = (1 + pan_value) / 2 left_channel *= pan_left right_channel *= pan_right # Apply stereo expansion center = (left_channel + right_channel) / 2 left_channel = center + (left_channel - center) * expansion_factor right_channel = center + (right_channel - center) * expansion_factor # Apply gain left_channel *= gain right_channel *= gain # Ensure no clipping by normalizing if necessary max_val = max(np.max(np.abs(left_channel)), np.max(np.abs(right_channel))) if max_val > 1.0: left_channel /= max_val right_channel /= max_val # Merge the channels output_signal = np.stack((left_channel, right_channel), axis=-1) return (output_signal * 32767).astype(np.int16) def __mix_stereo_signals(self, signal1, signal2, volume1=1.0, volume2=1.0): # Ensure both signals have the same length length = max(len(signal1), len(signal2)) signal1 = np.pad(signal1, ((0, length - len(signal1)), (0, 0)), mode='constant') signal2 = np.pad(signal2, ((0, length - len(signal2)), (0, 0)), mode='constant') # Convert signals to float signal1 = signal1.astype(np.float32) signal2 = signal2.astype(np.float32) # Adjust volume signal1 *= volume1 signal2 *= volume2 # Mix the signals mixed_signal = signal1 + signal2 # Normalize the mixed signal to prevent clipping max_amplitude = np.max(np.abs(mixed_signal)) if max_amplitude > 32767: mixed_signal = (mixed_signal / max_amplitude) * 32767 return mixed_signal.astype(np.int16) def __apply_smoothing_window(self, audio_data, window_size): """ Apply a smoothing window to the beginning and end of the audio data. Parameters: - audio_data: 2D numpy array with shape (num_samples, 2) - window_size: Size of the smoothing window in samples Returns: - smoothed_audio_data: 2D numpy array with the smoothing window applied """ window = np.hanning(window_size * 2) fade_in = window[:window_size] fade_out = window[-window_size:] audio_data[:window_size, :] *= fade_in[:, np.newaxis] audio_data[-window_size:, :] *= fade_out[:, np.newaxis] return audio_data def __stereo_widening_effect(self, data, delay_samples=10, gain=0.8, window_size=100): audio_data = data.reshape(-1, 2) # Convert int16 to float32 for processing audio_data = audio_data.astype(np.float32) # Apply delay to the right channel right_channel = np.roll(audio_data[:, 1], delay_samples) # Apply gain to both channels audio_data[:, 0] *= gain right_channel *= gain # Combine channels back into stereo widened_audio_data = np.stack((audio_data[:, 0], right_channel), axis=1) # Apply smoothing window to reduce clicks widened_audio_data = self.__apply_smoothing_window(widened_audio_data, window_size) # Clip to avoid overflow widened_audio_data = np.clip(widened_audio_data, -32768, 32767) # Convert float32 back to int16 widened_audio_data = widened_audio_data.astype(np.int16) return widened_audio_data def __apply_phase_coherence_to_stereo(self, signal, phase_coherence): # Convert phase coherence to phase shift in radians phase_shift = np.arccos(phase_coherence) # Apply phase shift to both channels return self.__apply_phase_shift(signal, phase_shift) # Function to apply phase shift to one channel def __apply_phase_shift(self, signal, phase_shift): # Convert to complex signal_complex = signal.astype(np.complex64) # Apply phase shift shifted_signal = signal_complex * np.exp(1j * phase_shift) return shifted_signal.astype(np.int16) def __synthstereo(self, mono_signal, stereodata): pan = stereodata[2] # Smooth the pan value pan = self.__smooth(pan, self.__prev_pan, alpha=0.25) self.__prev_pan = pan stereo_exp = stereodata[0] / 10000 try: delayed = self.__stereo_widening_effect(mono_signal, int(stereo_exp), 1, int(stereo_exp) * 2) except: delayed = mono_signal l1 = self.__expand_and_pan(mono_signal, pan, 1, 2) stereo_signal_shifted = self.__apply_phase_coherence_to_stereo(delayed, stereodata[1]) return self.__mix_stereo_signals(l1, stereo_signal_shifted, volume1=1, volume2=0.5).astype(np.int16) def decode(self, dualopusbytes: bytes, outputformat=np.int16): # mode check if b"\\x64\\x74" in dualopusbytes: mode = 0 xopusbytespilted = dualopusbytes.split(b'\\x64\\x74') elif b"\\x64\\x76" in dualopusbytes: mode = 2 xopusbytespilted = dualopusbytes.split(b'\\x64\\x76') elif b"\\x64\\x75" in dualopusbytes: mode = 1 xopusbytespilted = dualopusbytes.split(b'\\x64\\x75') elif b"\\x64\\x77" in dualopusbytes: mode = 4 xopusbytespilted = dualopusbytes.split(b'\\x64\\x77') elif b"\\x21\\x75" in dualopusbytes: mode = 3 # v2 xopusbytespilted = dualopusbytes.split(b'\\x21\\x75') else: raise TypeError("this is not xopus bytes") if mode == 0: # mono Mencoded_packet = xopusbytespilted[0] decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) stereo_signal = np.column_stack((Mpcm, Mpcm)) elif mode == 2: # stereo mid/side (Joint encoding) Mencoded_packet = xopusbytespilted[0] Sencoded_packet = xopusbytespilted[1] decoded_mid_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) Mpcm = np.frombuffer(decoded_mid_channel_pcm, dtype=np.int16) if Sencoded_packet != b"\\xnl": decoded_side_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Sencoded_packet))) Spcm = np.frombuffer(decoded_side_channel_pcm, dtype=np.int16) Mpcm = int16_to_float32(Mpcm) Spcm = int16_to_float32(Spcm) L = Mpcm + Spcm R = Mpcm - Spcm stereo_signal = np.column_stack((L, R)) #max_amplitude = np.max(np.abs(stereo_signal)) #if max_amplitude > 1.0: # stereo_signal /= max_amplitude stereo_signal = np.clip(stereo_signal, -1, 1) stereo_signal = float32_to_int16(stereo_signal) else: stereo_signal = np.column_stack((Mpcm, Mpcm)) elif mode == 4: # stereo intensity Lencoded_packet = xopusbytespilted[0] IRencoded_packet = xopusbytespilted[1] decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) decoded_intensity_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(IRencoded_packet))) Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) IRpcm = np.frombuffer(decoded_intensity_right_channel_pcm, dtype=np.int16) recovered_right = Lpcm + (IRpcm - Lpcm) / 1 stereo_signal = np.column_stack((Lpcm, recovered_right)) elif mode == 3: # Parametric Stereo Mencoded_packet = xopusbytespilted[0] stereodatapacked = xopusbytespilted[1] stereodata = struct.unpack('iff', stereodatapacked) mono_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) Mpcm = np.frombuffer(mono_channel_pcm, dtype=np.int16) stereo_audio = np.stack((Mpcm, Mpcm)).T.reshape(-1, 2) stereo_signal = self.__synthstereo(stereo_audio, stereodata) else: # stereo LR Lencoded_packet = xopusbytespilted[0] Rencoded_packet = xopusbytespilted[1] decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet))) Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=np.int16) stereo_signal = np.column_stack((Lpcm, Rpcm)) return stereo_signal.astype(outputformat).tobytes() class HeaderContainer: def __init__(self, capture_pattern, version, metadata): self.capture_pattern = capture_pattern self.version = version self.metadata = metadata def serialize(self): header = struct.pack('<4sB', self.capture_pattern, self.version) metadata_bytes = self.serialize_metadata() return header + metadata_bytes def serialize_metadata(self): metadata_bytes = b'' for key, value in self.metadata.items(): key_bytes = key.encode('utf-8') value_bytes = value.encode('utf-8') if isinstance(value, str) else str(value).encode('utf-8') metadata_bytes += struct.pack(f'