diff --git a/createnewformat.reg b/createnewformat.reg new file mode 100644 index 0000000..472d3b1 Binary files /dev/null and b/createnewformat.reg differ diff --git a/encode.py b/encode.py new file mode 100644 index 0000000..3910f59 --- /dev/null +++ b/encode.py @@ -0,0 +1,34 @@ +from libxheopus import DualOpusEncoder, CustomFileContainer +import wave + +encoder = DualOpusEncoder("restricted_lowdelay", version="hev2") +encoder.set_bitrates(12000) +encoder.set_bitrate_mode("CVBR") +desired_frame_size = encoder.set_frame_size(120) + +wav_file = wave.open(r"C:\Users\sansw\Desktop\The Weeknd - Blinding Lights (HD+).wav", 'rb') + +metadata = {"Format": "xHE-Opus", "loudness": 0} # Replace with your metadata +container = CustomFileContainer(b'OpuS', 1, metadata) + +file = r"test.xopus" + +open(file, 'wb').write(b"") # clear +xopusfile = open(file, 'ab') +xopusfile.write(container.serialize() + b"\\xa") + +# Read and process the WAV file in chunks +print("encoding...") +while True: + frames = wav_file.readframes(desired_frame_size) + + encoded = encoder.encode(frames) + + if not frames: + break # Break the loop when all frames have been read + + xopusfile.write(encoded + b"\\xa") + # Process the frames here, for example, print the number of bytes read + + +print("encoded") \ No newline at end of file diff --git a/libxheopus.py b/libxheopus.py new file mode 100644 index 0000000..e645e12 --- /dev/null +++ b/libxheopus.py @@ -0,0 +1,229 @@ +import importlib +import struct + +import pyogg +import os +import numpy as np + +class DualOpusEncoder: + def __init__(self, app="audio", samplerate=48000, version="stable"): + """ + ----------------------------- version-------------------------- + hev2: libopus 1.5.1 (fre:ac) + he: libopus 1.5.2 (moded) + exper: libopus 1.5.1 + stable: libopus 1.4 + old: libopus 1.3.1 + custom: custom opus path you can use "pyogg_win_libopus_custom_path" env to change opus version (windows only) + ------------------------- App---------------------------------- + + Set the encoding mode. + + This must be one of 'voip', 'audio', or 'restricted_lowdelay'. + + 'voip': Gives best quality at a given bitrate for voice + signals. It enhances the input signal by high-pass + filtering and emphasizing formants and + harmonics. Optionally it includes in-band forward error + correction to protect against packet loss. Use this mode + for typical VoIP applications. Because of the enhancement, + even at high bitrates the output may sound different from + the input. + + 'audio': Gives best quality at a given bitrate for most + non-voice signals like music. Use this mode for music and + mixed (music/voice) content, broadcast, and applications + requiring less than 15 ms of coding delay. + + 'restricted_lowdelay': configures low-delay mode that + disables the speech-optimized mode in exchange for + slightly reduced delay. This mode can only be set on an + newly initialized encoder because it changes the codec + delay. + """ + self.version = version + self.samplerate = samplerate + os.environ["pyogg_win_libopus_version"] = version + importlib.reload(pyogg.opus) + + self.Lencoder = pyogg.OpusBufferedEncoder() + self.Rencoder = pyogg.OpusBufferedEncoder() + + self.Lencoder.set_application(app) + self.Rencoder.set_application(app) + + self.Lencoder.set_sampling_frequency(samplerate) + self.Rencoder.set_sampling_frequency(samplerate) + + self.Lencoder.set_channels(1) + self.Rencoder.set_channels(1) + + self.set_frame_size() + self.set_compression() + self.set_feature() + self.set_bitrate_mode() + self.set_bitrates() + self.set_bandwidth() + self.set_packet_loss() + + def set_compression(self, level=10): + """complex 0-10 low-hires""" + self.Lencoder.set_compresion_complex(level) + self.Rencoder.set_compresion_complex(level) + + def set_bitrates(self, bitrates=64000, samebitrate=False): + """input birate unit: bps""" + if bitrates <= 5000: + bitrates = 5000 + + if samebitrate: + bitperchannel = bitrates + else: + bitperchannel = bitrates / 2 + + self.Lencoder.set_bitrates(int(bitperchannel)) + self.Rencoder.set_bitrates(int(bitperchannel)) + + def set_bandwidth(self, bandwidth="fullband"): + """ + narrowband: + Narrowband typically refers to a limited range of frequencies suitable for voice communication. + mediumband (unsupported in libopus 1.3+): + Mediumband extends the frequency range compared to narrowband, providing better audio quality. + wideband: + Wideband offers an even broader frequency range, resulting in higher audio fidelity compared to narrowband and mediumband. + superwideband: + Superwideband extends the frequency range beyond wideband, further enhancing audio quality. + fullband (default): + Fullband provides the widest frequency range among the listed options, offering the highest audio quality. + auto: opus is working auto not force + """ + self.Lencoder.set_bandwidth(bandwidth) + self.Rencoder.set_bandwidth(bandwidth) + + def set_frame_size(self, size=60): + """ Set the desired frame duration (in milliseconds). + Valid options are 2.5, 5, 10, 20, 40, or 60ms. + Exclusive for HE opus v2 (freac opus) 80, 100 or 120ms. + + @return chunk size + """ + + if self.version != "hev2" and size > 60: + raise ValueError("non hev2 can't use framesize > 60") + + self.Lencoder.set_frame_size(size) + self.Rencoder.set_frame_size(size) + + return int((size / 1000) * self.samplerate) + + def set_packet_loss(self, loss=0): + """input: % percent""" + if loss > 100: + raise ValueError("percent must <=100") + + self.Lencoder.set_packets_loss(loss) + self.Rencoder.set_packets_loss(loss) + + def set_bitrate_mode(self, mode="CVBR"): + """VBR, CVBR, CBR + VBR in 1.5.x replace by CVBR + """ + + self.Lencoder.set_bitrate_mode(mode) + self.Rencoder.set_bitrate_mode(mode) + + def set_feature(self, prediction=False, phaseinvert=False, DTX=False): + self.Lencoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction)) + self.Lencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert)) + self.Lencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) + + self.Rencoder.CTL(pyogg.opus.OPUS_SET_PREDICTION_DISABLED_REQUEST, int(prediction)) + self.Rencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert)) + self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX)) + + def encode(self, pcmbytes): + """input: pcm bytes accept float32/int16 only""" + pcm = np.frombuffer(pcmbytes, dtype=np.int16) + + left_channel = pcm[::2] + right_channel = pcm[1::2] + + Lencoded_packet = self.Lencoder.buffered_encode(memoryview(bytearray(left_channel)), flush=True)[0][0].tobytes() + Rencoded_packet = self.Rencoder.buffered_encode(memoryview(bytearray(right_channel)), flush=True)[0][ + 0].tobytes() + + dual_encoded_packet = (Lencoded_packet + b'\\x64\\x75' + Rencoded_packet) + + return dual_encoded_packet + + +class DualOpusDecoder: + def __init__(self, sample_rate=48000): + self.Ldecoder = pyogg.OpusDecoder() + self.Rdecoder = pyogg.OpusDecoder() + + self.Ldecoder.set_channels(1) + self.Rdecoder.set_channels(1) + + self.Ldecoder.set_sampling_frequency(sample_rate) + self.Rdecoder.set_sampling_frequency(sample_rate) + + def decode(self, dualopusbytes: bytes, outputformat=np.int16): + try: + dualopusbytespilted = dualopusbytes.split(b'\\x64\\x75') + Lencoded_packet = dualopusbytespilted[0] + Rencoded_packet = dualopusbytespilted[1] + except: + raise TypeError("this is not dual opus") + + decoded_left_channel_pcm = self.Ldecoder.decode(memoryview(bytearray(Lencoded_packet))) + decoded_right_channel_pcm = self.Rdecoder.decode(memoryview(bytearray(Rencoded_packet))) + + Lpcm = np.frombuffer(decoded_left_channel_pcm, dtype=outputformat) + Rpcm = np.frombuffer(decoded_right_channel_pcm, dtype=outputformat) + + stereo_signal = np.empty((len(Lpcm), 2), dtype=Lpcm.dtype) + stereo_signal[:, 0] = Lpcm + stereo_signal[:, 1] = Rpcm + + return stereo_signal.astype(outputformat).tobytes() + +class CustomFileContainer: + def __init__(self, capture_pattern, version, metadata): + self.capture_pattern = capture_pattern + self.version = version + self.metadata = metadata + + def serialize(self): + header = struct.pack('<4sB', self.capture_pattern, self.version) + metadata_bytes = self.serialize_metadata() + return header + metadata_bytes + + def serialize_metadata(self): + metadata_bytes = b'' + for key, value in self.metadata.items(): + key_bytes = key.encode('utf-8') + value_bytes = value.encode('utf-8') if isinstance(value, str) else str(value).encode('utf-8') + metadata_bytes += struct.pack(f'