Compare commits

..

No commits in common. "main" and "xopustoolsv1" have entirely different histories.

5 changed files with 92 additions and 195 deletions

View File

@ -40,11 +40,7 @@ else:
print("no profile available") print("no profile available")
sys.exit() sys.exit()
if args.profile == 1: encoder.set_bitrates(args.bitrate, args.samebitrate)
encoder.set_bitrates(args.bitrate, args.samebitrate)
elif args.profile == 2:
encoder.set_bitrates(args.bitrate)
encoder.set_bitrate_mode(args.bitmode) encoder.set_bitrate_mode(args.bitmode)
encoder.set_bandwidth(args.bandwidth) encoder.set_bandwidth(args.bandwidth)
encoder.set_compression(args.compress) encoder.set_compression(args.compress)

35
gui.py
View File

@ -1,4 +1,3 @@
print("Please wait...")
import dearpygui.dearpygui as dpg import dearpygui.dearpygui as dpg
import easygui import easygui
import threading import threading
@ -92,14 +91,10 @@ class App:
signaltype = str(dpg.get_value("opussignaltype")).lower() signaltype = str(dpg.get_value("opussignaltype")).lower()
profile = str(dpg.get_value("opusprofile")).strip().lower() profile = str(dpg.get_value("opusprofile")).strip().lower()
stereomode = str(dpg.get_value("opusstereomode")).lower() stereomode = str(dpg.get_value("opusstereomode")).lower()
bitbalance = dpg.get_value("opusbitratebalance") if stereomode == "stereo l/r":
if stereomode == "l/r":
stereomode = 1 stereomode = 1
elif stereomode == "mid/side": elif stereomode == "stereo mid/side":
stereomode = 2 stereomode = 2
elif stereomode == "intensity":
stereomode = 3
else: else:
stereomode = 2 stereomode = 2
@ -113,29 +108,27 @@ class App:
signalauto = True signalauto = True
signalvoice = False signalvoice = False
if bitbalance == -1:
bitbalance = None
try: try:
total = 0 total = 0
current = 0 current = 0
filename = os.path.splitext(os.path.basename(self.inputfilepath))[0] filename = os.path.splitext(os.path.basename(self.inputfilepath))[0]
dpg.set_value("convertstatus", "init encoder...") dpg.set_value("convertstatus", "init encoder...")
print(profile)
if profile == "xhe-opus v1": if profile == "xhe-opus v1":
encoder = libxheopus.DualOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion")) encoder = libxheopus.DualOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion"))
encoder.set_stereo_mode(stereomode, dpg.get_value("opusautomono"), dpg.get_value("opusmsautomonogate"), dpg.get_value("opusintensity"))
encoder.set_bitrates(int(dpg.get_value("opusbitrate") * 1000), balance_percent=bitbalance)
else: else:
encoder = libxheopus.PSOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion")) encoder = libxheopus.PSOpusEncoder(dpg.get_value("opusapp"), 48000, dpg.get_value("opusversion"))
encoder.set_bitrates(int(dpg.get_value("opusbitrate") * 1000))
encoder.set_bitrate_mode(dpg.get_value("opusbitmode")) encoder.set_bitrate_mode(dpg.get_value("opusbitmode"))
encoder.set_bandwidth(dpg.get_value("opusbandwidth")) encoder.set_bandwidth(dpg.get_value("opusbandwidth"))
encoder.set_bitrates(int(dpg.get_value("opusbitrate")*1000))
encoder.set_compression(dpg.get_value("opuscompression")) encoder.set_compression(dpg.get_value("opuscompression"))
encoder.set_packet_loss(dpg.get_value("opuspacketloss")) encoder.set_packet_loss(dpg.get_value("opuspacketloss"))
if profile != "xhe-opus v2":
encoder.set_stereo_mode(stereomode, dpg.get_value("opusenajoint"))
encoder.set_feature(dpg.get_value("opusenapred"), False, dpg.get_value("opusenadtx")) encoder.set_feature(dpg.get_value("opusenapred"), False, dpg.get_value("opusenadtx"))
encoder.enable_voice_mode(signalvoice, signalauto) encoder.enable_voice_mode(signalvoice, signalauto)
@ -292,33 +285,25 @@ class App:
thread.start() thread.start()
def window(self): def window(self):
with dpg.window(label="Encoder", width=500, no_close=True): with dpg.window(label="Encoder", width=420, no_close=True):
dpg.add_text("input:", tag="inpathshow") dpg.add_text("input:", tag="inpathshow")
dpg.add_text("output:", tag="outpathshow") dpg.add_text("output:", tag="outpathshow")
dpg.add_button(label="Select Input File", callback=self.selectinputfile) dpg.add_button(label="Select Input File", callback=self.selectinputfile)
dpg.add_button(label="Select Output Path", callback=self.selectoutputpath) dpg.add_button(label="Select Output Path", callback=self.selectoutputpath)
dpg.add_combo(["xHE-Opus v1", "xHE-Opus v2"], label="Profile", default_value="xHE-Opus v1", tag="opusprofile", callback=self.changeprofileopus) dpg.add_combo(["xHE-Opus v1", "xHE-Opus v2"], label="Profile", default_value="xHE-Opus v1", tag="opusprofile", callback=self.changeprofileopus)
dpg.add_combo(["hev2", "exper", "stable", "old"], label="Version", default_value="hev2", tag="opusversion", callback=self.changeversionopus) dpg.add_combo(["hev2", "exper", "stable", "old"], label="Version", default_value="hev2", tag="opusversion", callback=self.changeversionopus)
dpg.add_combo(["120", "100", "80", "60", "40", "20", "10", "5"], label="Frame Size (ms)", tag="opusframesize", default_value="120") dpg.add_combo(["120", "100", "80", "60", "40", "20", "10", "5"], label="Frame Size (ms)", tag="opusframesize", default_value="120")
dpg.add_combo(["voip", "audio", "restricted_lowdelay"], label="Application", default_value="restricted_lowdelay", tag="opusapp") dpg.add_combo(["voip", "audio", "restricted_lowdelay"], label="Application", default_value="restricted_lowdelay", tag="opusapp")
dpg.add_combo(["VBR", "CVBR", "CBR"], label="Bitrate Mode", default_value="CVBR", tag="opusbitmode") dpg.add_combo(["VBR", "CVBR", "CBR"], label="Bitrate Mode", default_value="CVBR", tag="opusbitmode")
dpg.add_combo(["auto", "fullband", "superwideband", "wideband", "mediumband", "narrowband"], label="Bandwidth", tag="opusbandwidth", default_value="fullband") dpg.add_combo(["auto", "fullband", "superwideband", "wideband", "mediumband", "narrowband"], label="Bandwidth", tag="opusbandwidth", default_value="fullband")
dpg.add_combo(["L/R", "Mid/Side", "Intensity"], label="Stereo Mode", tag="opusstereomode", default_value="L/R") dpg.add_combo(["Stereo L/R", "Stereo Mid/Side"], label="Stereo Mode", tag="opusstereomode", default_value="Stereo L/R")
dpg.add_combo(["Auto", "Voice", "Music"], label="Signal Type", tag="opussignaltype", default_value="Auto") dpg.add_combo(["Auto", "Voice", "Music"], label="Signal Type", tag="opussignaltype", default_value="Auto")
dpg.add_input_int(label="Bitrates Balance", min_value=-1, max_value=100, min_clamped=True, max_clamped=True, step_fast=1, default_value=-1, tag="opusbitratebalance")
dpg.add_input_float(label="Bitrates", min_value=5, max_value=1020, min_clamped=True, max_clamped=True, step_fast=1, default_value=64, tag="opusbitrate") dpg.add_input_float(label="Bitrates", min_value=5, max_value=1020, min_clamped=True, max_clamped=True, step_fast=1, default_value=64, tag="opusbitrate")
dpg.add_input_int(label="Compression Level", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=10, tag="opuscompression") dpg.add_input_int(label="Compression Level", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=10, tag="opuscompression")
dpg.add_input_int(label="Packet Loss", max_clamped=True, min_clamped=True, min_value=0, max_value=100, default_value=0, tag="opuspacketloss") dpg.add_input_int(label="Packet Loss", max_clamped=True, min_clamped=True, min_value=0, max_value=100, default_value=0, tag="opuspacketloss")
dpg.add_input_int(label="Auto Mono Threshold (Mid/Side Encoding)", min_value=0, max_value=-100, min_clamped=True, max_clamped=True, step_fast=1, default_value=-50, tag="opusmsautomonogate")
dpg.add_input_float(label="Intensity (Intensity Encoding)", max_clamped=True, min_clamped=True, min_value=0, max_value=10, default_value=1, tag="opusintensity")
dpg.add_checkbox(label="Prediction", tag="opusenapred") dpg.add_checkbox(label="Prediction", tag="opusenapred")
dpg.add_checkbox(label="DTX", tag="opusenadtx") dpg.add_checkbox(label="DTX", tag="opusenadtx")
dpg.add_checkbox(label="Auto Mono (Mid/Side Encoding)", tag="opusautomono") dpg.add_checkbox(label="Auto Mono (Mid/Side Encoding)", tag="opusenajoint")
dpg.add_button(label="Convert", callback=self.startconvert) dpg.add_button(label="Convert", callback=self.startconvert)
with dpg.window(label="converting", show=False, tag="convertingwindow", modal=True, no_resize=True, no_move=True, no_title_bar=True, width=320): with dpg.window(label="converting", show=False, tag="convertingwindow", modal=True, no_resize=True, no_move=True, no_title_bar=True, width=320):

