mirror of
https://github.com/damp11113/xHE-Opus.git
synced 2025-04-27 06:28:08 +00:00
849 lines
30 KiB
Python
849 lines
30 KiB
Python
import importlib
|
|
import math
|
|
import struct
|
|
import pyogg
|
|
import os
|
|
import numpy as np
|
|
from scipy.signal import butter, filtfilt
|
|
|
|
def float32_to_int16(data_float32):
    """Convert normalized float audio samples to signed 16-bit integers.

    Samples are scaled by 32767 and then saturated to the int16 range.
    Without the saturation step a sample that overshoots the nominal
    [-1.0, 1.0] range would wrap around during the integer cast and turn
    into a loud, sign-flipped click.

    Args:
        data_float32: numpy array of float samples, nominally in [-1.0, 1.0].

    Returns:
        numpy array of dtype int16 with the same shape as the input.
    """
    scaled = np.clip(data_float32 * 32767, -32768, 32767)
    return scaled.astype(np.int16)
|
|
|
|
def int16_to_float32(data_int16):
    """Convert signed 16-bit samples to float32 scaled by 1 / 32767.

    Args:
        data_int16: numpy array of integer PCM samples.

    Returns:
        numpy float32 array of the same shape; +/-32767 maps to +/-1.0.
    """
    as_float = data_int16.astype(np.float32)
    return as_float / 32767.0
