Joint encoding update

add Joint encoding with Mid/Side
This commit is contained in:
dharm pimsen 2024-05-07 17:11:53 +07:00
parent 063d97ebb8
commit a1f69c3f9d
3 changed files with 142 additions and 20 deletions

Binary file not shown.

17
gui.py
View File

@ -80,6 +80,14 @@ class App:
dpg.configure_item("deplayconvert", show=False) dpg.configure_item("deplayconvert", show=False)
def convert(self): def convert(self):
stereomode = str(dpg.get_value("opusstereomode")).lower()
if stereomode == "stereo l/r":
stereomode = 1
elif stereomode == "stereo mid/side":
stereomode = 2
else:
stereomode = 2
try: try:
total = 0 total = 0
current = 0 current = 0
@ -92,6 +100,7 @@ class App:
encoder.set_bitrates(int(dpg.get_value("opusbitrate")*1000)) encoder.set_bitrates(int(dpg.get_value("opusbitrate")*1000))
encoder.set_compression(dpg.get_value("opuscompression")) encoder.set_compression(dpg.get_value("opuscompression"))
encoder.set_packet_loss(dpg.get_value("opuspacketloss")) encoder.set_packet_loss(dpg.get_value("opuspacketloss"))
encoder.set_stereo_mode(stereomode, dpg.get_value("opusenajoint"))
encoder.set_feature(dpg.get_value("opusenapred"), False, dpg.get_value("opusenadtx")) encoder.set_feature(dpg.get_value("opusenapred"), False, dpg.get_value("opusenadtx"))
desired_frame_size = encoder.set_frame_size(int(dpg.get_value("opusframesize"))) desired_frame_size = encoder.set_frame_size(int(dpg.get_value("opusframesize")))
@ -136,6 +145,7 @@ class App:
except Exception as e: except Exception as e:
dpg.set_value("convertstatus", str(e)) dpg.set_value("convertstatus", str(e))
raise e
else: else:
dpg.set_value("convertstatus", "Converted") dpg.set_value("convertstatus", "Converted")
@ -170,7 +180,10 @@ class App:
self.decurrentplay += 1 self.decurrentplay += 1
dpg.set_value("deplayingprog", min(1.0, max(0.0, self.decurrentplay / self.delen))) try:
dpg.set_value("deplayingprog", min(1.0, max(0.0, self.decurrentplay / self.delen)))
except:
dpg.set_value("deplayingprog", 0)
else: else:
if self.decurrentplay != 0: if self.decurrentplay != 0:
self.depausepos = self.decurrentplay self.depausepos = self.decurrentplay
@ -252,11 +265,13 @@ class App:
dpg.add_combo(["voip", "audio", "restricted_lowdelay"], label="Application", default_value="restricted_lowdelay", tag="opusapp") dpg.add_combo(["voip", "audio", "restricted_lowdelay"], label="Application", default_value="restricted_lowdelay", tag="opusapp")
dpg.add_combo(["VBR", "CVBR", "CBR"], label="Bitrate Mode", default_value="CVBR", tag="opusbitmode") dpg.add_combo(["VBR", "CVBR", "CBR"], label="Bitrate Mode", default_value="CVBR", tag="opusbitmode")
dpg.add_combo(["auto", "fullband", "superwideband", "wideband", "mediumband", "narrowband"], label="Bandwidth", tag="opusbandwidth", default_value="fullband") dpg.add_combo(["auto", "fullband", "superwideband", "wideband", "mediumband", "narrowband"], label="Bandwidth", tag="opusbandwidth", default_value="fullband")
dpg.add_combo(["Stereo L/R", "Stereo Mid/Side"], label="Stereo Mode", tag="opusstereomode", default_value="Stereo L/R")
dpg.add_input_float(label="Bitrates", min_value=5, max_value=1020, min_clamped=True, max_clamped=True, step_fast=1, default_value=64, tag="opusbitrate") dpg.add_input_float(label="Bitrates", min_value=5, max_value=1020, min_clamped=True, max_clamped=True, step_fast=1, default_value=64, tag="opusbitrate")
dpg.add_input_int(label="Compression Level", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=10, tag="opuscompression") dpg.add_input_int(label="Compression Level", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=10, tag="opuscompression")
dpg.add_input_int(label="Packet Loss", max_clamped=True, min_clamped=True, min_value=0, max_value=100, default_value=0, tag="opuspacketloss") dpg.add_input_int(label="Packet Loss", max_clamped=True, min_clamped=True, min_value=0, max_value=100, default_value=0, tag="opuspacketloss")
dpg.add_checkbox(label="Prediction", tag="opusenapred") dpg.add_checkbox(label="Prediction", tag="opusenapred")
dpg.add_checkbox(label="DTX", tag="opusenadtx") dpg.add_checkbox(label="DTX", tag="opusenadtx")
dpg.add_checkbox(label="Joint", tag="opusenajoint")
dpg.add_button(label="Convert", callback=self.startconvert) dpg.add_button(label="Convert", callback=self.startconvert)
with dpg.window(label="converting", show=False, tag="convertingwindow", modal=True, no_resize=True, no_move=True, no_title_bar=True, width=320): with dpg.window(label="converting", show=False, tag="convertingwindow", modal=True, no_resize=True, no_move=True, no_title_bar=True, width=320):