View File

@ -15,7 +15,7 @@ def int16_to_float32(data_int16):
return data_float32 return data_float32
class DualOpusEncoder: class DualOpusEncoder:
def __init__(self, app="restricted_lowdelay", samplerate=48000, version="stable"): def __init__(self, app="audio", samplerate=48000, version="stable"):
""" """
----------------------------- version-------------------------- ----------------------------- version--------------------------
hev2: libopus 1.5.1 (fre:ac) hev2: libopus 1.5.1 (fre:ac)
@ -51,14 +51,8 @@ class DualOpusEncoder:
""" """
self.version = version self.version = version
self.samplerate = samplerate self.samplerate = samplerate
self.stereomode = 1 #0 = mono, 1 = Stereo LR, 2 = Stereo Mid/Side, 3 = Stereo Intensity self.stereomode = 1 #0 = mono, 1 = stereo LR, 2 = stereo Mid/Side
self.automonogate = -50 self.audiomono = False
self.automono = False
self.msmono = False
self.overallbitrate = 0
self.secbitrate = 0
self.intensity = 1
self.bitratemode = 1 # 0 = CBR, 1 = CVBR, 2 = VBR
os.environ["pyogg_win_libopus_version"] = version os.environ["pyogg_win_libopus_version"] = version
importlib.reload(pyogg.opus) importlib.reload(pyogg.opus)
@ -88,46 +82,18 @@ class DualOpusEncoder:
self.Lencoder.set_compresion_complex(level) self.Lencoder.set_compresion_complex(level)
self.Rencoder.set_compresion_complex(level) self.Rencoder.set_compresion_complex(level)
def set_bitrates(self, bitrates=64000, samebitrate=False, balance_percent=None): def set_bitrates(self, bitrates=64000, samebitrate=False):
""" """input birate unit: bps"""
input birate unit: bps
balance_percent is working good with M/S stereo
"""
if bitrates <= 5000: if bitrates <= 5000:
bitrates = 5000 bitrates = 5000
if balance_percent is None:
if self.stereomode == 0:
balance_percent = 100
elif self.stereomode == 2:
balance_percent = 75
else:
balance_percent = 50
self.overallbitrate = bitrates
if samebitrate: if samebitrate:
self.Lencoder.set_bitrates(int(bitrates)) bitperchannel = bitrates
self.Rencoder.set_bitrates(int(bitrates))
else: else:
percentage_decimal = balance_percent / 100 bitperchannel = bitrates / 2
bitratech1 = round(bitrates * percentage_decimal)
bitratech2 = bitrates - bitratech1
if bitratech1 < 2500: self.Lencoder.set_bitrates(int(bitperchannel))
bitratech1 = 2500 self.Rencoder.set_bitrates(int(bitperchannel))
if bitratech2 < 2500:
self.msmono = True
bitratech2 = 2500
else:
self.msmono = False
self.secbitrate = bitratech1
self.Lencoder.set_bitrates(int(bitratech1))
self.Rencoder.set_bitrates(int(bitratech2))
def set_bandwidth(self, bandwidth="fullband"): def set_bandwidth(self, bandwidth="fullband"):
""" """
@ -151,25 +117,19 @@ class DualOpusEncoder:
self.Lencoder.set_bandwidth(bandwidth) self.Lencoder.set_bandwidth(bandwidth)
self.Rencoder.set_bandwidth(bandwidth) self.Rencoder.set_bandwidth(bandwidth)
def set_stereo_mode(self, mode=1, automono=False, automonogate=-50, intensity=1, changebitratesbalance=False): def set_stereo_mode(self, mode=1, audiomono=False):
""" """
0 = mono (not recommend) 0 = mono
1 = stereo LR 1 = stereo LR
2 = stereo Mid/Side 2 = stereo Mid/Side (Joint encoding)
3 = Intensity
""" """
if mode > 2: if mode > 2:
mode = 1 mode = 1
self.stereomode = mode self.stereomode = mode
self.automono = automono self.audiomono = audiomono
self.automonogate = automonogate
self.intensity = intensity
if changebitratesbalance: def set_frame_size(self, size=60):
self.set_bitrates(self.overallbitrate)
def set_frame_size(self, size=60, nocheck=False):
""" Set the desired frame duration (in milliseconds). """ Set the desired frame duration (in milliseconds).
Valid options are 2.5, 5, 10, 20, 40, or 60ms. Valid options are 2.5, 5, 10, 20, 40, or 60ms.
Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms. Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms.
@ -179,8 +139,8 @@ class DualOpusEncoder:
if self.version != "hev2" and size > 60: if self.version != "hev2" and size > 60:
raise ValueError("non hev2 can't use framesize > 60") raise ValueError("non hev2 can't use framesize > 60")
self.Lencoder.set_frame_size(size, nocheck) self.Lencoder.set_frame_size(size)
self.Rencoder.set_frame_size(size, nocheck) self.Rencoder.set_frame_size(size)
return int((size / 1000) * self.samplerate) return int((size / 1000) * self.samplerate)
@ -196,14 +156,6 @@ class DualOpusEncoder:
"""VBR, CVBR, CBR """VBR, CVBR, CBR
VBR in 1.5.x replace by CVBR VBR in 1.5.x replace by CVBR
""" """
if mode.lower() == "cbr":
self.bitratemode = 0
elif mode.lower() == "cvbr":
self.bitratemode = 1
elif mode.lower() == "vbr":
self.bitratemode = 2
else:
raise ValueError(f"No {mode} bitrate mode option")
self.Lencoder.set_bitrate_mode(mode) self.Lencoder.set_bitrate_mode(mode)
self.Rencoder.set_bitrate_mode(mode) self.Rencoder.set_bitrate_mode(mode)
@ -265,35 +217,20 @@ class DualOpusEncoder:
mid = float32_to_int16(mid) mid = float32_to_int16(mid)
intside = float32_to_int16(side) intside = float32_to_int16(side)
midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(mid)), flush=True)[0][0].tobytes()
# check if side is no audio or loudness <= -50 DBFS # check if side is no audio or loudness <= -50 DBFS
try: try:
loudnessside = 20 * math.log10(np.sqrt(np.mean(np.square(side)))) loudnessside = 20 * math.log10(np.sqrt(np.mean(np.square(side))))
except: except:
loudnessside = 0 loudnessside = 0
if (loudnessside) <= self.automonogate and self.automono or self.msmono: if (loudnessside) <= -50 and self.audiomono:
sideencoded_packet = b"\\xnl" sideencoded_packet = b"\\xnl"
if self.bitratemode == 0: # CBR
self.Lencoder.set_bitrates(int(self.overallbitrate - 300))
else: else:
self.Lencoder.set_bitrates(int(self.secbitrate))
sideencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(intside)), flush=True)[0][0].tobytes() sideencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(intside)), flush=True)[0][0].tobytes()
midencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(mid)), flush=True)[0][0].tobytes()
dual_encoded_packet = (midencoded_packet + b'\\x64\\x76' + sideencoded_packet) dual_encoded_packet = (midencoded_packet + b'\\x64\\x76' + sideencoded_packet)
elif self.stereomode == 3:
# stereo intensity (Joint encoding)
left_channel = pcm[:, 0]
right_channel = pcm[:, 1]
IRChannel = left_channel + self.intensity * (right_channel - left_channel)
Lencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes()
IRencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(IRChannel)), flush=True)[0][0].tobytes()
dual_encoded_packet = (Lencoded_packet + b'\\x64\\x77' + IRencoded_packet)
else: else:
# stereo LR # stereo LR
left_channel = pcm[::2] left_channel = pcm[::2]
@ -307,7 +244,7 @@ class DualOpusEncoder:
return dual_encoded_packet return dual_encoded_packet
class PSOpusEncoder: class PSOpusEncoder:
def __init__(self, app="restricted_lowdelay", samplerate=48000, version="stable"): def __init__(self, app="audio", samplerate=48000, version="stable"):
""" """
This version is xHE-Opus v2 (Parametric Stereo) This version is xHE-Opus v2 (Parametric Stereo)
----------------------------- version-------------------------- ----------------------------- version--------------------------
@ -505,7 +442,6 @@ class xOpusDecoder:
self.Rdecoder.set_sampling_frequency(sample_rate) self.Rdecoder.set_sampling_frequency(sample_rate)
self.__prev_pan = 0.0 self.__prev_pan = 0.0
self.__prev_max_amplitude = 0.0
def __smooth(self, value, prev_value, alpha=0.1): def __smooth(self, value, prev_value, alpha=0.1):
return alpha * value + (1 - alpha) * prev_value return alpha * value + (1 - alpha) * prev_value
@ -642,6 +578,13 @@ class xOpusDecoder:
shifted_signal = signal_complex * np.exp(1j * phase_shift) shifted_signal = signal_complex * np.exp(1j * phase_shift)
return shifted_signal.astype(np.int16) return shifted_signal.astype(np.int16)
def __butter_lowpass_filter_stereo(self, data, cutoff, fs, order=5):
nyq = 0.5 * fs
normal_cutoff = cutoff / nyq
b, a = butter(order, normal_cutoff, btype='low', analog=False)
filtered_data = np.apply_along_axis(lambda x: filtfilt(b, a, x), axis=0, arr=data)
return filtered_data.astype(np.int16)
def __synthstereo(self, mono_signal, stereodata): def __synthstereo(self, mono_signal, stereodata):
pan = stereodata[2] pan = stereodata[2]
@ -673,9 +616,6 @@ class xOpusDecoder:
elif b"\\x64\\x75" in dualopusbytes: elif b"\\x64\\x75" in dualopusbytes:
mode = 1 mode = 1
xopusbytespilted = dualopusbytes.split(b'\\x64\\x75') xopusbytespilted = dualopusbytes.split(b'\\x64\\x75')
elif b"\\x64\\x77" in dualopusbytes:
mode = 4
xopusbytespilted = dualopusbytes.split(b'\\x64\\x77')
elif b"\\x21\\x75" in dualopusbytes: elif b"\\x21\\x75" in dualopusbytes:
mode = 3 # v2 mode = 3 # v2
xopusbytespilted = dualopusbytes.split(b'\\x21\\x75') xopusbytespilted = dualopusbytes.split(b'\\x21\\x75')
@ -688,6 +628,7 @@ class xOpusDecoder:
Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16) Mpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16)
stereo_signal = np.column_stack((Mpcm, Mpcm)) stereo_signal = np.column_stack((Mpcm, Mpcm))
elif mode == 2: elif mode == 2:
# stereo mid/side (Joint encoding) # stereo mid/side (Joint encoding)
Mencoded_packet = xopusbytespilted[0] Mencoded_packet = xopusbytespilted[0]
@ -708,32 +649,14 @@ class xOpusDecoder:
stereo_signal = np.column_stack((L, R)) stereo_signal = np.column_stack((L, R))
#max_amplitude = np.max(np.abs(stereo_signal)) max_amplitude = np.max(np.abs(stereo_signal))
if max_amplitude > 1.0:
#if max_amplitude > 1.0: stereo_signal /= max_amplitude
# stereo_signal /= max_amplitude
stereo_signal = np.clip(stereo_signal, -1, 1)
stereo_signal = float32_to_int16(stereo_signal) stereo_signal = float32_to_int16(stereo_signal)
else: else:
stereo_signal = np.column_stack((Mpcm, Mpcm)) stereo_signal = np.column_stack((Mpcm, Mpcm))
elif mode == 4:
# stereo intensity
Lencoded_packet = xopusbytespilted[0]
IRencoded_packet = xopusbytespilted[1]
decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet)))
decoded_intensity_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(IRencoded_packet)))
Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=np.int16)
IRpcm = np.frombuffer(decoded_intensity_right_channel_pcm, dtype=np.int16)
recovered_right = Lpcm + (IRpcm - Lpcm) / 1
stereo_signal = np.column_stack((Lpcm, recovered_right))
elif mode == 3: elif mode == 3:
# Parametric Stereo
Mencoded_packet = xopusbytespilted[0] Mencoded_packet = xopusbytespilted[0]
stereodatapacked = xopusbytespilted[1] stereodatapacked = xopusbytespilted[1]
@ -880,7 +803,6 @@ class XopusReader:
def __init__(self, file): def __init__(self, file):
self.file = open(file, 'rb') self.file = open(file, 'rb')
self.xopusline = self.file.read().split(b"\\xa") self.xopusline = self.file.read().split(b"\\xa")
self.lastframe = b""
def readmetadata(self): def readmetadata(self):
header = HeaderContainer.deserialize(self.xopusline[0]) header = HeaderContainer.deserialize(self.xopusline[0])
@ -906,12 +828,10 @@ class XopusReader:
break break
else: else:
try: try:
decodeddata = decoder.decode(data) yield decoder.decode(data)
self.lastframe = decodeddata
yield decodeddata
except Exception as e: except Exception as e:
#print(e) #print(e)
yield self.lastframe yield b""
else: else:
decodedlist = [] decodedlist = []
for data in self.xopusline[1:]: for data in self.xopusline[1:]:
@ -919,11 +839,9 @@ class XopusReader:
break break
else: else:
try: try:
decodeddata = decoder.decode(data) decodedlist.append(decoder.decode(data))
self.lastframe = decodeddata
decodedlist.append(self.lastframe)
except: except:
decodedlist.append(self.lastframe) decodedlist.append(b"")
return decodedlist return decodedlist
def close(self): def close(self):

