diff --git a/createnewformat.reg b/createnewformat.reg index a5c57da..5945a01 100644 Binary files a/createnewformat.reg and b/createnewformat.reg differ diff --git a/gui.py b/gui.py index acc3c18..aa72ab9 100644 --- a/gui.py +++ b/gui.py @@ -47,6 +47,14 @@ class App: if int(dpg.get_value("opusframesize")) > 60: dpg.configure_item("opusframesize", default_value="60") + def changeprofileopus(self, sender, data): + if data == "xHE-Opus v2": + dpg.configure_item("opusstereomode", show=False) + dpg.configure_item("opusbitrate", min_value=2.5, max_value=510, min_clamped=True, max_clamped=True, step_fast=1, default_value=64) + else: + dpg.configure_item("opusstereomode", show=True) + dpg.configure_item("opusbitrate", min_value=5, max_value=1020, min_clamped=True, max_clamped=True, step_fast=1, default_value=64) + def selectdeoutputpath(self, sender, data): file_path = easygui.diropenbox() dpg.set_value("deoutpathshow", f"output: {file_path}") @@ -80,6 +88,8 @@ class App: dpg.configure_item("deplayconvert", show=False) def convert(self): + signaltype = str(dpg.get_value("opussignaltype")).lower() + profile = str(dpg.get_value("opusprofile")).strip().lower() stereomode = str(dpg.get_value("opusstereomode")).lower() if stereomode == "stereo l/r": stereomode = 1 @@ -88,20 +98,40 @@ class App: else: stereomode = 2 + if signaltype == "music": + signalauto = False + signalvoice = False + elif signaltype == "voice": + signalauto = False + signalvoice = True + else: + signalauto = True + signalvoice = False + try: total = 0 current = 0 filename = os.path.splitext(os.path.basename(self.inputfilepath))[0] dpg.set_value("convertstatus", "init encoder...") - encoder = libxheopus.DualOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion")) + print(profile) + if profile == "xhe-opus v1": + encoder = libxheopus.DualOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion")) + else: + encoder = libxheopus.PSOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion")) + encoder.set_bitrate_mode(dpg.get_value("opusbitmode")) encoder.set_bandwidth(dpg.get_value("opusbandwidth")) encoder.set_bitrates(int(dpg.get_value("opusbitrate")*1000)) encoder.set_compression(dpg.get_value("opuscompression")) encoder.set_packet_loss(dpg.get_value("opuspacketloss")) - encoder.set_stereo_mode(stereomode, dpg.get_value("opusenajoint")) + + if profile != "xhe-opus v2": + encoder.set_stereo_mode(stereomode, dpg.get_value("opusenajoint")) + encoder.set_feature(dpg.get_value("opusenapred"), False, dpg.get_value("opusenadtx")) + encoder.enable_voice_mode(signalvoice, signalauto) + desired_frame_size = encoder.set_frame_size(int(dpg.get_value("opusframesize"))) dpg.set_value("convertstatus", "init writer...") @@ -168,7 +198,7 @@ class App: self.delen = metadata["footer"]["length"] def playaudiothread(self): - self.decoder = libxheopus.DualOpusDecoder() + self.decoder = libxheopus.xOpusDecoder() for data in self.derender.decode(self.decoder, True, self.depausepos): if self.deplay: @@ -229,7 +259,7 @@ class App: outwav.setsampwidth(2) # 2 bytes (16 bits) per sample outwav.setframerate(48000) - self.decoder = libxheopus.DualOpusDecoder() + self.decoder = libxheopus.xOpusDecoder() for data in self.derender.decode(self.decoder, True, self.depausepos): self.decurrentplay += 1 @@ -260,18 +290,20 @@ class App: dpg.add_text("output:", tag="outpathshow") dpg.add_button(label="Select Input File", callback=self.selectinputfile) dpg.add_button(label="Select Output Path", callback=self.selectoutputpath) + dpg.add_combo(["xHE-Opus v1", "xHE-Opus v2"], label="Profile", default_value="xHE-Opus v1", tag="opusprofile", callback=self.changeprofileopus) dpg.add_combo(["hev2", "exper", "stable", "old"], label="Version", default_value="hev2", tag="opusversion", callback=self.changeversionopus) dpg.add_combo(["120", "100", "80", "60", "40", "20", "10", "5"], label="Frame Size (ms)", tag="opusframesize", default_value="120") dpg.add_combo(["voip", "audio", "restricted_lowdelay"], label="Application", default_value="restricted_lowdelay", tag="opusapp") dpg.add_combo(["VBR", "CVBR", "CBR"], label="Bitrate Mode", default_value="CVBR", tag="opusbitmode") dpg.add_combo(["auto", "fullband", "superwideband", "wideband", "mediumband", "narrowband"], label="Bandwidth", tag="opusbandwidth", default_value="fullband") dpg.add_combo(["Stereo L/R", "Stereo Mid/Side"], label="Stereo Mode", tag="opusstereomode", default_value="Stereo L/R") + dpg.add_combo(["Auto", "Voice", "Music"], label="Signal Type", tag="opussignaltype", default_value="Auto") dpg.add_input_float(label="Bitrates", min_value=5, max_value=1020, min_clamped=True, max_clamped=True, step_fast=1, default_value=64, tag="opusbitrate") dpg.add_input_int(label="Compression Level", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=10, tag="opuscompression") dpg.add_input_int(label="Packet Loss", max_clamped=True, min_clamped=True, min_value=0, max_value=100, default_value=0, tag="opuspacketloss") dpg.add_checkbox(label="Prediction", tag="opusenapred") dpg.add_checkbox(label="DTX", tag="opusenadtx") - dpg.add_checkbox(label="Joint", tag="opusenajoint") + dpg.add_checkbox(label="Auto Mono (Mid/Side Encoding)", tag="opusenajoint") dpg.add_button(label="Convert", callback=self.startconvert) with dpg.window(label="converting", show=False, tag="convertingwindow", modal=True, no_resize=True, no_move=True, no_title_bar=True, width=320): diff --git a/icon.ico b/icon.ico new file mode 100644 index 0000000..a3227cc Binary files /dev/null and b/icon.ico differ diff --git a/icon.png b/icon.png new file mode 100644 index 0000000..d5171ea Binary files /dev/null and b/icon.png differ diff --git a/icon.svg b/icon.svg new file mode 100644 index 0000000..2a5096c --- /dev/null +++ b/icon.svg @@ -0,0 +1,280 @@ + + + + + + + + + + diff --git a/libxheopus.py b/libxheopus.py index 8196f40..e2abc8b 100644 --- a/libxheopus.py +++ b/libxheopus.py @@ -4,6 +4,7 @@ import struct import pyogg import os import numpy as np +from scipy.signal import butter, filtfilt def float32_to_int16(data_float32): data_int16 = (data_float32 * 32767).astype(np.int16) @@ -51,7 +52,8 @@ class DualOpusEncoder: self.version = version self.samplerate = samplerate self.stereomode = 1 #0 = mono, 1 = stereo LR, 2 = stereo Mid/Side - self.enablejoint = False + self.audiomono = False + os.environ["pyogg_win_libopus_version"] = version importlib.reload(pyogg.opus) @@ -97,20 +99,25 @@ class DualOpusEncoder: """ narrowband: Narrowband typically refers to a limited range of frequencies suitable for voice communication. + mediumband (unsupported in libopus 1.3+): Mediumband extends the frequency range compared to narrowband, providing better audio quality. + wideband: Wideband offers an even broader frequency range, resulting in higher audio fidelity compared to narrowband and mediumband. + superwideband: Superwideband extends the frequency range beyond wideband, further enhancing audio quality. + fullband (default): Fullband provides the widest frequency range among the listed options, offering the highest audio quality. + auto: opus is working auto not force """ self.Lencoder.set_bandwidth(bandwidth) self.Rencoder.set_bandwidth(bandwidth) - def set_stereo_mode(self, mode=1, enablejoint=False): + def set_stereo_mode(self, mode=1, audiomono=False): """ 0 = mono 1 = stereo LR @@ -120,7 +127,7 @@ class DualOpusEncoder: mode = 1 self.stereomode = mode - self.enablejoint = enablejoint + self.audiomono = audiomono def set_frame_size(self, size=60): """ Set the desired frame duration (in milliseconds). @@ -162,6 +169,10 @@ class DualOpusEncoder: self.Rencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert)) self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) + def enable_voice_mode(self, enable=True, auto=False): + self.Lencoder.enable_voice_enhance(enable, auto) + self.Rencoder.enable_voice_enhance(enable, auto) + def encode(self, pcmbytes, directpcm=False): """input: pcm bytes accept float32/int16 only x74 is mono @@ -188,9 +199,9 @@ class DualOpusEncoder: intmono = float32_to_int16(mono) - Mencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(intmono)), flush=True)[0][0].tobytes() + midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(intmono)), flush=True)[0][0].tobytes() - dual_encoded_packet = (Mencoded_packet + b'\\x64\\x74') + dual_encoded_packet = (midencoded_packet + b'\\x64\\x74') elif self.stereomode == 2: # stereo mid/side (Joint encoding) # convert to float32 @@ -214,7 +225,7 @@ class DualOpusEncoder: except: loudnessside = 0 - if (loudnessside) <= -50 and self.enablejoint: + if (loudnessside) <= -50 and self.audiomono: sideencoded_packet = b"\\xnl" else: sideencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(intside)), flush=True)[0][0].tobytes() @@ -232,7 +243,194 @@ class DualOpusEncoder: return dual_encoded_packet -class DualOpusDecoder: +class PSOpusEncoder: + def __init__(self, app="audio", samplerate=48000, version="stable"): + """ + This version is xHE-Opus v2 (Parametric Stereo) + ----------------------------- version-------------------------- + hev2: libopus 1.5.1 (fre:ac) + exper: libopus 1.5.1 + stable: libopus 1.4 + old: libopus 1.3.1 + custom: custom opus path you can use "pyogg_win_libopus_custom_path" env to change opus version (windows only) + ------------------------- App---------------------------------- + + Set the encoding mode. + + This must be one of 'voip', 'audio', or 'restricted_lowdelay'. + + 'voip': Gives best quality at a given bitrate for voice + signals. It enhances the input signal by high-pass + filtering and emphasizing formants and + harmonics. Optionally it includes in-band forward error + correction to protect against packet loss. Use this mode + for typical VoIP applications. Because of the enhancement, + even at high bitrates the output may sound different from + the input. + + 'audio': Gives best quality at a given bitrate for most + non-voice signals like music. Use this mode for music and + mixed (music/voice) content, broadcast, and applications + requiring less than 15 ms of coding delay. + + 'restricted_lowdelay': configures low-delay mode that + disables the speech-optimized mode in exchange for + slightly reduced delay. This mode can only be set on an + newly initialized encoder because it changes the codec + delay. + """ + self.version = version + self.samplerate = samplerate + + os.environ["pyogg_win_libopus_version"] = version + importlib.reload(pyogg.opus) + + self.encoder = pyogg.OpusBufferedEncoder() + + self.encoder.set_application(app) + + self.encoder.set_sampling_frequency(samplerate) + + self.encoder.set_channels(1) + + self.set_frame_size() + self.set_compression() + self.set_feature() + self.set_bitrate_mode() + self.set_bitrates() + self.set_bandwidth() + self.set_packet_loss() + + def set_compression(self, level=10): + """complex 0-10 low-hires""" + self.encoder.set_compresion_complex(level) + + def set_bitrates(self, bitrates=64000): + """input birate unit: bps""" + if bitrates <= 2500: + bitrates = 2500 + + self.encoder.set_bitrates(bitrates) + + def set_bandwidth(self, bandwidth="fullband"): + """ + narrowband: + Narrowband typically refers to a limited range of frequencies suitable for voice communication. + + mediumband (unsupported in libopus 1.3+): + Mediumband extends the frequency range compared to narrowband, providing better audio quality. + + wideband: + Wideband offers an even broader frequency range, resulting in higher audio fidelity compared to narrowband and mediumband. + + superwideband: + Superwideband extends the frequency range beyond wideband, further enhancing audio quality. + + fullband (default): + Fullband provides the widest frequency range among the listed options, offering the highest audio quality. + + auto: opus is working auto not force + """ + self.encoder.set_bandwidth(bandwidth) + + def set_frame_size(self, size=60): + """ Set the desired frame duration (in milliseconds). + Valid options are 2.5, 5, 10, 20, 40, or 60ms. + Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms. + + @return chunk size + """ + if self.version != "hev2" and size > 60: + raise ValueError("non hev2 can't use framesize > 60") + + self.encoder.set_frame_size(size) + + return int((size / 1000) * self.samplerate) + + def set_packet_loss(self, loss=0): + """input: % percent""" + if loss > 100: + raise ValueError("percent must <=100") + + self.encoder.set_packets_loss(loss) + + def set_bitrate_mode(self, mode="CVBR"): + """VBR, CVBR, CBR + VBR in 1.5.x replace by CVBR + """ + + self.encoder.set_bitrate_mode(mode) + + def set_feature(self, prediction=False, phaseinvert=False, DTX=False): + self.encoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction)) + self.encoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert)) + self.encoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) + + def enable_voice_mode(self, enable=True, auto=False): + self.encoder.enable_voice_enhance(enable, auto) + + def __parameterization(self, stereo_signal): + # Convert int16 to float32 for processing + stereo_signal = stereo_signal.astype(np.float32) / 32768.0 + + # Reshape stereo_signal into a 2D array with two channels + stereo_signal = stereo_signal.reshape((-1, 2)) + + # Calculate the magnitude spectrogram for each channel + mag_left = np.abs(np.fft.fft(stereo_signal[:, 0])) + mag_right = np.abs(np.fft.fft(stereo_signal[:, 1])) + + # Calculate the phase difference between the left and right channels + phase_diff = np.angle(stereo_signal[:, 0]) - np.angle(stereo_signal[:, 1]) + + # Compute other spatial features + # Calculate stereo width + stereo_width = np.mean(np.correlate(mag_left, mag_right, mode='full')) + + # Calculate phase coherence + phase_coherence = np.mean(np.cos(phase_diff)) + + # Calculate stereo panning + stereo_panning_left = np.mean(mag_left / (mag_left + mag_right)) + stereo_panning_right = np.mean(mag_right / (mag_left + mag_right)) + + pan = stereo_panning_right - stereo_panning_left + + # Return the derived parameters + return (int(stereo_width), phase_coherence, pan) + + def encode(self, pcmbytes, directpcm=False): + """input: pcm bytes accept float32/int16 only + x74 is mono + x75 is stereo LR + x76 is stereo mid/side + + xnl is no side audio + """ + if directpcm: + if pcmbytes.dtype == np.float32: + pcm = (pcmbytes * 32767).astype(np.int16) + elif pcmbytes.dtype == np.int16: + pcm = pcmbytes.astype(np.int16) + else: + raise TypeError("accept only int16/float32") + else: + pcm = np.frombuffer(pcmbytes, dtype=np.int16) + + pcmreshaped = pcm.reshape(-1, 2) + + mono_data = np.mean(pcmreshaped * 0.5, axis=1, dtype=np.int16) + + stereodata = self.__parameterization(pcmreshaped) + packedstereodata = struct.pack('iff', *stereodata) + + encoded_packet = self.encoder.buffered_encode(memoryview(bytearray(mono_data)), flush=True)[0][0].tobytes() + + encoded_packet = (encoded_packet + b'\\x21\\x75' + packedstereodata) + + return encoded_packet + +class xOpusDecoder: def __init__(self, sample_rate=48000): self.Ldecoder = pyogg.OpusDecoder() self.Rdecoder = pyogg.OpusDecoder() @@ -243,22 +441,189 @@ class DualOpusDecoder: self.Ldecoder.set_sampling_frequency(sample_rate) self.Rdecoder.set_sampling_frequency(sample_rate) + self.__prev_pan = 0.0 + + def __smooth(self, value, prev_value, alpha=0.1): + return alpha * value + (1 - alpha) * prev_value + + def __expand_and_pan(self, input_signal, pan_value, expansion_factor, gain): + """ + Apply stereo expansion and panning to an input audio signal. + + Parameters: + - input_signal: Input audio signal (numpy array of int16). + - expansion_factor: Factor to expand the stereo width (0 to 1). + - pan_value: Pan value (-1 to 1, where -1 is full left, 1 is full right). + - gain: Gain factor to adjust the volume. + + Returns: + - output_signal: Processed audio signal (stereo, numpy array of int16). + """ + + # Convert int16 to float32 for processing + input_signal_float = input_signal.astype(np.float32) / 32768.0 + + # Separate the channels + left_channel = input_signal_float[:, 0] + right_channel = input_signal_float[:, 1] + + # Apply panning + pan_left = (1 - pan_value) / 2 + pan_right = (1 + pan_value) / 2 + left_channel *= pan_left + right_channel *= pan_right + + # Apply stereo expansion + center = (left_channel + right_channel) / 2 + left_channel = center + (left_channel - center) * expansion_factor + right_channel = center + (right_channel - center) * expansion_factor + + # Apply gain + left_channel *= gain + right_channel *= gain + + # Ensure no clipping by normalizing if necessary + max_val = max(np.max(np.abs(left_channel)), np.max(np.abs(right_channel))) + if max_val > 1.0: + left_channel /= max_val + right_channel /= max_val + + # Merge the channels + output_signal = np.stack((left_channel, right_channel), axis=-1) + + return (output_signal * 32767).astype(np.int16) + + def __mix_stereo_signals(self, signal1, signal2, volume1=1.0, volume2=1.0): + # Ensure both signals have the same length + length = max(len(signal1), len(signal2)) + signal1 = np.pad(signal1, ((0, length - len(signal1)), (0, 0)), mode='constant') + signal2 = np.pad(signal2, ((0, length - len(signal2)), (0, 0)), mode='constant') + + # Convert signals to float + signal1 = signal1.astype(np.float32) + signal2 = signal2.astype(np.float32) + + # Adjust volume + signal1 *= volume1 + signal2 *= volume2 + + # Mix the signals + mixed_signal = signal1 + signal2 + + # Normalize the mixed signal to prevent clipping + max_amplitude = np.max(np.abs(mixed_signal)) + if max_amplitude > 32767: + mixed_signal = (mixed_signal / max_amplitude) * 32767 + + return mixed_signal.astype(np.int16) + + def __apply_smoothing_window(self, audio_data, window_size): + """ + Apply a smoothing window to the beginning and end of the audio data. + + Parameters: + - audio_data: 2D numpy array with shape (num_samples, 2) + - window_size: Size of the smoothing window in samples + + Returns: + - smoothed_audio_data: 2D numpy array with the smoothing window applied + """ + window = np.hanning(window_size * 2) + fade_in = window[:window_size] + fade_out = window[-window_size:] + + audio_data[:window_size, :] *= fade_in[:, np.newaxis] + audio_data[-window_size:, :] *= fade_out[:, np.newaxis] + + return audio_data + + def __stereo_widening_effect(self, data, delay_samples=10, gain=0.8, window_size=100): + audio_data = data.reshape(-1, 2) + + # Convert int16 to float32 for processing + audio_data = audio_data.astype(np.float32) + + # Apply delay to the right channel + right_channel = np.roll(audio_data[:, 1], delay_samples) + + # Apply gain to both channels + audio_data[:, 0] *= gain + right_channel *= gain + + # Combine channels back into stereo + widened_audio_data = np.stack((audio_data[:, 0], right_channel), axis=1) + + # Apply smoothing window to reduce clicks + widened_audio_data = self.__apply_smoothing_window(widened_audio_data, window_size) + + # Clip to avoid overflow + widened_audio_data = np.clip(widened_audio_data, -32768, 32767) + + # Convert float32 back to int16 + widened_audio_data = widened_audio_data.astype(np.int16) + + return widened_audio_data + + def __apply_phase_coherence_to_stereo(self, signal, phase_coherence): + # Convert phase coherence to phase shift in radians + phase_shift = np.arccos(phase_coherence) + # Apply phase shift to both channels + return self.__apply_phase_shift(signal, phase_shift) + + # Function to apply phase shift to one channel + def __apply_phase_shift(self, signal, phase_shift): + # Convert to complex + signal_complex = signal.astype(np.complex64) + # Apply phase shift + shifted_signal = signal_complex * np.exp(1j * phase_shift) + return shifted_signal.astype(np.int16) + + def __butter_lowpass_filter_stereo(self, data, cutoff, fs, order=5): + nyq = 0.5 * fs + normal_cutoff = cutoff / nyq + b, a = butter(order, normal_cutoff, btype='low', analog=False) + filtered_data = np.apply_along_axis(lambda x: filtfilt(b, a, x), axis=0, arr=data) + return filtered_data.astype(np.int16) + + def __synthstereo(self, mono_signal, stereodata): + pan = stereodata[2] + + # Smooth the pan value + pan = self.__smooth(pan, self.__prev_pan, alpha=0.25) + self.__prev_pan = pan + + stereo_exp = stereodata[0] / 10000 + + try: + delayed = self.__stereo_widening_effect(mono_signal, int(stereo_exp), 1, int(stereo_exp) * 2) + except: + delayed = mono_signal + + l1 = self.__expand_and_pan(mono_signal, pan, 1, 2) + + stereo_signal_shifted = self.__apply_phase_coherence_to_stereo(delayed, stereodata[1]) + + return self.__mix_stereo_signals(l1, stereo_signal_shifted, volume1=1, volume2=0.5).astype(np.int16) + def decode(self, dualopusbytes: bytes, outputformat=np.int16): # mode check if b"\\x64\\x74" in dualopusbytes: mode = 0 - dualopusbytespilted = dualopusbytes.split(b'\\x64\\x74') + xopusbytespilted = dualopusbytes.split(b'\\x64\\x74') elif b"\\x64\\x76" in dualopusbytes: mode = 2 - dualopusbytespilted = dualopusbytes.split(b'\\x64\\x76') + xopusbytespilted = dualopusbytes.split(b'\\x64\\x76') elif b"\\x64\\x75" in dualopusbytes: mode = 1 - dualopusbytespilted = dualopusbytes.split(b'\\x64\\x75') + xopusbytespilted = dualopusbytes.split(b'\\x64\\x75') + elif b"\\x21\\x75" in dualopusbytes: + mode = 3 # v2 + xopusbytespilted = dualopusbytes.split(b'\\x21\\x75') else: - raise TypeError("this is not dual opus") + raise TypeError("this is not xopus bytes") if mode == 0: # mono - Mencoded_packet = dualopusbytespilted[0] + Mencoded_packet = xopusbytespilted[0] decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) @@ -266,8 +631,8 @@ class DualOpusDecoder: elif mode == 2: # stereo mid/side (Joint encoding) - Mencoded_packet = dualopusbytespilted[0] - Sencoded_packet = dualopusbytespilted[1] + Mencoded_packet = xopusbytespilted[0] + Sencoded_packet = xopusbytespilted[1] decoded_mid_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) Mpcm = np.frombuffer(decoded_mid_channel_pcm, dtype=np.int16) @@ -279,18 +644,34 @@ class DualOpusDecoder: Mpcm = int16_to_float32(Mpcm) Spcm = int16_to_float32(Spcm) - L = (Mpcm + Spcm) / 1.5 - R = (Mpcm - Spcm) / 1.5 + L = Mpcm + Spcm + R = Mpcm - Spcm stereo_signal = np.column_stack((L, R)) + + max_amplitude = np.max(np.abs(stereo_signal)) + if max_amplitude > 1.0: + stereo_signal /= max_amplitude + stereo_signal = float32_to_int16(stereo_signal) else: stereo_signal = np.column_stack((Mpcm, Mpcm)) + elif mode == 3: + Mencoded_packet = xopusbytespilted[0] + stereodatapacked = xopusbytespilted[1] + stereodata = struct.unpack('iff', stereodatapacked) + + mono_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) + Mpcm = np.frombuffer(mono_channel_pcm, dtype=np.int16) + + stereo_audio = np.stack((Mpcm, Mpcm)).T.reshape(-1, 2) + + stereo_signal = self.__synthstereo(stereo_audio, stereodata) else: # stereo LR - Lencoded_packet = dualopusbytespilted[0] - Rencoded_packet = dualopusbytespilted[1] + Lencoded_packet = xopusbytespilted[0] + Rencoded_packet = xopusbytespilted[1] decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet))) @@ -368,10 +749,13 @@ class FooterContainer: return loudness_avg, length class XopusWriter: - def __init__(self, file, encoder: DualOpusEncoder, metadata={}): + def __init__(self, file, encoder: DualOpusEncoder, metadata=None): self.file = file self.encoder = encoder + if metadata is None: + metadata = {} + systemmetadata = { "format": "Xopus", "audio": { @@ -445,7 +829,8 @@ class XopusReader: else: try: yield decoder.decode(data) - except: + except Exception as e: + #print(e) yield b"" else: decodedlist = [] diff --git a/player.py b/player.py index 5bf2ef7..0e1da48 100644 --- a/player.py +++ b/player.py @@ -1,5 +1,5 @@ import pyaudio -from libxheopus import DualOpusDecoder, XopusReader +from libxheopus import xOpusDecoder, XopusReader import argparse from tqdm import tqdm import wave @@ -16,7 +16,7 @@ progress = tqdm() # Initialize PyAudio p = pyaudio.PyAudio() -decoder = DualOpusDecoder() +decoder = xOpusDecoder() xopusdecoder = XopusReader(args.input) diff --git a/realtime.py b/realtime.py new file mode 100644 index 0000000..c0b5de1 --- /dev/null +++ b/realtime.py @@ -0,0 +1,79 @@ +import numpy as np +import pyaudio +import os +from libxheopus import DualOpusEncoder, xOpusDecoder + +encoder = DualOpusEncoder("restricted_lowdelay", 48000, "hev2") +encoder.set_bitrates(24000) +encoder.set_bitrate_mode("CVBR") +encoder.set_bandwidth("fullband") +encoder.set_compression(10) +desired_frame_size = encoder.set_frame_size(120) + +decoder = xOpusDecoder(48000) + +p = pyaudio.PyAudio() + +device_name_input = "Line 5 (Virtual Audio Cable)" +device_index_input = 0 +for i in range(p.get_device_count()): + dev = p.get_device_info_by_index(i) + if dev['name'] == device_name_input: + device_index_input = dev['index'] + break + +device_name_output = "Speakers (2- USB Audio DAC )" +device_index_output = 0 +for i in range(p.get_device_count()): + dev = p.get_device_info_by_index(i) + if dev['name'] == device_name_output: + device_index_output = dev['index'] + break + +streaminput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, input=True, input_device_index=device_index_input) +streamoutput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, output=True, output_device_index=device_index_output) + +print(desired_frame_size) + +try: + while True: + try: + pcm = np.frombuffer(streaminput.read(desired_frame_size, exception_on_overflow=False), dtype=np.int16) + + if len(pcm) == 0: + # If PCM is empty, break the loop + break + + encoded_packets = encoder.encode(pcm) + + print(len(pcm), "-encoded->", len(encoded_packets)) + + + # print(encoded_packet) + try: + decoded_pcm = decoder.decode(encoded_packets) + except Exception as e: + decoded_pcm = b"" + + + # Check if the decoded PCM is empty or not + if len(decoded_pcm) > 0: + pcm_to_write = np.frombuffer(decoded_pcm, dtype=np.int16) + + streamoutput.write(pcm_to_write.astype(np.int16).tobytes()) + else: + print("Decoded PCM is empty") + + except Exception as e: + print(e) + raise + +except KeyboardInterrupt: + print("Interrupted by user") +finally: + # Clean up PyAudio streams and terminate PyAudio + streaminput.stop_stream() + streaminput.close() + streamoutput.stop_stream() + streamoutput.close() + p.terminate() \ No newline at end of file