mirror of
https://github.com/damp11113/xHE-Opus.git
synced 2025-07-01 05:30:36 +00:00
v1
This commit is contained in:
parent
1a30a5bc5d
commit
3c2a058e19
BIN
createnewformat.reg
Normal file
BIN
createnewformat.reg
Normal file
Binary file not shown.
34
encode.py
Normal file
34
encode.py
Normal file
@ -0,0 +1,34 @@
|
||||
from libxheopus import DualOpusEncoder, CustomFileContainer
|
||||
import wave
|
||||
|
||||
encoder = DualOpusEncoder("restricted_lowdelay", version="hev2")
|
||||
encoder.set_bitrates(12000)
|
||||
encoder.set_bitrate_mode("CVBR")
|
||||
desired_frame_size = encoder.set_frame_size(120)
|
||||
|
||||
wav_file = wave.open(r"C:\Users\sansw\Desktop\The Weeknd - Blinding Lights (HD+).wav", 'rb')
|
||||
|
||||
metadata = {"Format": "xHE-Opus", "loudness": 0} # Replace with your metadata
|
||||
container = CustomFileContainer(b'OpuS', 1, metadata)
|
||||
|
||||
file = r"test.xopus"
|
||||
|
||||
open(file, 'wb').write(b"") # clear
|
||||
xopusfile = open(file, 'ab')
|
||||
xopusfile.write(container.serialize() + b"\\xa")
|
||||
|
||||
# Read and process the WAV file in chunks
|
||||
print("encoding...")
|
||||
while True:
|
||||
frames = wav_file.readframes(desired_frame_size)
|
||||
|
||||
encoded = encoder.encode(frames)
|
||||
|
||||
if not frames:
|
||||
break # Break the loop when all frames have been read
|
||||
|
||||
xopusfile.write(encoded + b"\\xa")
|
||||
# Process the frames here, for example, print the number of bytes read
|
||||
|
||||
|
||||
print("encoded")
|
229
libxheopus.py
Normal file
229
libxheopus.py
Normal file
@ -0,0 +1,229 @@
|
||||
import importlib
|
||||
import struct
|
||||
|
||||
import pyogg
|
||||
import os
|
||||
import numpy as np
|
||||
|
||||
class DualOpusEncoder:
|
||||
def __init__(self, app="audio", samplerate=48000, version="stable"):
|
||||
"""
|
||||
----------------------------- version--------------------------
|
||||
hev2: libopus 1.5.1 (fre:ac)
|
||||
he: libopus 1.5.2 (moded)
|
||||
exper: libopus 1.5.1
|
||||
stable: libopus 1.4
|
||||
old: libopus 1.3.1
|
||||
custom: custom opus path you can use "pyogg_win_libopus_custom_path" env to change opus version (windows only)
|
||||
------------------------- App----------------------------------
|
||||
|
||||
Set the encoding mode.
|
||||
|
||||
This must be one of 'voip', 'audio', or 'restricted_lowdelay'.
|
||||
|
||||
'voip': Gives best quality at a given bitrate for voice
|
||||
signals. It enhances the input signal by high-pass
|
||||
filtering and emphasizing formants and
|
||||
harmonics. Optionally it includes in-band forward error
|
||||
correction to protect against packet loss. Use this mode
|
||||
for typical VoIP applications. Because of the enhancement,
|
||||
even at high bitrates the output may sound different from
|
||||
the input.
|
||||
|
||||
'audio': Gives best quality at a given bitrate for most
|
||||
non-voice signals like music. Use this mode for music and
|
||||
mixed (music/voice) content, broadcast, and applications
|
||||
requiring less than 15 ms of coding delay.
|
||||
|
||||
'restricted_lowdelay': configures low-delay mode that
|
||||
disables the speech-optimized mode in exchange for
|
||||
slightly reduced delay. This mode can only be set on an
|
||||
newly initialized encoder because it changes the codec
|
||||
delay.
|
||||
"""
|
||||
self.version = version
|
||||
self.samplerate = samplerate
|
||||
os.environ["pyogg_win_libopus_version"] = version
|
||||
importlib.reload(pyogg.opus)
|
||||
|
||||
self.Lencoder = pyogg.OpusBufferedEncoder()
|
||||
self.Rencoder = pyogg.OpusBufferedEncoder()
|
||||
|
||||
self.Lencoder.set_application(app)
|
||||
self.Rencoder.set_application(app)
|
||||
|
||||
self.Lencoder.set_sampling_frequency(samplerate)
|
||||
self.Rencoder.set_sampling_frequency(samplerate)
|
||||
|
||||
self.Lencoder.set_channels(1)
|
||||
self.Rencoder.set_channels(1)
|
||||
|
||||
self.set_frame_size()
|
||||
self.set_compression()
|
||||
self.set_feature()
|
||||
self.set_bitrate_mode()
|
||||
self.set_bitrates()
|
||||
self.set_bandwidth()
|
||||
self.set_packet_loss()
|
||||
|
||||
def set_compression(self, level=10):
|
||||
"""complex 0-10 low-hires"""
|
||||
self.Lencoder.set_compresion_complex(level)
|
||||
self.Rencoder.set_compresion_complex(level)
|
||||
|
||||
def set_bitrates(self, bitrates=64000, samebitrate=False):
|
||||
"""input birate unit: bps"""
|
||||
if bitrates <= 5000:
|
||||
bitrates = 5000
|
||||
|
||||
if samebitrate:
|
||||
bitperchannel = bitrates
|
||||
else:
|
||||
bitperchannel = bitrates / 2
|
||||
|
||||
self.Lencoder.set_bitrates(int(bitperchannel))
|
||||
self.Rencoder.set_bitrates(int(bitperchannel))
|
||||
|
||||
def set_bandwidth(self, bandwidth="fullband"):
|
||||
"""
|
||||
narrowband:
|
||||
Narrowband typically refers to a limited range of frequencies suitable for voice communication.
|
||||
mediumband (unsupported in libopus 1.3+):
|
||||
Mediumband extends the frequency range compared to narrowband, providing better audio quality.
|
||||
wideband:
|
||||
Wideband offers an even broader frequency range, resulting in higher audio fidelity compared to narrowband and mediumband.
|
||||
superwideband:
|
||||
Superwideband extends the frequency range beyond wideband, further enhancing audio quality.
|
||||
fullband (default):
|
||||
Fullband provides the widest frequency range among the listed options, offering the highest audio quality.
|
||||
auto: opus is working auto not force
|
||||
"""
|
||||
self.Lencoder.set_bandwidth(bandwidth)
|
||||
self.Rencoder.set_bandwidth(bandwidth)
|
||||
|
||||
def set_frame_size(self, size=60):
|
||||
""" Set the desired frame duration (in milliseconds).
|
||||
Valid options are 2.5, 5, 10, 20, 40, or 60ms.
|
||||
Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms.
|
||||
|
||||
@return chunk size
|
||||
"""
|
||||
|
||||
if self.version != "hev2" and size > 60:
|
||||
raise ValueError("non hev2 can't use framesize > 60")
|
||||
|
||||
self.Lencoder.set_frame_size(size)
|
||||
self.Rencoder.set_frame_size(size)
|
||||
|
||||
return int((size / 1000) * self.samplerate)
|
||||
|
||||
def set_packet_loss(self, loss=0):
|
||||
"""input: % percent"""
|
||||
if loss > 100:
|
||||
raise ValueError("percent must <=100")
|
||||
|
||||
self.Lencoder.set_packets_loss(loss)
|
||||
self.Rencoder.set_packets_loss(loss)
|
||||
|
||||
def set_bitrate_mode(self, mode="CVBR"):
|
||||
"""VBR, CVBR, CBR
|
||||
VBR in 1.5.x replace by CVBR
|
||||
"""
|
||||
|
||||
self.Lencoder.set_bitrate_mode(mode)
|
||||
self.Rencoder.set_bitrate_mode(mode)
|
||||
|
||||
def set_feature(self, prediction=False, phaseinvert=False, DTX=False):
|
||||
self.Lencoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction))
|
||||
self.Lencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert))
|
||||
self.Lencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX))
|
||||
|
||||
self.Rencoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction))
|
||||
self.Rencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert))
|
||||
self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX))
|
||||
|
||||
def encode(self, pcmbytes):
|
||||
"""input: pcm bytes accept float32/int16 only"""
|
||||
pcm = np.frombuffer(pcmbytes, dtype=np.int16)
|
||||
|
||||
left_channel = pcm[::2]
|
||||
right_channel = pcm[1::2]
|
||||
|
||||
Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes()
|
||||
Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][
|
||||
0].tobytes()
|
||||
|
||||
dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet)
|
||||
|
||||
return dual_encoded_packet
|
||||
|
||||
|
||||
class DualOpusDecoder:
|
||||
def __init__(self, sample_rate=48000):
|
||||
self.Ldecoder = pyogg.OpusDecoder()
|
||||
self.Rdecoder = pyogg.OpusDecoder()
|
||||
|
||||
self.Ldecoder.set_channels(1)
|
||||
self.Rdecoder.set_channels(1)
|
||||
|
||||
self.Ldecoder.set_sampling_frequency(sample_rate)
|
||||
self.Rdecoder.set_sampling_frequency(sample_rate)
|
||||
|
||||
def decode(self, dualopusbytes: bytes, outputformat=np.int16):
|
||||
try:
|
||||
dualopusbytespilted = dualopusbytes.split(b'\\x64\\x75')
|
||||
Lencoded_packet = dualopusbytespilted[0]
|
||||
Rencoded_packet = dualopusbytespilted[1]
|
||||
except:
|
||||
raise TypeError("this is not dual opus")
|
||||
|
||||
decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet)))
|
||||
decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet)))
|
||||
|
||||
Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=outputformat)
|
||||
Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=outputformat)
|
||||
|
||||
stereo_signal = np.empty((len(Lpcm), 2), dtype=Lpcm.dtype)
|
||||
stereo_signal[:, 0] = Lpcm
|
||||
stereo_signal[:, 1] = Rpcm
|
||||
|
||||
return stereo_signal.astype(outputformat).tobytes()
|
||||
|
||||
class CustomFileContainer:
|
||||
def __init__(self, capture_pattern, version, metadata):
|
||||
self.capture_pattern = capture_pattern
|
||||
self.version = version
|
||||
self.metadata = metadata
|
||||
|
||||
def serialize(self):
|
||||
header = struct.pack('<4sB', self.capture_pattern, self.version)
|
||||
metadata_bytes = self.serialize_metadata()
|
||||
return header + metadata_bytes
|
||||
|
||||
def serialize_metadata(self):
|
||||
metadata_bytes = b''
|
||||
for key, value in self.metadata.items():
|
||||
key_bytes = key.encode('utf-8')
|
||||
value_bytes = value.encode('utf-8') if isinstance(value, str) else str(value).encode('utf-8')
|
||||
metadata_bytes += struct.pack(f'<I{len(key_bytes)}sI{len(value_bytes)}s', len(key_bytes), key_bytes, len(value_bytes), value_bytes)
|
||||
return metadata_bytes
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, data):
|
||||
capture_pattern, version = struct.unpack_from('<4sB', data)
|
||||
metadata_start = struct.calcsize('<4sB')
|
||||
metadata = cls.deserialize_metadata(data[metadata_start:])
|
||||
return cls(capture_pattern, version, metadata)
|
||||
|
||||
@staticmethod
|
||||
def deserialize_metadata(metadata_bytes):
|
||||
metadata = {}
|
||||
while metadata_bytes:
|
||||
key_length = struct.unpack('<I', metadata_bytes[:4])[0]
|
||||
key = struct.unpack(f'<{key_length}s', metadata_bytes[4:4+key_length])[0].decode('utf-8')
|
||||
metadata_bytes = metadata_bytes[4+key_length:]
|
||||
value_length = struct.unpack('<I', metadata_bytes[:4])[0]
|
||||
value = struct.unpack(f'<{value_length}s', metadata_bytes[4:4+value_length])[0].decode('utf-8')
|
||||
metadata_bytes = metadata_bytes[4+value_length:]
|
||||
metadata[key] = value
|
||||
return metadata
|
30
player.py
Normal file
30
player.py
Normal file
@ -0,0 +1,30 @@
|
||||
import pyaudio
|
||||
from libxheopus import DualOpusDecoder, CustomFileContainer
|
||||
|
||||
# Initialize PyAudio
|
||||
p = pyaudio.PyAudio()
|
||||
|
||||
decoder = DualOpusDecoder()
|
||||
|
||||
streamoutput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, output=True)
|
||||
|
||||
file = open(r"test.xopus", 'rb')
|
||||
|
||||
line = file.read().split(b"\\xa")
|
||||
|
||||
deserialized_container = CustomFileContainer.deserialize(line[0])
|
||||
print(deserialized_container.metadata)
|
||||
|
||||
try:
|
||||
for data in line[1:]:
|
||||
if data:
|
||||
streamoutput.write(decoder.decode(data))
|
||||
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("Interrupted by user")
|
||||
finally:
|
||||
# Clean up PyAudio streams and terminate PyAudio
|
||||
streamoutput.stop_stream()
|
||||
streamoutput.close()
|
||||
p.terminate()
|
BIN
test.xopus
Normal file
BIN
test.xopus
Normal file
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user