|
|
|
|
class DualOpusEncoder:
    """Stereo Opus encoder built from two independent mono encoders.

    ``Lencoder`` carries the left, mid or mono signal and ``Rencoder`` the
    right or side signal, depending on the stereo mode chosen with
    :meth:`set_stereo_mode`. :meth:`encode` joins the resulting Opus packets
    with a marker that records which layout was used.
    """

    def __init__(self, app="audio", samplerate=48000, version="stable"):
        """Create both mono encoders and apply the default settings.

        Args:
            app: Opus application mode, one of 'voip', 'audio' or
                'restricted_lowdelay'.

                'voip': Gives best quality at a given bitrate for voice
                signals. It enhances the input signal by high-pass
                filtering and emphasizing formants and harmonics.
                Optionally it includes in-band forward error correction to
                protect against packet loss. Use this mode for typical VoIP
                applications. Because of the enhancement, even at high
                bitrates the output may sound different from the input.

                'audio': Gives best quality at a given bitrate for most
                non-voice signals like music. Use this mode for music and
                mixed (music/voice) content, broadcast, and applications
                requiring less than 15 ms of coding delay.

                'restricted_lowdelay': configures low-delay mode that
                disables the speech-optimized mode in exchange for slightly
                reduced delay. This mode can only be set on a newly
                initialized encoder because it changes the codec delay.
            samplerate: Sampling frequency in Hz handed to both encoders.
            version: Which libopus build pyogg should load:
                hev2:   libopus 1.5.1 (fre:ac)
                exper:  libopus 1.5.1
                stable: libopus 1.4
                old:    libopus 1.3.1
                custom: custom opus path; set the
                        "pyogg_win_libopus_custom_path" environment
                        variable to choose it (windows only)
        """
        self.version = version
        self.samplerate = samplerate
        self.stereomode = 1  # 0 = mono, 1 = stereo LR, 2 = stereo Mid/Side
        self.audiomono = False

        # pyogg reads this variable while its opus module is imported, so
        # the module is reloaded to pick up the requested library version.
        os.environ["pyogg_win_libopus_version"] = version
        importlib.reload(pyogg.opus)

        self.Lencoder = pyogg.OpusBufferedEncoder()
        self.Rencoder = pyogg.OpusBufferedEncoder()

        self.Lencoder.set_application(app)
        self.Rencoder.set_application(app)

        self.Lencoder.set_sampling_frequency(samplerate)
        self.Rencoder.set_sampling_frequency(samplerate)

        # Each underlying encoder only ever sees a single channel.
        self.Lencoder.set_channels(1)
        self.Rencoder.set_channels(1)

        self.set_frame_size()
        self.set_compression()
        self.set_feature()
        self.set_bitrate_mode()
        self.set_bitrates()
        self.set_bandwidth()
        self.set_packet_loss()

    def set_compression(self, level=10):
        """Set the encoder complexity (0-10, low to high) on both encoders."""
        self.Lencoder.set_compresion_complex(level)
        self.Rencoder.set_compresion_complex(level)

    def set_bitrates(self, bitrates=64000, samebitrate=False):
        """Set the target bitrate in bits per second.

        Args:
            bitrates: Total bitrate in bps; values up to 5000 are raised to
                5000.
            samebitrate: When True every channel encoder gets the full
                ``bitrates`` value; otherwise the value is split evenly
                between the two encoders.
        """
        if bitrates <= 5000:
            bitrates = 5000

        bitperchannel = bitrates if samebitrate else bitrates / 2

        self.Lencoder.set_bitrates(int(bitperchannel))
        self.Rencoder.set_bitrates(int(bitperchannel))

    def set_bandwidth(self, bandwidth="fullband"):
        """Set the audio bandwidth on both encoders.

        narrowband:
            Narrowband typically refers to a limited range of frequencies
            suitable for voice communication.

        mediumband (unsupported in libopus 1.3+):
            Mediumband extends the frequency range compared to narrowband,
            providing better audio quality.

        wideband:
            Wideband offers an even broader frequency range, resulting in
            higher audio fidelity compared to narrowband and mediumband.

        superwideband:
            Superwideband extends the frequency range beyond wideband,
            further enhancing audio quality.

        fullband (default):
            Fullband provides the widest frequency range among the listed
            options, offering the highest audio quality.

        auto: let opus choose the bandwidth instead of forcing one.
        """
        self.Lencoder.set_bandwidth(bandwidth)
        self.Rencoder.set_bandwidth(bandwidth)

    def set_stereo_mode(self, mode=1, audiomono=False):
        """Select how the two channels are coded.

        Args:
            mode: 0 = mono, 1 = stereo LR, 2 = stereo Mid/Side (joint
                encoding). Any other value falls back to 1.
            audiomono: In Mid/Side mode, skip the side packet when the side
                signal is at or below -50 dBFS.
        """
        # Negative values used to be stored as-is; encode() treated them as
        # stereo LR anyway, so normalise them here to keep the state honest.
        if mode not in (0, 1, 2):
            mode = 1

        self.stereomode = mode
        self.audiomono = audiomono

    def set_frame_size(self, size=60):
        """Set the desired frame duration (in milliseconds).

        Valid options are 2.5, 5, 10, 20, 40, or 60ms.
        Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms.

        Returns:
            The chunk size in samples per channel for one frame.

        Raises:
            ValueError: if ``size`` exceeds 60 and the version is not "hev2".
        """
        if self.version != "hev2" and size > 60:
            raise ValueError("non hev2 can't use framesize > 60")

        self.Lencoder.set_frame_size(size)
        self.Rencoder.set_frame_size(size)

        return int((size / 1000) * self.samplerate)

    def set_packet_loss(self, loss=0):
        """Set the expected packet loss in percent.

        Raises:
            ValueError: if ``loss`` is greater than 100.
        """
        if loss > 100:
            raise ValueError("percent must <=100")

        self.Lencoder.set_packets_loss(loss)
        self.Rencoder.set_packets_loss(loss)

    def set_bitrate_mode(self, mode="CVBR"):
        """Set the rate control mode: VBR, CVBR or CBR.

        VBR in 1.5.x is replaced by CVBR.
        """
        self.Lencoder.set_bitrate_mode(mode)
        self.Rencoder.set_bitrate_mode(mode)

    def set_feature(self, prediction=False, phaseinvert=False, DTX=False):
        """Forward the prediction, phase inversion and DTX switches.

        NOTE(review): ``prediction`` and ``phaseinvert`` are written to the
        *_DISABLED requests, so passing True looks like it turns the
        feature off - confirm the intended polarity with the callers.
        """
        for encoder in (self.Lencoder, self.Rencoder):
            encoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction))
            encoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert))
            encoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX))

    def enable_voice_mode(self, enable=True, auto=False):
        """Toggle the voice enhancement option on both encoders."""
        self.Lencoder.enable_voice_enhance(enable, auto)
        self.Rencoder.enable_voice_enhance(enable, auto)

    @staticmethod
    def _as_int16_pcm(pcmbytes, directpcm):
        """Return the input as an int16 numpy array of interleaved samples."""
        if not directpcm:
            return np.frombuffer(pcmbytes, dtype=np.int16)

        if pcmbytes.dtype == np.float32:
            # Saturate so that overshoots do not wrap around in the cast.
            return np.clip(pcmbytes * 32767, -32768, 32767).astype(np.int16)
        if pcmbytes.dtype == np.int16:
            return pcmbytes.astype(np.int16)
        raise TypeError("accept only int16/float32")

    @staticmethod
    def _downmix_to_mono(pcm):
        """Average interleaved L/R int16 samples into one int16 channel."""
        # Widen first: adding two int16 channels directly can overflow.
        wide = pcm.astype(np.int32)
        return ((wide[::2] + wide[1::2]) // 2).astype(np.int16)

    @staticmethod
    def _side_is_silent(side, threshold_dbfs=-50):
        """Tell whether a normalized side signal is at/below the threshold."""
        rms = float(np.sqrt(np.mean(np.square(side))))
        if not rms > 0:
            # Digital silence (or an empty frame) has no finite dBFS value;
            # it is the clearest case of "no side audio".
            return True
        return 20 * math.log10(rms) <= threshold_dbfs

    @staticmethod
    def _encode_frame(encoder, samples):
        """Encode one frame of mono int16 samples and return the packet."""
        packets = encoder.buffered_encode(memoryview(bytearray(samples)), flush=True)
        return packets[0][0].tobytes()

    def encode(self, pcmbytes, directpcm=False):
        """Encode one frame of interleaved stereo PCM.

        Args:
            pcmbytes: Raw int16 PCM bytes, or (with ``directpcm=True``) a
                numpy array of dtype int16 or float32.
            directpcm: Treat ``pcmbytes`` as a numpy array instead of bytes.

        Returns:
            bytes: the first Opus packet, a marker, and (for stereo modes)
            the second packet. The marker ends in
                x74 for mono,
                x75 for stereo LR,
                x76 for stereo mid/side,
            and in mid/side mode the second part is the literal "xnl" token
            when the side channel was skipped as silent.

        Raises:
            TypeError: if ``directpcm`` is set and the dtype is unsupported.
        """
        pcm = self._as_int16_pcm(pcmbytes, directpcm)

        if self.stereomode == 0:
            # mono
            mono = self._downmix_to_mono(pcm)
            midencoded_packet = self._encode_frame(self.Lencoder, mono)
            return midencoded_packet + b'\\x64\\x74'

        if self.stereomode == 2:
            # stereo mid/side (Joint encoding), computed in float
            pcm_float = int16_to_float32(pcm)
            left_channel = pcm_float[::2]
            right_channel = pcm_float[1::2]

            mid = (left_channel + right_channel) / 2
            side = (left_channel - right_channel) / 2

            midencoded_packet = self._encode_frame(self.Lencoder, float32_to_int16(mid))

            # Skip the side packet when it carries no audible content.
            if self.audiomono and self._side_is_silent(side):
                sideencoded_packet = b"\\xnl"
            else:
                sideencoded_packet = self._encode_frame(self.Rencoder, float32_to_int16(side))

            return midencoded_packet + b'\\x64\\x76' + sideencoded_packet

        # stereo LR
        Lencoded_packet = self._encode_frame(self.Lencoder, pcm[::2])
        Rencoded_packet = self._encode_frame(self.Rencoder, pcm[1::2])
        return Lencoded_packet + b'\\x64\\x75' + Rencoded_packet
|
|
|
|
class PSOpusEncoder:
    """xHE-Opus v2 encoder: one mono Opus stream plus parametric stereo data.

    Each frame is downmixed to mono and encoded with a single Opus encoder;
    three spatial parameters (stereo width, phase coherence and pan) are
    measured from the stereo input and appended to the packet so the decoder
    can synthesise a stereo image.
    """

    def __init__(self, app="audio", samplerate=48000, version="stable"):
        """Create the mono encoder and apply the default settings.

        This version is xHE-Opus v2 (Parametric Stereo).

        Args:
            app: Opus application mode, one of 'voip', 'audio' or
                'restricted_lowdelay'.

                'voip': Gives best quality at a given bitrate for voice
                signals. It enhances the input signal by high-pass
                filtering and emphasizing formants and harmonics.
                Optionally it includes in-band forward error correction to
                protect against packet loss. Use this mode for typical VoIP
                applications. Because of the enhancement, even at high
                bitrates the output may sound different from the input.

                'audio': Gives best quality at a given bitrate for most
                non-voice signals like music. Use this mode for music and
                mixed (music/voice) content, broadcast, and applications
                requiring less than 15 ms of coding delay.

                'restricted_lowdelay': configures low-delay mode that
                disables the speech-optimized mode in exchange for slightly
                reduced delay. This mode can only be set on a newly
                initialized encoder because it changes the codec delay.
            samplerate: Sampling frequency in Hz handed to the encoder.
            version: Which libopus build pyogg should load:
                hev2:   libopus 1.5.1 (fre:ac)
                exper:  libopus 1.5.1
                stable: libopus 1.4
                old:    libopus 1.3.1
                custom: custom opus path; set the
                        "pyogg_win_libopus_custom_path" environment
                        variable to choose it (windows only)
        """
        self.version = version
        self.samplerate = samplerate

        # pyogg reads this variable while its opus module is imported, so
        # the module is reloaded to pick up the requested library version.
        os.environ["pyogg_win_libopus_version"] = version
        importlib.reload(pyogg.opus)

        self.encoder = pyogg.OpusBufferedEncoder()
        self.encoder.set_application(app)
        self.encoder.set_sampling_frequency(samplerate)
        self.encoder.set_channels(1)

        self.set_frame_size()
        self.set_compression()
        self.set_feature()
        self.set_bitrate_mode()
        self.set_bitrates()
        self.set_bandwidth()
        self.set_packet_loss()

    def set_compression(self, level=10):
        """Set the encoder complexity (0-10, low to high)."""
        self.encoder.set_compresion_complex(level)

    def set_bitrates(self, bitrates=64000):
        """Set the target bitrate in bps; values up to 2500 become 2500."""
        if bitrates <= 2500:
            bitrates = 2500

        self.encoder.set_bitrates(bitrates)

    def set_bandwidth(self, bandwidth="fullband"):
        """Set the audio bandwidth.

        narrowband:
            Narrowband typically refers to a limited range of frequencies
            suitable for voice communication.

        mediumband (unsupported in libopus 1.3+):
            Mediumband extends the frequency range compared to narrowband,
            providing better audio quality.

        wideband:
            Wideband offers an even broader frequency range, resulting in
            higher audio fidelity compared to narrowband and mediumband.

        superwideband:
            Superwideband extends the frequency range beyond wideband,
            further enhancing audio quality.

        fullband (default):
            Fullband provides the widest frequency range among the listed
            options, offering the highest audio quality.

        auto: let opus choose the bandwidth instead of forcing one.
        """
        self.encoder.set_bandwidth(bandwidth)

    def set_frame_size(self, size=60):
        """Set the desired frame duration (in milliseconds).

        Valid options are 2.5, 5, 10, 20, 40, or 60ms.
        Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms.

        Returns:
            The chunk size in samples per channel for one frame.

        Raises:
            ValueError: if ``size`` exceeds 60 and the version is not "hev2".
        """
        if self.version != "hev2" and size > 60:
            raise ValueError("non hev2 can't use framesize > 60")

        self.encoder.set_frame_size(size)

        return int((size / 1000) * self.samplerate)

    def set_packet_loss(self, loss=0):
        """Set the expected packet loss in percent.

        Raises:
            ValueError: if ``loss`` is greater than 100.
        """
        if loss > 100:
            raise ValueError("percent must <=100")

        self.encoder.set_packets_loss(loss)

    def set_bitrate_mode(self, mode="CVBR"):
        """Set the rate control mode: VBR, CVBR or CBR.

        VBR in 1.5.x is replaced by CVBR.
        """
        self.encoder.set_bitrate_mode(mode)

    def set_feature(self, prediction=False, phaseinvert=False, DTX=False):
        """Forward the prediction, phase inversion and DTX switches.

        NOTE(review): ``prediction`` and ``phaseinvert`` are written to the
        *_DISABLED requests, so passing True looks like it turns the
        feature off - confirm the intended polarity with the callers.
        """
        self.encoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction))
        self.encoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert))
        self.encoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX))

    def enable_voice_mode(self, enable=True, auto=False):
        """Toggle the voice enhancement option on the encoder."""
        self.encoder.enable_voice_enhance(enable, auto)

    def __parameterization(self, stereo_signal):
        """Measure the spatial parameters of one stereo frame.

        Args:
            stereo_signal: int16 stereo samples; reshaped to (-1, 2) here.

        Returns:
            tuple ``(stereo_width, phase_coherence, pan)`` where
            ``stereo_width`` is an int, and ``pan`` is the mean right share
            minus the mean left share of the spectral magnitude (negative
            means left-heavy). A silent frame yields a pan of 0.
        """
        # Convert int16 to float32 for processing
        stereo_signal = stereo_signal.astype(np.float32) / 32768.0

        # Reshape stereo_signal into a 2D array with two channels
        stereo_signal = stereo_signal.reshape((-1, 2))

        # Magnitude spectrum of each channel
        mag_left = np.abs(np.fft.fft(stereo_signal[:, 0]))
        mag_right = np.abs(np.fft.fft(stereo_signal[:, 1]))

        # NOTE(review): np.angle is applied to the real time-domain samples,
        # so each "phase" is 0 or pi and the coherence below effectively
        # measures how often both channels share a sign. Kept as-is because
        # the decoder is tuned to this value.
        phase_diff = np.angle(stereo_signal[:, 0]) - np.angle(stereo_signal[:, 1])

        # Stereo width: mean of the full cross-correlation of the spectra
        stereo_width = np.mean(np.correlate(mag_left, mag_right, mode='full'))

        # Phase coherence
        phase_coherence = np.mean(np.cos(phase_diff))

        # Stereo panning. Bins where both channels are empty are counted as
        # centred (0.5 each) instead of dividing 0 by 0, which used to put a
        # NaN pan into the stream for every silent frame.
        total = mag_left + mag_right
        has_energy = total > 0
        share_left = np.divide(mag_left, total, out=np.full_like(total, 0.5), where=has_energy)
        share_right = np.divide(mag_right, total, out=np.full_like(total, 0.5), where=has_energy)

        pan = np.mean(share_right) - np.mean(share_left)

        return (int(stereo_width), phase_coherence, pan)

    def encode(self, pcmbytes, directpcm=False):
        """Encode one frame of interleaved stereo PCM.

        Args:
            pcmbytes: Raw int16 PCM bytes, or (with ``directpcm=True``) a
                numpy array of dtype int16 or float32.
            directpcm: Treat ``pcmbytes`` as a numpy array instead of bytes.

        Returns:
            bytes: the mono Opus packet, the v2 marker (ending in x21 x75)
            and the three stereo parameters packed with struct format 'iff'.

        Raises:
            TypeError: if ``directpcm`` is set and the dtype is unsupported.
        """
        if directpcm:
            if pcmbytes.dtype == np.float32:
                pcm = (pcmbytes * 32767).astype(np.int16)
            elif pcmbytes.dtype == np.int16:
                pcm = pcmbytes.astype(np.int16)
            else:
                raise TypeError("accept only int16/float32")
        else:
            pcm = np.frombuffer(pcmbytes, dtype=np.int16)

        pcmreshaped = pcm.reshape(-1, 2)

        # NOTE(review): the samples are halved before the int16 mean, so the
        # downmix appears to be about (L + R) / 4; the decoder applies its
        # own gain, so the level is left untouched here.
        mono_data = np.mean(pcmreshaped * 0.5, axis=1, dtype=np.int16)

        stereodata = self.__parameterization(pcmreshaped)
        packedstereodata = struct.pack('iff', *stereodata)

        encoded_packet = self.encoder.buffered_encode(memoryview(bytearray(mono_data)), flush=True)[0][0].tobytes()

        return encoded_packet + b'\\x21\\x75' + packedstereodata
