diff --git a/createnewformat.reg b/createnewformat.reg index 472d3b1..a5c57da 100644 Binary files a/createnewformat.reg and b/createnewformat.reg differ diff --git a/gui.py b/gui.py index 9ba9fbf..acc3c18 100644 --- a/gui.py +++ b/gui.py @@ -80,6 +80,14 @@ class App: dpg.configure_item("deplayconvert", show=False) def convert(self): + stereomode = str(dpg.get_value("opusstereomode")).lower() + if stereomode == "stereo l/r": + stereomode = 1 + elif stereomode == "stereo mid/side": + stereomode = 2 + else: + stereomode = 2 + try: total = 0 current = 0 @@ -92,6 +100,7 @@ class App: encoder.set_bitrates(int(dpg.get_value("opusbitrate")*1000)) encoder.set_compression(dpg.get_value("opuscompression")) encoder.set_packet_loss(dpg.get_value("opuspacketloss")) + encoder.set_stereo_mode(stereomode, dpg.get_value("opusenajoint")) encoder.set_feature(dpg.get_value("opusenapred"), False, dpg.get_value("opusenadtx")) desired_frame_size = encoder.set_frame_size(int(dpg.get_value("opusframesize"))) @@ -136,6 +145,7 @@ class App: except Exception as e: dpg.set_value("convertstatus", str(e)) + raise e else: dpg.set_value("convertstatus", "Converted") @@ -170,7 +180,10 @@ class App: self.decurrentplay += 1 - dpg.set_value("deplayingprog", min(1.0, max(0.0, self.decurrentplay / self.delen))) + try: + dpg.set_value("deplayingprog", min(1.0, max(0.0, self.decurrentplay / self.delen))) + except: + dpg.set_value("deplayingprog", 0) else: if self.decurrentplay != 0: self.depausepos = self.decurrentplay @@ -252,11 +265,13 @@ class App: dpg.add_combo(["voip", "audio", "restricted_lowdelay"], label="Application", default_value="restricted_lowdelay", tag="opusapp") dpg.add_combo(["VBR", "CVBR", "CBR"], label="Bitrate Mode", default_value="CVBR", tag="opusbitmode") dpg.add_combo(["auto", "fullband", "superwideband", "wideband", "mediumband", "narrowband"], label="Bandwidth", tag="opusbandwidth", default_value="fullband") + dpg.add_combo(["Stereo L/R", "Stereo Mid/Side"], label="Stereo Mode", tag="opusstereomode", default_value="Stereo L/R") dpg.add_input_float(label="Bitrates", min_value=5, max_value=1020, min_clamped=True, max_clamped=True, step_fast=1, default_value=64, tag="opusbitrate") dpg.add_input_int(label="Compression Level", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=10, tag="opuscompression") dpg.add_input_int(label="Packet Loss", max_clamped=True, min_clamped=True, min_value=0, max_value=100, default_value=0, tag="opuspacketloss") dpg.add_checkbox(label="Prediction", tag="opusenapred") dpg.add_checkbox(label="DTX", tag="opusenadtx") + dpg.add_checkbox(label="Joint", tag="opusenajoint") dpg.add_button(label="Convert", callback=self.startconvert) with dpg.window(label="converting", show=False, tag="convertingwindow", modal=True, no_resize=True, no_move=True, no_title_bar=True, width=320): diff --git a/libxheopus.py b/libxheopus.py index c403b4d..8196f40 100644 --- a/libxheopus.py +++ b/libxheopus.py @@ -5,6 +5,14 @@ import pyogg import os import numpy as np +def float32_to_int16(data_float32): + data_int16 = (data_float32 * 32767).astype(np.int16) + return data_int16 + +def int16_to_float32(data_int16): + data_float32 = data_int16.astype(np.float32) / 32767.0 + return data_float32 + class DualOpusEncoder: def __init__(self, app="audio", samplerate=48000, version="stable"): """ @@ -42,6 +50,8 @@ class DualOpusEncoder: """ self.version = version self.samplerate = samplerate + self.stereomode = 1 #0 = mono, 1 = stereo LR, 2 = stereo Mid/Side + self.enablejoint = False os.environ["pyogg_win_libopus_version"] = version importlib.reload(pyogg.opus) @@ -100,6 +110,18 @@ class DualOpusEncoder: self.Lencoder.set_bandwidth(bandwidth) self.Rencoder.set_bandwidth(bandwidth) + def set_stereo_mode(self, mode=1, enablejoint=False): + """ + 0 = mono + 1 = stereo LR + 2 = stereo Mid/Side (Joint encoding) + """ + if mode > 2: + mode = 1 + + self.stereomode = mode + self.enablejoint = enablejoint + def set_frame_size(self, size=60): """ Set the desired frame duration (in milliseconds). Valid options are 2.5, 5, 10, 20, 40, or 60ms. @@ -141,7 +163,13 @@ class DualOpusEncoder: self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) def encode(self, pcmbytes, directpcm=False): - """input: pcm bytes accept float32/int16 only""" + """input: pcm bytes accept float32/int16 only + x74 is mono + x75 is stereo LR + x76 is stereo mid/side + + xnl is no side audio + """ if directpcm: if pcmbytes.dtype == np.float32: pcm = (pcmbytes * 32767).astype(np.int16) @@ -152,18 +180,58 @@ class DualOpusEncoder: else: pcm = np.frombuffer(pcmbytes, dtype=np.int16) - left_channel = pcm[::2] - right_channel = pcm[1::2] + if self.stereomode == 0: + # mono + left_channel = pcm[::2] + right_channel = pcm[1::2] + mono = (left_channel + right_channel) / 2 - Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() - Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][ - 0].tobytes() + intmono = float32_to_int16(mono) - dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet) + Mencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(intmono)), flush=True)[0][0].tobytes() + + dual_encoded_packet = (Mencoded_packet + b'\\x64\\x74') + elif self.stereomode == 2: + # stereo mid/side (Joint encoding) + # convert to float32 + pcm = int16_to_float32(pcm) + + left_channel = pcm[::2] + right_channel = pcm[1::2] + + mid = (left_channel + right_channel) / 2 + side = (left_channel - right_channel) / 2 + + # convert back to int16 + mid = float32_to_int16(mid) + intside = float32_to_int16(side) + + midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(mid)), flush=True)[0][0].tobytes() + + # check if side is no audio or loudness <= -50 DBFS + try: + loudnessside = 20 * math.log10(np.sqrt(np.mean(np.square(side)))) + except: + loudnessside = 0 + + if (loudnessside) <= -50 and self.enablejoint: + sideencoded_packet = b"\\xnl" + else: + sideencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(intside)), flush=True)[0][0].tobytes() + + dual_encoded_packet = (midencoded_packet + b'\\x64\\x76' + sideencoded_packet) + else: + # stereo LR + left_channel = pcm[::2] + right_channel = pcm[1::2] + + Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() + Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][0].tobytes() + + dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet) return dual_encoded_packet - class DualOpusDecoder: def __init__(self, sample_rate=48000): self.Ldecoder = pyogg.OpusDecoder() @@ -176,22 +244,61 @@ class DualOpusDecoder: self.Rdecoder.set_sampling_frequency(sample_rate) def decode(self, dualopusbytes: bytes, outputformat=np.int16): - try: + # mode check + if b"\\x64\\x74" in dualopusbytes: + mode = 0 + dualopusbytespilted = dualopusbytes.split(b'\\x64\\x74') + elif b"\\x64\\x76" in dualopusbytes: + mode = 2 + dualopusbytespilted = dualopusbytes.split(b'\\x64\\x76') + elif b"\\x64\\x75" in dualopusbytes: + mode = 1 dualopusbytespilted = dualopusbytes.split(b'\\x64\\x75') - Lencoded_packet = dualopusbytespilted[0] - Rencoded_packet = dualopusbytespilted[1] - except: + else: raise TypeError("this is not dual opus") - decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) - decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet))) + if mode == 0: # mono + Mencoded_packet = dualopusbytespilted[0] + decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) + Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) - Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=outputformat) - Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=outputformat) + stereo_signal = np.column_stack((Mpcm, Mpcm)) - stereo_signal = np.empty((len(Lpcm), 2), dtype=Lpcm.dtype) - stereo_signal[:, 0] = Lpcm - stereo_signal[:, 1] = Rpcm + elif mode == 2: + # stereo mid/side (Joint encoding) + Mencoded_packet = dualopusbytespilted[0] + Sencoded_packet = dualopusbytespilted[1] + + decoded_mid_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) + Mpcm = np.frombuffer(decoded_mid_channel_pcm, dtype=np.int16) + + if Sencoded_packet != b"\\xnl": + decoded_side_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Sencoded_packet))) + Spcm = np.frombuffer(decoded_side_channel_pcm, dtype=np.int16) + + Mpcm = int16_to_float32(Mpcm) + Spcm = int16_to_float32(Spcm) + + L = (Mpcm + Spcm) / 1.5 + R = (Mpcm - Spcm) / 1.5 + + stereo_signal = np.column_stack((L, R)) + stereo_signal = float32_to_int16(stereo_signal) + else: + stereo_signal = np.column_stack((Mpcm, Mpcm)) + + else: + # stereo LR + Lencoded_packet = dualopusbytespilted[0] + Rencoded_packet = dualopusbytespilted[1] + + decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) + decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet))) + + Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) + Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=np.int16) + + stereo_signal = np.column_stack((Lpcm, Rpcm)) return stereo_signal.astype(outputformat).tobytes()