View File

@ -56,7 +56,3 @@ or if you want only convert to wav you can use
```bash ```bash
$ python3 player.py input.xopus -o output.wav $ python3 player.py input.xopus -o output.wav
``` ```
## Encode with foobar2000
you can encode xopus with foobar by follow this setup
![image](https://github.com/damp11113/xHE-Opus/assets/64675096/3d285f77-3ac3-4fdf-9320-7de9df30e36c)

View File

@ -3,12 +3,10 @@ import pyaudio
import os import os
from libxheopus import DualOpusEncoder, xOpusDecoder from libxheopus import DualOpusEncoder, xOpusDecoder
encoder = DualOpusEncoder(samplerate=48000, version="hev2") encoder = DualOpusEncoder("restricted_lowdelay", 48000, "hev2")
encoder.set_stereo_mode(2) encoder.set_bitrates(24000)
encoder.set_bitrates(32000, balance_percent=75)
encoder.set_bitrate_mode("CVBR") encoder.set_bitrate_mode("CVBR")
encoder.set_bandwidth("fullband") encoder.set_bandwidth("fullband")
encoder.set_compression(10) encoder.set_compression(10)
desired_frame_size = encoder.set_frame_size(120) desired_frame_size = encoder.set_frame_size(120)
@ -24,7 +22,7 @@ for i in range(p.get_device_count()):
device_index_input = dev['index'] device_index_input = dev['index']
break break
device_name_output = "Speakers (2- USB AUDIO DEVICE)" device_name_output = "Speakers (2- USB Audio DAC )"
device_index_output = 0 device_index_output = 0
for i in range(p.get_device_count()): for i in range(p.get_device_count()):
dev = p.get_device_info_by_index(i) dev = p.get_device_info_by_index(i)
@ -32,46 +30,50 @@ for i in range(p.get_device_count()):
device_index_output = dev['index'] device_index_output = dev['index']
break break
def callback(in_data, frame_count, time_info, status): streaminput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, input=True, input_device_index=device_index_input)
pcm = np.frombuffer(in_data, dtype=np.int16) streamoutput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, output=True, output_device_index=device_index_output)
encoded_packets = encoder.encode(pcm) print(desired_frame_size)
print(len(pcm), "-encoded->", len(encoded_packets))
decoded_pcm = decoder.decode(encoded_packets)
# Check if the decoded PCM is empty or not
if len(decoded_pcm) > 0:
pcm_to_write = np.frombuffer(decoded_pcm, dtype=np.int16)
print(pcm_to_write)
return (pcm_to_write.astype(np.int16).tobytes(), pyaudio.paContinue)
else:
print("Decoded PCM is empty")
return (b"\x00", pyaudio.paContinue)
stream = p.open(format=pyaudio.paInt16, channels=2, rate=48000,
input=True, input_device_index=device_index_input,
output=True, output_device_index=device_index_output,
stream_callback=callback, frames_per_buffer=desired_frame_size)
stream.start_stream()
print("Streaming audio. Press Ctrl+C to stop.")
try: try:
while stream.is_active(): while True:
pass try:
except KeyboardInterrupt: pcm = np.frombuffer(streaminput.read(desired_frame_size, exception_on_overflow=False), dtype=np.int16)
print("Stopping stream...")
# Stop and close stream if len(pcm) == 0:
stream.stop_stream() # If PCM is empty, break the loop
stream.close() break
p.terminate()
encoded_packets = encoder.encode(pcm)
print(len(pcm), "-encoded->", len(encoded_packets))
# print(encoded_packet)
try:
decoded_pcm = decoder.decode(encoded_packets)
except Exception as e:
decoded_pcm = b""
# Check if the decoded PCM is empty or not
if len(decoded_pcm) > 0:
pcm_to_write = np.frombuffer(decoded_pcm, dtype=np.int16)
streamoutput.write(pcm_to_write.astype(np.int16).tobytes())
else:
print("Decoded PCM is empty")
except Exception as e:
print(e)
raise
except KeyboardInterrupt:
print("Interrupted by user")
finally:
# Clean up PyAudio streams and terminate PyAudio
streaminput.stop_stream()
streaminput.close()
streamoutput.stop_stream()
streamoutput.close()
p.terminate()