|
|
|
|
class xOpusDecoder:
    """Decoder for xHE-Opus packets (dual-mono v1 and parametric-stereo v2).

    Two mono Opus decoders are kept: ``Ldecoder`` for the left / mid / mono
    payload and ``Rdecoder`` for the right / side payload. For v2 packets
    the stereo image is synthesised from the mono signal and the spatial
    parameters carried in the packet.
    """

    def __init__(self, sample_rate=48000):
        """Create both mono decoders for the given sampling frequency."""
        self.Ldecoder = pyogg.OpusDecoder()
        self.Rdecoder = pyogg.OpusDecoder()

        self.Ldecoder.set_channels(1)
        self.Rdecoder.set_channels(1)

        self.Ldecoder.set_sampling_frequency(sample_rate)
        self.Rdecoder.set_sampling_frequency(sample_rate)

        # Smoothed pan of the previous v2 frame (state for __synthstereo).
        self.__prev_pan = 0.0

    def __smooth(self, value, prev_value, alpha=0.1):
        """Exponential smoothing: blend ``value`` into ``prev_value``."""
        return alpha * value + (1 - alpha) * prev_value

    def __expand_and_pan(self, input_signal, pan_value, expansion_factor, gain):
        """
        Apply stereo expansion and panning to an input audio signal.

        Parameters:
        - input_signal: Input audio signal (numpy array of int16, shape (n, 2)).
        - pan_value: Pan value (-1 to 1, where -1 is full left, 1 is full right).
        - expansion_factor: Factor to expand the stereo width (0 to 1).
        - gain: Gain factor to adjust the volume.

        Returns:
        - output_signal: Processed audio signal (stereo, numpy array of int16).
        """
        # Convert int16 to float32 for processing (astype makes a copy, so
        # the in-place operations below never touch the caller's array)
        input_signal_float = input_signal.astype(np.float32) / 32768.0

        # Separate the channels
        left_channel = input_signal_float[:, 0]
        right_channel = input_signal_float[:, 1]

        # Apply panning
        pan_left = (1 - pan_value) / 2
        pan_right = (1 + pan_value) / 2
        left_channel *= pan_left
        right_channel *= pan_right

        # Apply stereo expansion around the common centre signal
        center = (left_channel + right_channel) / 2
        left_channel = center + (left_channel - center) * expansion_factor
        right_channel = center + (right_channel - center) * expansion_factor

        # Apply gain
        left_channel *= gain
        right_channel *= gain

        # Ensure no clipping by normalizing if necessary
        max_val = max(np.max(np.abs(left_channel)), np.max(np.abs(right_channel)))
        if max_val > 1.0:
            left_channel /= max_val
            right_channel /= max_val

        # Merge the channels
        output_signal = np.stack((left_channel, right_channel), axis=-1)

        return (output_signal * 32767).astype(np.int16)

    def __mix_stereo_signals(self, signal1, signal2, volume1=1.0, volume2=1.0):
        """Mix two (n, 2) signals with per-signal volume; returns int16."""
        # Ensure both signals have the same length by zero-padding the tail
        length = max(len(signal1), len(signal2))
        signal1 = np.pad(signal1, ((0, length - len(signal1)), (0, 0)), mode='constant')
        signal2 = np.pad(signal2, ((0, length - len(signal2)), (0, 0)), mode='constant')

        # Convert signals to float
        signal1 = signal1.astype(np.float32)
        signal2 = signal2.astype(np.float32)

        # Adjust volume
        signal1 *= volume1
        signal2 *= volume2

        # Mix the signals
        mixed_signal = signal1 + signal2

        # Normalize the mixed signal to prevent clipping
        max_amplitude = np.max(np.abs(mixed_signal))
        if max_amplitude > 32767:
            mixed_signal = (mixed_signal / max_amplitude) * 32767

        return mixed_signal.astype(np.int16)

    def __apply_smoothing_window(self, audio_data, window_size):
        """
        Apply a smoothing window to the beginning and end of the audio data.

        The array is modified in place and also returned.

        Parameters:
        - audio_data: 2D numpy array with shape (num_samples, 2)
        - window_size: Size of the smoothing window in samples

        Returns:
        - smoothed_audio_data: 2D numpy array with the smoothing window applied
        """
        window = np.hanning(window_size * 2)
        fade_in = window[:window_size]
        fade_out = window[-window_size:]

        audio_data[:window_size, :] *= fade_in[:, np.newaxis]
        audio_data[-window_size:, :] *= fade_out[:, np.newaxis]

        return audio_data

    def __stereo_widening_effect(self, data, delay_samples=10, gain=0.8, window_size=100):
        """Widen the image by rotating the right channel by a few samples.

        Returns an int16 array of shape (n, 2) with faded frame edges.
        """
        audio_data = data.reshape(-1, 2)

        # Convert int16 to float32 for processing
        audio_data = audio_data.astype(np.float32)

        # Apply delay to the right channel (np.roll wraps within the frame)
        right_channel = np.roll(audio_data[:, 1], delay_samples)

        # Apply gain to both channels
        audio_data[:, 0] *= gain
        right_channel *= gain

        # Combine channels back into stereo
        widened_audio_data = np.stack((audio_data[:, 0], right_channel), axis=1)

        # Apply smoothing window to reduce clicks
        widened_audio_data = self.__apply_smoothing_window(widened_audio_data, window_size)

        # Clip to avoid overflow
        widened_audio_data = np.clip(widened_audio_data, -32768, 32767)

        # Convert float32 back to int16
        return widened_audio_data.astype(np.int16)

    def __apply_phase_coherence_to_stereo(self, signal, phase_coherence):
        """Turn a coherence value into a phase shift and apply it."""
        # Convert phase coherence to phase shift in radians
        phase_shift = np.arccos(phase_coherence)
        # Apply phase shift to both channels
        return self.__apply_phase_shift(signal, phase_shift)

    def __apply_phase_shift(self, signal, phase_shift):
        """Rotate the signal by ``phase_shift`` and keep the real part."""
        # Convert to complex
        signal_complex = signal.astype(np.complex64)
        # Apply phase shift
        shifted_signal = signal_complex * np.exp(1j * phase_shift)
        # Only the real part fits an int16 sample; taking it explicitly
        # gives the same numbers as the implicit cast, without the warning.
        return shifted_signal.real.astype(np.int16)

    def __butter_lowpass_filter_stereo(self, data, cutoff, fs, order=5):
        """Zero-phase Butterworth low-pass applied along axis 0."""
        nyq = 0.5 * fs
        normal_cutoff = cutoff / nyq
        b, a = butter(order, normal_cutoff, btype='low', analog=False)
        filtered_data = np.apply_along_axis(lambda x: filtfilt(b, a, x), axis=0, arr=data)
        return filtered_data.astype(np.int16)

    def __synthstereo(self, mono_signal, stereodata):
        """Synthesise a stereo frame from dual-mono audio and v2 parameters.

        Args:
            mono_signal: int16 array of shape (n, 2) with identical channels.
            stereodata: ``(stereo_width, phase_coherence, pan)`` as unpacked
                from the packet.

        Returns:
            int16 array of shape (n, 2).
        """
        stereo_width, phase_coherence, pan = stereodata[0], stereodata[1], stereodata[2]

        # A non-finite pan (e.g. from a silent frame encoded by an older
        # encoder) must not enter the smoother: it would stick in
        # __prev_pan and corrupt every following frame.
        if not math.isfinite(pan):
            pan = self.__prev_pan
        pan = max(-1.0, min(1.0, pan))

        # Smooth the pan value
        pan = self.__smooth(pan, self.__prev_pan, alpha=0.25)
        self.__prev_pan = pan

        # arccos is only defined on [-1, 1]; treat garbage as "coherent".
        if not math.isfinite(phase_coherence):
            phase_coherence = 1.0
        phase_coherence = max(-1.0, min(1.0, phase_coherence))

        stereo_exp = stereo_width / 10000
        delay_samples = int(stereo_exp)
        window_size = delay_samples * 2

        # The widening step needs a fade window that fits into the frame;
        # otherwise the plain signal is used, as before.
        delayed = mono_signal
        if 0 < window_size <= len(mono_signal):
            try:
                delayed = self.__stereo_widening_effect(mono_signal, delay_samples, 1, window_size)
            except (ValueError, IndexError):
                delayed = mono_signal

        l1 = self.__expand_and_pan(mono_signal, pan, 1, 2)

        stereo_signal_shifted = self.__apply_phase_coherence_to_stereo(delayed, phase_coherence)

        return self.__mix_stereo_signals(l1, stereo_signal_shifted, volume1=1, volume2=0.5).astype(np.int16)

    def decode(self, dualopusbytes: bytes, outputformat=np.int16):
        """Decode one xHE-Opus packet into interleaved stereo PCM bytes.

        Args:
            dualopusbytes: One packet as produced by the encoders.
            outputformat: numpy dtype the samples are cast to before being
                serialised (the values stay on the int16 scale).

        Returns:
            bytes of interleaved L/R samples.

        Raises:
            TypeError: if no known mode marker is found in the packet.
        """
        # mode check - the marker both identifies the layout and separates
        # the two payloads
        if b"\\x64\\x74" in dualopusbytes:
            mode = 0
            xopusbytespilted = dualopusbytes.split(b'\\x64\\x74')
        elif b"\\x64\\x76" in dualopusbytes:
            mode = 2
            xopusbytespilted = dualopusbytes.split(b'\\x64\\x76')
        elif b"\\x64\\x75" in dualopusbytes:
            mode = 1
            xopusbytespilted = dualopusbytes.split(b'\\x64\\x75')
        elif b"\\x21\\x75" in dualopusbytes:
            mode = 3  # v2
            xopusbytespilted = dualopusbytes.split(b'\\x21\\x75')
        else:
            raise TypeError("this is not xopus bytes")

        if mode == 0:  # mono
            Mencoded_packet = xopusbytespilted[0]
            decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet)))
            Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16)

            stereo_signal = np.column_stack((Mpcm, Mpcm))

        elif mode == 2:
            # stereo mid/side (Joint encoding)
            Mencoded_packet = xopusbytespilted[0]
            Sencoded_packet = xopusbytespilted[1]

            decoded_mid_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet)))
            Mpcm = np.frombuffer(decoded_mid_channel_pcm, dtype=np.int16)

            if Sencoded_packet != b"\\xnl":
                decoded_side_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Sencoded_packet)))
                Spcm = np.frombuffer(decoded_side_channel_pcm, dtype=np.int16)

                Mpcm = int16_to_float32(Mpcm)
                Spcm = int16_to_float32(Spcm)

                L = Mpcm + Spcm
                R = Mpcm - Spcm

                stereo_signal = np.column_stack((L, R))

                # Normalise instead of clipping when M +/- S overshoots
                max_amplitude = np.max(np.abs(stereo_signal))
                if max_amplitude > 1.0:
                    stereo_signal /= max_amplitude

                stereo_signal = float32_to_int16(stereo_signal)
            else:
                # Side channel was skipped by the encoder: plain dual mono
                stereo_signal = np.column_stack((Mpcm, Mpcm))

        elif mode == 3:
            Mencoded_packet = xopusbytespilted[0]
            stereodatapacked = xopusbytespilted[1]

            stereodata = struct.unpack('iff', stereodatapacked)

            mono_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Mencoded_packet)))
            Mpcm = np.frombuffer(mono_channel_pcm, dtype=np.int16)

            stereo_audio = np.stack((Mpcm, Mpcm)).T.reshape(-1, 2)

            stereo_signal = self.__synthstereo(stereo_audio, stereodata)

        else:
            # stereo LR
            Lencoded_packet = xopusbytespilted[0]
            Rencoded_packet = xopusbytespilted[1]

            decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet)))
            decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet)))

            Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16)
            Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=np.int16)

            stereo_signal = np.column_stack((Lpcm, Rpcm))

        return stereo_signal.astype(outputformat).tobytes()
