diff --git a/gui.py b/gui.py index aa72ab9..858c8c2 100644 --- a/gui.py +++ b/gui.py @@ -1,3 +1,4 @@ +print("Please wait...") import dearpygui.dearpygui as dpg import easygui import threading @@ -91,10 +92,14 @@ class App: signaltype = str(dpg.get_value("opussignaltype")).lower() profile = str(dpg.get_value("opusprofile")).strip().lower() stereomode = str(dpg.get_value("opusstereomode")).lower() - if stereomode == "stereo l/r": + bitbalance = dpg.get_value("opusbitratebalance") + + if stereomode == "l/r": stereomode = 1 - elif stereomode == "stereo mid/side": + elif stereomode == "mid/side": stereomode = 2 + elif stereomode == "intensity": + stereomode = 3 else: stereomode = 2 @@ -108,27 +113,29 @@ class App: signalauto = True signalvoice = False + if bitbalance == -1: + bitbalance = None + try: total = 0 current = 0 filename = os.path.splitext(os.path.basename(self.inputfilepath))[0] dpg.set_value("convertstatus", "init encoder...") - print(profile) if profile == "xhe-opus v1": encoder = libxheopus.DualOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion")) + + encoder.set_stereo_mode(stereomode, dpg.get_value("opusautomono"), dpg.get_value("opusmsautomonogate"), dpg.get_value("opusintensity")) + encoder.set_bitrates(int(dpg.get_value("opusbitrate") * 1000), balance_percent=bitbalance) else: encoder = libxheopus.PSOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion")) + encoder.set_bitrates(int(dpg.get_value("opusbitrate") * 1000)) encoder.set_bitrate_mode(dpg.get_value("opusbitmode")) encoder.set_bandwidth(dpg.get_value("opusbandwidth")) - encoder.set_bitrates(int(dpg.get_value("opusbitrate")*1000)) encoder.set_compression(dpg.get_value("opuscompression")) encoder.set_packet_loss(dpg.get_value("opuspacketloss")) - if profile != "xhe-opus v2": - encoder.set_stereo_mode(stereomode, dpg.get_value("opusenajoint")) - encoder.set_feature(dpg.get_value("opusenapred"), False, dpg.get_value("opusenadtx")) encoder.enable_voice_mode(signalvoice, signalauto) @@ -285,25 +292,33 @@ class App: thread.start() def window(self): - with dpg.window(label="Encoder", width=420, no_close=True): + with dpg.window(label="Encoder", width=500, no_close=True): dpg.add_text("input:", tag="inpathshow") dpg.add_text("output:", tag="outpathshow") + dpg.add_button(label="Select Input File", callback=self.selectinputfile) dpg.add_button(label="Select Output Path", callback=self.selectoutputpath) + dpg.add_combo(["xHE-Opus v1", "xHE-Opus v2"], label="Profile", default_value="xHE-Opus v1", tag="opusprofile", callback=self.changeprofileopus) dpg.add_combo(["hev2", "exper", "stable", "old"], label="Version", default_value="hev2", tag="opusversion", callback=self.changeversionopus) dpg.add_combo(["120", "100", "80", "60", "40", "20", "10", "5"], label="Frame Size (ms)", tag="opusframesize", default_value="120") dpg.add_combo(["voip", "audio", "restricted_lowdelay"], label="Application", default_value="restricted_lowdelay", tag="opusapp") dpg.add_combo(["VBR", "CVBR", "CBR"], label="Bitrate Mode", default_value="CVBR", tag="opusbitmode") dpg.add_combo(["auto", "fullband", "superwideband", "wideband", "mediumband", "narrowband"], label="Bandwidth", tag="opusbandwidth", default_value="fullband") - dpg.add_combo(["Stereo L/R", "Stereo Mid/Side"], label="Stereo Mode", tag="opusstereomode", default_value="Stereo L/R") + dpg.add_combo(["L/R", "Mid/Side", "Intensity"], label="Stereo Mode", tag="opusstereomode", default_value="L/R") dpg.add_combo(["Auto", "Voice", "Music"], label="Signal Type", tag="opussignaltype", default_value="Auto") + + dpg.add_input_int(label="Bitrates Balance", min_value=-1, max_value=100, min_clamped=True, max_clamped=True, step_fast=1, default_value=-1, tag="opusbitratebalance") dpg.add_input_float(label="Bitrates", min_value=5, max_value=1020, min_clamped=True, max_clamped=True, step_fast=1, default_value=64, tag="opusbitrate") dpg.add_input_int(label="Compression Level", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=10, tag="opuscompression") dpg.add_input_int(label="Packet Loss", max_clamped=True, min_clamped=True, min_value=0, max_value=100, default_value=0, tag="opuspacketloss") + dpg.add_input_int(label="Auto Mono Threshold (Mid/Side Encoding)", min_value=0, max_value=-100, min_clamped=True, max_clamped=True, step_fast=1, default_value=-50, tag="opusmsautomonogate") + dpg.add_input_float(label="Intensity (Intensity Encoding)", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=1, tag="opusintensity") + dpg.add_checkbox(label="Prediction", tag="opusenapred") dpg.add_checkbox(label="DTX", tag="opusenadtx") - dpg.add_checkbox(label="Auto Mono (Mid/Side Encoding)", tag="opusenajoint") + dpg.add_checkbox(label="Auto Mono (Mid/Side Encoding)", tag="opusautomono") + dpg.add_button(label="Convert", callback=self.startconvert) with dpg.window(label="converting", show=False, tag="convertingwindow", modal=True, no_resize=True, no_move=True, no_title_bar=True, width=320): diff --git a/libxheopus.py b/libxheopus.py index e2abc8b..6e3cd38 100644 --- a/libxheopus.py +++ b/libxheopus.py @@ -15,7 +15,7 @@ def int16_to_float32(data_int16): return data_float32 class DualOpusEncoder: - def __init__(self, app="audio", samplerate=48000, version="stable"): + def __init__(self, app="restricted_lowdelay", samplerate=48000, version="stable"): """ ----------------------------- version-------------------------- hev2: libopus 1.5.1 (fre:ac) @@ -51,8 +51,14 @@ class DualOpusEncoder: """ self.version = version self.samplerate = samplerate - self.stereomode = 1 #0 = mono, 1 = stereo LR, 2 = stereo Mid/Side - self.audiomono = False + self.stereomode = 1 #0 = mono, 1 = Stereo LR, 2 = Stereo Mid/Side, 3 = Stereo Intensity + self.automonogate = -50 + self.automono = False + self.msmono = False + self.overallbitrate = 0 + self.secbitrate = 0 + self.intensity = 1 + self.bitratemode = 1 # 0 = CBR, 1 = CVBR, 2 = VBR os.environ["pyogg_win_libopus_version"] = version importlib.reload(pyogg.opus) @@ -82,18 +88,46 @@ class DualOpusEncoder: self.Lencoder.set_compresion_complex(level) self.Rencoder.set_compresion_complex(level) - def set_bitrates(self, bitrates=64000, samebitrate=False): - """input birate unit: bps""" + def set_bitrates(self, bitrates=64000, samebitrate=False, balance_percent=None): + """ + input birate unit: bps + + balance_percent is working good with M/S stereo + """ if bitrates <= 5000: bitrates = 5000 - if samebitrate: - bitperchannel = bitrates - else: - bitperchannel = bitrates / 2 + if balance_percent is None: + if self.stereomode == 0: + balance_percent = 100 + elif self.stereomode == 2: + balance_percent = 75 + else: + balance_percent = 50 - self.Lencoder.set_bitrates(int(bitperchannel)) - self.Rencoder.set_bitrates(int(bitperchannel)) + self.overallbitrate = bitrates + + if samebitrate: + self.Lencoder.set_bitrates(int(bitrates)) + self.Rencoder.set_bitrates(int(bitrates)) + else: + percentage_decimal = balance_percent / 100 + bitratech1 = round(bitrates * percentage_decimal) + bitratech2 = bitrates - bitratech1 + + if bitratech1 < 2500: + bitratech1 = 2500 + + if bitratech2 < 2500: + self.msmono = True + bitratech2 = 2500 + else: + self.msmono = False + + self.secbitrate = bitratech1 + + self.Lencoder.set_bitrates(int(bitratech1)) + self.Rencoder.set_bitrates(int(bitratech2)) def set_bandwidth(self, bandwidth="fullband"): """ @@ -117,19 +151,25 @@ class DualOpusEncoder: self.Lencoder.set_bandwidth(bandwidth) self.Rencoder.set_bandwidth(bandwidth) - def set_stereo_mode(self, mode=1, audiomono=False): + def set_stereo_mode(self, mode=1, automono=False, automonogate=-50, intensity=1, changebitratesbalance=False): """ - 0 = mono + 0 = mono (not recommend) 1 = stereo LR - 2 = stereo Mid/Side (Joint encoding) + 2 = stereo Mid/Side + 3 = Intensity """ if mode > 2: mode = 1 self.stereomode = mode - self.audiomono = audiomono + self.automono = automono + self.automonogate = automonogate + self.intensity = intensity - def set_frame_size(self, size=60): + if changebitratesbalance: + self.set_bitrates(self.overallbitrate) + + def set_frame_size(self, size=60, nocheck=False): """ Set the desired frame duration (in milliseconds). Valid options are 2.5, 5, 10, 20, 40, or 60ms. Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms. @@ -139,8 +179,8 @@ class DualOpusEncoder: if self.version != "hev2" and size > 60: raise ValueError("non hev2 can't use framesize > 60") - self.Lencoder.set_frame_size(size) - self.Rencoder.set_frame_size(size) + self.Lencoder.set_frame_size(size, nocheck) + self.Rencoder.set_frame_size(size, nocheck) return int((size / 1000) * self.samplerate) @@ -156,6 +196,14 @@ class DualOpusEncoder: """VBR, CVBR, CBR VBR in 1.5.x replace by CVBR """ + if mode.lower() == "cbr": + self.bitratemode = 0 + elif mode.lower() == "cvbr": + self.bitratemode = 1 + elif mode.lower() == "vbr": + self.bitratemode = 2 + else: + raise ValueError(f"No {mode} bitrate mode option") self.Lencoder.set_bitrate_mode(mode) self.Rencoder.set_bitrate_mode(mode) @@ -217,20 +265,35 @@ class DualOpusEncoder: mid = float32_to_int16(mid) intside = float32_to_int16(side) - midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(mid)), flush=True)[0][0].tobytes() - # check if side is no audio or loudness <= -50 DBFS try: loudnessside = 20 * math.log10(np.sqrt(np.mean(np.square(side)))) except: loudnessside = 0 - if (loudnessside) <= -50 and self.audiomono: + if (loudnessside) <= self.automonogate and self.automono or self.msmono: sideencoded_packet = b"\\xnl" + if self.bitratemode == 0: # CBR + self.Lencoder.set_bitrates(int(self.overallbitrate - 300)) else: + self.Lencoder.set_bitrates(int(self.secbitrate)) sideencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(intside)), flush=True)[0][0].tobytes() + midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(mid)), flush=True)[0][0].tobytes() + dual_encoded_packet = (midencoded_packet + b'\\x64\\x76' + sideencoded_packet) + elif self.stereomode == 3: + # stereo intensity (Joint encoding) + left_channel = pcm[:, 0] + right_channel = pcm[:, 1] + + IRChannel = left_channel + self.intensity * (right_channel - left_channel) + + Lencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() + + IRencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(IRChannel)), flush=True)[0][0].tobytes() + + dual_encoded_packet = (Lencoded_packet + b'\\x64\\x77' + IRencoded_packet) else: # stereo LR left_channel = pcm[::2] @@ -244,7 +307,7 @@ class DualOpusEncoder: return dual_encoded_packet class PSOpusEncoder: - def __init__(self, app="audio", samplerate=48000, version="stable"): + def __init__(self, app="restricted_lowdelay", samplerate=48000, version="stable"): """ This version is xHE-Opus v2 (Parametric Stereo) ----------------------------- version-------------------------- @@ -442,6 +505,7 @@ class xOpusDecoder: self.Rdecoder.set_sampling_frequency(sample_rate) self.__prev_pan = 0.0 + self.__prev_max_amplitude = 0.0 def __smooth(self, value, prev_value, alpha=0.1): return alpha * value + (1 - alpha) * prev_value @@ -578,13 +642,6 @@ class xOpusDecoder: shifted_signal = signal_complex * np.exp(1j * phase_shift) return shifted_signal.astype(np.int16) - def __butter_lowpass_filter_stereo(self, data, cutoff, fs, order=5): - nyq = 0.5 * fs - normal_cutoff = cutoff / nyq - b, a = butter(order, normal_cutoff, btype='low', analog=False) - filtered_data = np.apply_along_axis(lambda x: filtfilt(b, a, x), axis=0, arr=data) - return filtered_data.astype(np.int16) - def __synthstereo(self, mono_signal, stereodata): pan = stereodata[2] @@ -616,6 +673,9 @@ class xOpusDecoder: elif b"\\x64\\x75" in dualopusbytes: mode = 1 xopusbytespilted = dualopusbytes.split(b'\\x64\\x75') + elif b"\\x64\\x77" in dualopusbytes: + mode = 4 + xopusbytespilted = dualopusbytes.split(b'\\x64\\x77') elif b"\\x21\\x75" in dualopusbytes: mode = 3 # v2 xopusbytespilted = dualopusbytes.split(b'\\x21\\x75') @@ -628,7 +688,6 @@ class xOpusDecoder: Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) stereo_signal = np.column_stack((Mpcm, Mpcm)) - elif mode == 2: # stereo mid/side (Joint encoding) Mencoded_packet = xopusbytespilted[0] @@ -649,14 +708,32 @@ class xOpusDecoder: stereo_signal = np.column_stack((L, R)) - max_amplitude = np.max(np.abs(stereo_signal)) - if max_amplitude > 1.0: - stereo_signal /= max_amplitude + #max_amplitude = np.max(np.abs(stereo_signal)) + + #if max_amplitude > 1.0: + # stereo_signal /= max_amplitude + + stereo_signal = np.clip(stereo_signal, -1, 1) stereo_signal = float32_to_int16(stereo_signal) else: stereo_signal = np.column_stack((Mpcm, Mpcm)) + elif mode == 4: + # stereo intensity + Lencoded_packet = xopusbytespilted[0] + IRencoded_packet = xopusbytespilted[1] + + decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) + decoded_intensity_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(IRencoded_packet))) + + Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) + IRpcm = np.frombuffer(decoded_intensity_right_channel_pcm, dtype=np.int16) + + recovered_right = Lpcm + (IRpcm - Lpcm) / 1 + + stereo_signal = np.column_stack((Lpcm, recovered_right)) elif mode == 3: + # Parametric Stereo Mencoded_packet = xopusbytespilted[0] stereodatapacked = xopusbytespilted[1] @@ -803,6 +880,7 @@ class XopusReader: def __init__(self, file): self.file = open(file, 'rb') self.xopusline = self.file.read().split(b"\\xa") + self.lastframe = b"" def readmetadata(self): header = HeaderContainer.deserialize(self.xopusline[0]) @@ -828,10 +906,12 @@ class XopusReader: break else: try: - yield decoder.decode(data) + decodeddata = decoder.decode(data) + self.lastframe = decodeddata + yield decodeddata except Exception as e: #print(e) - yield b"" + yield self.lastframe else: decodedlist = [] for data in self.xopusline[1:]: @@ -839,9 +919,11 @@ class XopusReader: break else: try: - decodedlist.append(decoder.decode(data)) + decodeddata = decoder.decode(data) + self.lastframe = decodeddata + decodedlist.append(self.lastframe) except: - decodedlist.append(b"") + decodedlist.append(self.lastframe) return decodedlist def close(self): diff --git a/realtime.py b/realtime.py index c0b5de1..0fb1298 100644 --- a/realtime.py +++ b/realtime.py @@ -3,10 +3,12 @@ import pyaudio import os from libxheopus import DualOpusEncoder, xOpusDecoder -encoder = DualOpusEncoder("restricted_lowdelay", 48000, "hev2") -encoder.set_bitrates(24000) +encoder = DualOpusEncoder(samplerate=48000, version="hev2") +encoder.set_stereo_mode(2) +encoder.set_bitrates(32000, balance_percent=75) encoder.set_bitrate_mode("CVBR") encoder.set_bandwidth("fullband") + encoder.set_compression(10) desired_frame_size = encoder.set_frame_size(120) @@ -22,7 +24,7 @@ for i in range(p.get_device_count()): device_index_input = dev['index'] break -device_name_output = "Speakers (2- USB Audio DAC )" +device_name_output = "Speakers (2- USB AUDIO DEVICE)" device_index_output = 0 for i in range(p.get_device_count()): dev = p.get_device_info_by_index(i) @@ -30,50 +32,46 @@ for i in range(p.get_device_count()): device_index_output = dev['index'] break -streaminput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, input=True, input_device_index=device_index_input) -streamoutput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, output=True, output_device_index=device_index_output) +def callback(in_data, frame_count, time_info, status): + pcm = np.frombuffer(in_data, dtype=np.int16) -print(desired_frame_size) + encoded_packets = encoder.encode(pcm) + print(len(pcm), "-encoded->", len(encoded_packets)) + + decoded_pcm = decoder.decode(encoded_packets) + + + # Check if the decoded PCM is empty or not + if len(decoded_pcm) > 0: + pcm_to_write = np.frombuffer(decoded_pcm, dtype=np.int16) + + print(pcm_to_write) + + return (pcm_to_write.astype(np.int16).tobytes(), pyaudio.paContinue) + + else: + print("Decoded PCM is empty") + return (b"\x00", pyaudio.paContinue) + + + + +stream = p.open(format=pyaudio.paInt16, channels=2, rate=48000, + input=True, input_device_index=device_index_input, + output=True, output_device_index=device_index_output, + stream_callback=callback, frames_per_buffer=desired_frame_size) + +stream.start_stream() + +print("Streaming audio. Press Ctrl+C to stop.") try: - while True: - try: - pcm = np.frombuffer(streaminput.read(desired_frame_size, exception_on_overflow=False), dtype=np.int16) - - if len(pcm) == 0: - # If PCM is empty, break the loop - break - - encoded_packets = encoder.encode(pcm) - - print(len(pcm), "-encoded->", len(encoded_packets)) - - - # print(encoded_packet) - try: - decoded_pcm = decoder.decode(encoded_packets) - except Exception as e: - decoded_pcm = b"" - - - # Check if the decoded PCM is empty or not - if len(decoded_pcm) > 0: - pcm_to_write = np.frombuffer(decoded_pcm, dtype=np.int16) - - streamoutput.write(pcm_to_write.astype(np.int16).tobytes()) - else: - print("Decoded PCM is empty") - - except Exception as e: - print(e) - raise - + while stream.is_active(): + pass except KeyboardInterrupt: - print("Interrupted by user") -finally: - # Clean up PyAudio streams and terminate PyAudio - streaminput.stop_stream() - streaminput.close() - streamoutput.stop_stream() - streamoutput.close() - p.terminate() \ No newline at end of file + print("Stopping stream...") + +# Stop and close stream +stream.stop_stream() +stream.close() +p.terminate() \ No newline at end of file