From a1f69c3f9d91803b79b2c4390244111eeed48f61 Mon Sep 17 00:00:00 2001 From: damp11113 Date: Tue, 7 May 2024 17:11:53 +0700 Subject: [PATCH] Joint encoding update add Joint encoding with Mid/Side --- createnewformat.reg | Bin 298 -> 1086 bytes gui.py | 17 +++++- libxheopus.py | 145 ++++++++++++++++++++++++++++++++++++++------ 3 files changed, 142 insertions(+), 20 deletions(-) diff --git a/createnewformat.reg b/createnewformat.reg index 472d3b104f9684e64c419ae3cb5e08808b50233c..a5c57da3f0bf563790aa23f4a4f275fbdc358b7d 100644 GIT binary patch literal 1086 zcmcJO-A=+l5QWbR@D2@$m!e*HV`3r!5{ViK;9smESgK$OG!z70Uj1fUTW&CzXf_Mo zotZOd&dh#(+-anV2AXQAM4s9T^~znMxn^2()4)37H0A8+7W)kAi1UQ=u_`)bl~|8D z*9HC$>ZztnRn=iN*gbHsSzY<~uZWGc0Og6OGo7jo{!o3FcL4sTPx%+v+fj;44f4;a z!m1A09=uXE9*h^R+9|g+Sei!wyAr$QDi8P_>D@(k(I@7s>?-Kzv8lhsPqIvN@V0p$ zZ0M0jRzZz*gFdG9RmLMyQ}rxYH&d&_O&hiDb463NcJ0M>y?l^Ez0i||BC2SjgVj^$ zm}c6o=Rf61s3jyv-~Rm z@|~t*jp1X{Sa7t=rdj&Nd|1?U!rSrlsYu)@aghmc*KBqC1Nt?l3Y)ytXA`T%JWch6 jJ-}`;E8c8F`}rHNyEs$GrY}{&7 2: + mode = 1 + + self.stereomode = mode + self.enablejoint = enablejoint + def set_frame_size(self, size=60): """ Set the desired frame duration (in milliseconds). Valid options are 2.5, 5, 10, 20, 40, or 60ms. @@ -141,7 +163,13 @@ class DualOpusEncoder: self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) def encode(self, pcmbytes, directpcm=False): - """input: pcm bytes accept float32/int16 only""" + """input: pcm bytes accept float32/int16 only + x74 is mono + x75 is stereo LR + x76 is stereo mid/side + + xnl is no side audio + """ if directpcm: if pcmbytes.dtype == np.float32: pcm = (pcmbytes * 32767).astype(np.int16) @@ -152,18 +180,58 @@ class DualOpusEncoder: else: pcm = np.frombuffer(pcmbytes, dtype=np.int16) - left_channel = pcm[::2] - right_channel = pcm[1::2] + if self.stereomode == 0: + # mono + left_channel = pcm[::2] + right_channel = pcm[1::2] + mono = (left_channel + right_channel) / 2 - Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() - Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][ - 0].tobytes() + intmono = float32_to_int16(mono) - dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet) + Mencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(intmono)), flush=True)[0][0].tobytes() + + dual_encoded_packet = (Mencoded_packet + b'\\x64\\x74') + elif self.stereomode == 2: + # stereo mid/side (Joint encoding) + # convert to float32 + pcm = int16_to_float32(pcm) + + left_channel = pcm[::2] + right_channel = pcm[1::2] + + mid = (left_channel + right_channel) / 2 + side = (left_channel - right_channel) / 2 + + # convert back to int16 + mid = float32_to_int16(mid) + intside = float32_to_int16(side) + + midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(mid)), flush=True)[0][0].tobytes() + + # check if side is no audio or loudness <= -50 DBFS + try: + loudnessside = 20 * math.log10(np.sqrt(np.mean(np.square(side)))) + except: + loudnessside = 0 + + if (loudnessside) <= -50 and self.enablejoint: + sideencoded_packet = b"\\xnl" + else: + sideencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(intside)), flush=True)[0][0].tobytes() + + dual_encoded_packet = (midencoded_packet + b'\\x64\\x76' + sideencoded_packet) + else: + # stereo LR + left_channel = pcm[::2] + right_channel = pcm[1::2] + + Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() + Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][0].tobytes() + + dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet) return dual_encoded_packet - class DualOpusDecoder: def __init__(self, sample_rate=48000): self.Ldecoder = pyogg.OpusDecoder() @@ -176,22 +244,61 @@ class DualOpusDecoder: self.Rdecoder.set_sampling_frequency(sample_rate) def decode(self, dualopusbytes: bytes, outputformat=np.int16): - try: + # mode check + if b"\\x64\\x74" in dualopusbytes: + mode = 0 + dualopusbytespilted = dualopusbytes.split(b'\\x64\\x74') + elif b"\\x64\\x76" in dualopusbytes: + mode = 2 + dualopusbytespilted = dualopusbytes.split(b'\\x64\\x76') + elif b"\\x64\\x75" in dualopusbytes: + mode = 1 dualopusbytespilted = dualopusbytes.split(b'\\x64\\x75') - Lencoded_packet = dualopusbytespilted[0] - Rencoded_packet = dualopusbytespilted[1] - except: + else: raise TypeError("this is not dual opus") - decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) - decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet))) + if mode == 0: # mono + Mencoded_packet = dualopusbytespilted[0] + decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) + Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) - Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=outputformat) - Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=outputformat) + stereo_signal = np.column_stack((Mpcm, Mpcm)) - stereo_signal = np.empty((len(Lpcm), 2), dtype=Lpcm.dtype) - stereo_signal[:, 0] = Lpcm - stereo_signal[:, 1] = Rpcm + elif mode == 2: + # stereo mid/side (Joint encoding) + Mencoded_packet = dualopusbytespilted[0] + Sencoded_packet = dualopusbytespilted[1] + + decoded_mid_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet))) + Mpcm = np.frombuffer(decoded_mid_channel_pcm, dtype=np.int16) + + if Sencoded_packet != b"\\xnl": + decoded_side_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Sencoded_packet))) + Spcm = np.frombuffer(decoded_side_channel_pcm, dtype=np.int16) + + Mpcm = int16_to_float32(Mpcm) + Spcm = int16_to_float32(Spcm) + + L = (Mpcm + Spcm) / 1.5 + R = (Mpcm - Spcm) / 1.5 + + stereo_signal = np.column_stack((L, R)) + stereo_signal = float32_to_int16(stereo_signal) + else: + stereo_signal = np.column_stack((Mpcm, Mpcm)) + + else: + # stereo LR + Lencoded_packet = dualopusbytespilted[0] + Rencoded_packet = dualopusbytespilted[1] + + decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) + decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet))) + + Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) + Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=np.int16) + + stereo_signal = np.column_stack((Lpcm, Rpcm)) return stereo_signal.astype(outputformat).tobytes()