|
|
|
|
class HeaderContainer:
    """File header: a 4-byte capture pattern, a version byte and metadata.

    Metadata is stored as a flat sequence of key/value records. Each record
    is a little-endian uint32 length followed by UTF-8 bytes, first for the
    key and then for the value. Non-string values are stored as ``str(v)``,
    so they come back as strings when deserialized.
    """

    def __init__(self, capture_pattern, version, metadata):
        self.capture_pattern = capture_pattern
        self.version = version
        self.metadata = metadata

    def serialize(self):
        """Return the fixed header followed by the metadata records."""
        fixed_part = struct.pack('<4sB', self.capture_pattern, self.version)
        return fixed_part + self.serialize_metadata()

    def serialize_metadata(self):
        """Return all metadata items as length-prefixed UTF-8 records."""
        length_field = struct.Struct('<I')
        records = []
        for name, raw_value in self.metadata.items():
            name_blob = name.encode('utf-8')
            text = raw_value if isinstance(raw_value, str) else str(raw_value)
            value_blob = text.encode('utf-8')
            records.append(
                length_field.pack(len(name_blob)) + name_blob
                + length_field.pack(len(value_blob)) + value_blob
            )
        return b''.join(records)

    @classmethod
    def deserialize(cls, data):
        """Build a header object from the bytes produced by serialize()."""
        capture_pattern, version = struct.unpack_from('<4sB', data)
        body = data[struct.calcsize('<4sB'):]
        return cls(capture_pattern, version, cls.deserialize_metadata(body))

    @staticmethod
    def deserialize_metadata(metadata_bytes):
        """Parse length-prefixed key/value records into a dict of strings."""

        def take_field(position):
            # One field = uint32 length + that many UTF-8 bytes; a short
            # buffer raises struct.error.
            (size,) = struct.unpack_from('<I', metadata_bytes, position)
            position += 4
            (blob,) = struct.unpack_from(f'<{size}s', metadata_bytes, position)
            return blob.decode('utf-8'), position + size

        parsed = {}
        cursor = 0
        end = len(metadata_bytes)
        while cursor < end:
            key, cursor = take_field(cursor)
            value, cursor = take_field(cursor)
            parsed[key] = value
        return parsed