View File

@ -5,6 +5,14 @@ import pyogg
import os import os
import numpy as np import numpy as np
def float32_to_int16(data_float32):
data_int16 = (data_float32 * 32767).astype(np.int16)
return data_int16
def int16_to_float32(data_int16):
data_float32 = data_int16.astype(np.float32) / 32767.0
return data_float32
class DualOpusEncoder: class DualOpusEncoder:
def __init__(self, app="audio", samplerate=48000, version="stable"): def __init__(self, app="audio", samplerate=48000, version="stable"):
""" """
@ -42,6 +50,8 @@ class DualOpusEncoder:
""" """
self.version = version self.version = version
self.samplerate = samplerate self.samplerate = samplerate
self.stereomode = 1 #0 = mono, 1 = stereo LR, 2 = stereo Mid/Side
self.enablejoint = False
os.environ["pyogg_win_libopus_version"] = version os.environ["pyogg_win_libopus_version"] = version
importlib.reload(pyogg.opus) importlib.reload(pyogg.opus)
@ -100,6 +110,18 @@ class DualOpusEncoder:
self.Lencoder.set_bandwidth(bandwidth) self.Lencoder.set_bandwidth(bandwidth)
self.Rencoder.set_bandwidth(bandwidth) self.Rencoder.set_bandwidth(bandwidth)
def set_stereo_mode(self, mode=1, enablejoint=False):
"""
0 = mono
1 = stereo LR
2 = stereo Mid/Side (Joint encoding)
"""
if mode > 2:
mode = 1
self.stereomode = mode
self.enablejoint = enablejoint
def set_frame_size(self, size=60): def set_frame_size(self, size=60):
""" Set the desired frame duration (in milliseconds). """ Set the desired frame duration (in milliseconds).
Valid options are 2.5, 5, 10, 20, 40, or 60ms. Valid options are 2.5, 5, 10, 20, 40, or 60ms.
@ -141,7 +163,13 @@ class DualOpusEncoder:
self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX))
def encode(self, pcmbytes, directpcm=False): def encode(self, pcmbytes, directpcm=False):
"""input: pcm bytes accept float32/int16 only""" """input: pcm bytes accept float32/int16 only
x74 is mono
x75 is stereo LR
x76 is stereo mid/side
xnl is no side audio
"""
if directpcm: if directpcm:
if pcmbytes.dtype == np.float32: if pcmbytes.dtype == np.float32:
pcm = (pcmbytes * 32767).astype(np.int16) pcm = (pcmbytes * 32767).astype(np.int16)
@ -152,18 +180,58 @@ class DualOpusEncoder:
else: else:
pcm = np.frombuffer(pcmbytes, dtype=np.int16) pcm = np.frombuffer(pcmbytes, dtype=np.int16)
left_channel = pcm[::2] if self.stereomode == 0:
right_channel = pcm[1::2] # mono
left_channel = pcm[::2]
right_channel = pcm[1::2]
mono = (left_channel + right_channel) / 2
Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() intmono = float32_to_int16(mono)
Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][
0].tobytes()
dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet) Mencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(intmono)), flush=True)[0][0].tobytes()
dual_encoded_packet = (Mencoded_packet + b'\\x64\\x74')
elif self.stereomode == 2:
# stereo mid/side (Joint encoding)
# convert to float32
pcm = int16_to_float32(pcm)
left_channel = pcm[::2]
right_channel = pcm[1::2]
mid = (left_channel + right_channel) / 2
side = (left_channel - right_channel) / 2
# convert back to int16
mid = float32_to_int16(mid)
intside = float32_to_int16(side)
midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(mid)), flush=True)[0][0].tobytes()
# check if side is no audio or loudness <= -50 DBFS
try:
loudnessside = 20 * math.log10(np.sqrt(np.mean(np.square(side))))
except:
loudnessside = 0
if (loudnessside) <= -50 and self.enablejoint:
sideencoded_packet = b"\\xnl"
else:
sideencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(intside)), flush=True)[0][0].tobytes()
dual_encoded_packet = (midencoded_packet + b'\\x64\\x76' + sideencoded_packet)
else:
# stereo LR
left_channel = pcm[::2]
right_channel = pcm[1::2]
Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes()
Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][0].tobytes()
dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet)
return dual_encoded_packet return dual_encoded_packet
class DualOpusDecoder: class DualOpusDecoder:
def __init__(self, sample_rate=48000): def __init__(self, sample_rate=48000):
self.Ldecoder = pyogg.OpusDecoder() self.Ldecoder = pyogg.OpusDecoder()
@ -176,22 +244,61 @@ class DualOpusDecoder:
self.Rdecoder.set_sampling_frequency(sample_rate) self.Rdecoder.set_sampling_frequency(sample_rate)
def decode(self, dualopusbytes: bytes, outputformat=np.int16): def decode(self, dualopusbytes: bytes, outputformat=np.int16):
try: # mode check
if b"\\x64\\x74" in dualopusbytes:
mode = 0
dualopusbytespilted = dualopusbytes.split(b'\\x64\\x74')
elif b"\\x64\\x76" in dualopusbytes:
mode = 2
dualopusbytespilted = dualopusbytes.split(b'\\x64\\x76')
elif b"\\x64\\x75" in dualopusbytes:
mode = 1
dualopusbytespilted = dualopusbytes.split(b'\\x64\\x75') dualopusbytespilted = dualopusbytes.split(b'\\x64\\x75')
Lencoded_packet = dualopusbytespilted[0] else:
Rencoded_packet = dualopusbytespilted[1]
except:
raise TypeError("this is not dual opus") raise TypeError("this is not dual opus")
decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) if mode == 0: # mono
decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet))) Mencoded_packet = dualopusbytespilted[0]
decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet)))
Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16)
Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=outputformat) stereo_signal = np.column_stack((Mpcm, Mpcm))
Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=outputformat)
stereo_signal = np.empty((len(Lpcm), 2), dtype=Lpcm.dtype) elif mode == 2:
stereo_signal[:, 0] = Lpcm # stereo mid/side (Joint encoding)
stereo_signal[:, 1] = Rpcm Mencoded_packet = dualopusbytespilted[0]
Sencoded_packet = dualopusbytespilted[1]
decoded_mid_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet)))
Mpcm = np.frombuffer(decoded_mid_channel_pcm, dtype=np.int16)
if Sencoded_packet != b"\\xnl":
decoded_side_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Sencoded_packet)))
Spcm = np.frombuffer(decoded_side_channel_pcm, dtype=np.int16)
Mpcm = int16_to_float32(Mpcm)
Spcm = int16_to_float32(Spcm)
L = (Mpcm + Spcm) / 1.5
R = (Mpcm - Spcm) / 1.5
stereo_signal = np.column_stack((L, R))
stereo_signal = float32_to_int16(stereo_signal)
else:
stereo_signal = np.column_stack((Mpcm, Mpcm))
else:
# stereo LR
Lencoded_packet = dualopusbytespilted[0]
Rencoded_packet = dualopusbytespilted[1]
decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet)))
decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet)))
Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16)
Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=np.int16)
stereo_signal = np.column_stack((Lpcm, Rpcm))
return stereo_signal.astype(outputformat).tobytes() return stereo_signal.astype(outputformat).tobytes()