|
|
|
|
class FooterContainer:
    """File footer: average loudness (float32) and frame count (uint32).

    Both fields are stored little-endian, loudness first, 8 bytes in total.
    """

    def __init__(self, loudness_avg, length):
        self.loudness_avg = loudness_avg
        self.length = length

    def serialize(self):
        """Return the footer as bytes."""
        return self.serialize_metadata()

    def serialize_metadata(self):
        """Pack the loudness and the length into 8 little-endian bytes."""
        return struct.pack('<fI', self.loudness_avg, self.length)

    @classmethod
    def deserialize(cls, data):
        """Build a footer object from bytes produced by serialize()."""
        return cls(*cls.deserialize_metadata(data))

    @staticmethod
    def deserialize_metadata(metadata_bytes):
        """Return ``(loudness_avg, length)`` read from the first 8 bytes."""
        loudness_avg, length = struct.unpack_from('<fI', metadata_bytes)
        return loudness_avg, length
|
|
|
|
class XopusWriter:
    """Write encoded frames to an Xopus container file.

    Layout: serialized header, then one record per frame, each terminated
    by the literal record separator, and finally the end-of-file marker
    followed by the footer (average loudness and number of frames).
    """

    # Loudness recorded for a frame without any signal. log10(0) is
    # undefined, so the level of the smallest non-zero int16 sample is used.
    _SILENCE_DBFS = 20 * math.log10(1 / 32767.0)

    def __init__(self, file, encoder: DualOpusEncoder, metadata=None):
        """Create (or truncate) ``file`` and write the header.

        Args:
            file: Path of the output file.
            encoder: Object whose ``encode(pcm, directpcm=True)`` returns
                one packet as bytes.
            metadata: Optional dict of user metadata; the system entries
                ("format", "audio") override user keys of the same name.
        """
        self.file = file
        self.encoder = encoder

        if metadata is None:
            metadata = {}

        systemmetadata = {
            "format": "Xopus",
            "audio": {
                "encoder": "libxheopus",
                "format": "xHE-Opus",
                "format/info": "Extended High Efficiency Opus Audio Codec"
            }
        }

        # 'wb' already truncates, so no separate clearing pass (which used
        # to leak a file handle) is needed.
        self.xopusfile = open(file, 'wb')
        self.xopusfile.write(HeaderContainer(b'OpuS', 1, metadata | systemmetadata).serialize() + b"\\xa")

        self.loudnessperframe = []
        self.length = 0

    def write(self, pcmbytes):
        """Encode one frame of int16 PCM bytes and append it to the file."""
        pcm = np.frombuffer(pcmbytes, dtype=np.int16)
        # Convert int16 audio data to floating-point values in range [-1, 1]
        normalized_audio = pcm / 32767.0

        # Calculate RMS value
        rms = np.sqrt(np.mean(np.square(normalized_audio)))

        # Calculate dBFS. A silent (or empty) frame has no finite level;
        # it used to be logged as 0 dBFS, i.e. as a full-scale frame, which
        # pushed the reported average loudness up.
        if rms > 0:
            dbfs = 20 * math.log10(rms)
        else:
            dbfs = self._SILENCE_DBFS
        self.loudnessperframe.append(dbfs)

        encoded = self.encoder.encode(pcm, directpcm=True)
        self.xopusfile.write(encoded + b"\\xa")
        self.length += 1

    def close(self):
        """Write the end marker and footer, then close the file.

        Calling close() again after the file is closed does nothing.
        """
        if self.xopusfile.closed:
            return

        if self.loudnessperframe:
            loudnessavgs = sum(self.loudnessperframe) / len(self.loudnessperframe)
        else:
            # No frame was written: avoid dividing by zero.
            loudnessavgs = self._SILENCE_DBFS

        try:
            self.xopusfile.write(b"\\xeof\\xeof")
            self.xopusfile.write(FooterContainer(loudnessavgs, self.length).serialize())
        finally:
            # The handle was never closed before, so the footer could stay
            # in the write buffer until interpreter exit.
            self.xopusfile.close()

        self.loudnessperframe = []
        self.length = 0
|
|
|
|
class XopusReader:
    """Read an Xopus container file and decode its frames.

    The whole file is read at construction time and split on the record
    separator; element 0 is the header, the last element holds the
    end-of-file marker and the footer.
    """

    def __init__(self, file):
        """Open ``file`` and load all of its records into memory."""
        self.file = open(file, 'rb')
        self.xopusline = self.file.read().split(b"\\xa")

    def readmetadata(self):
        """Return the header metadata and the footer values.

        Returns:
            dict with "header" (the metadata dict) and "footer"
            ("contentloudness" and "length").

        Raises:
            EOFError: if the last record does not start with the
                end-of-file marker.
        """
        header = HeaderContainer.deserialize(self.xopusline[0])

        if self.xopusline[-1].startswith(b"\\xeof\\xeof"):
            footer = FooterContainer.deserialize(self.xopusline[-1].split(b"\\xeof\\xeof")[1])
        else:
            raise EOFError("can't find EOF")

        data = {
            "header": dict(header.metadata),
            "footer": {
                "contentloudness": footer.loudness_avg,
                "length": footer.length
            }
        }
        return data

    def _iter_decoded(self, decoder, start):
        """Yield the decoded PCM of every frame from index ``start`` on."""
        # Record 0 is the header, hence the +1.
        for data in self.xopusline[start + 1:]:
            if data.startswith(b"\\xeof\\xeof"):
                break
            try:
                pcm = decoder.decode(data)
            except Exception:
                # Best effort: an undecodable frame becomes an empty chunk
                # so that playback can continue.
                pcm = b""
            yield pcm

    def decode(self, decoder, play=False, start=0):
        """Decode the frames of the file.

        Args:
            decoder: Object with a ``decode(packet)`` method.
            play: When True return a lazy generator (for streaming
                playback); otherwise return a list with all frames.
            start: Index of the first frame to decode.

        Returns:
            A generator (``play=True``) or a list (``play=False``) of
            decoded chunks; frames that fail to decode are replaced by b"".
        """
        # The generator lives in its own method: with a ``yield`` inside
        # this function the ``play=False`` call also produced a generator
        # (an empty one) instead of the list.
        frames = self._iter_decoded(decoder, start)
        if play:
            return frames
        return list(frames)

    def close(self):
        """Drop the loaded records and close the file."""
        self.xopusline = []
        self.file.close()