mirror of
https://github.com/damp11113/xHE-Opus.git
synced 2025-04-27 22:48:08 +00:00
v2
add writer and reader
This commit is contained in:
parent
3c2a058e19
commit
a6ef8f580f
18
encode.py
18
encode.py
@ -1,4 +1,4 @@
|
|||||||
from libxheopus import DualOpusEncoder, CustomFileContainer
|
from libxheopus import DualOpusEncoder, XopusWriter
|
||||||
import wave
|
import wave
|
||||||
|
|
||||||
encoder = DualOpusEncoder("restricted_lowdelay", version="hev2")
|
encoder = DualOpusEncoder("restricted_lowdelay", version="hev2")
|
||||||
@ -6,29 +6,21 @@ encoder.set_bitrates(12000)
|
|||||||
encoder.set_bitrate_mode("CVBR")
|
encoder.set_bitrate_mode("CVBR")
|
||||||
desired_frame_size = encoder.set_frame_size(120)
|
desired_frame_size = encoder.set_frame_size(120)
|
||||||
|
|
||||||
wav_file = wave.open(r"C:\Users\sansw\Desktop\The Weeknd - Blinding Lights (HD+).wav", 'rb')
|
wav_file = wave.open(r"test.wav", 'rb')
|
||||||
|
|
||||||
metadata = {"Format": "xHE-Opus", "loudness": 0} # Replace with your metadata
|
|
||||||
container = CustomFileContainer(b'OpuS', 1, metadata)
|
|
||||||
|
|
||||||
file = r"test.xopus"
|
file = r"test.xopus"
|
||||||
|
|
||||||
open(file, 'wb').write(b"") # clear
|
xopus = XopusWriter(file, encoder)
|
||||||
xopusfile = open(file, 'ab')
|
|
||||||
xopusfile.write(container.serialize() + b"\\xa")
|
|
||||||
|
|
||||||
# Read and process the WAV file in chunks
|
# Read and process the WAV file in chunks
|
||||||
print("encoding...")
|
print("encoding...")
|
||||||
while True:
|
while True:
|
||||||
frames = wav_file.readframes(desired_frame_size)
|
frames = wav_file.readframes(desired_frame_size)
|
||||||
|
|
||||||
encoded = encoder.encode(frames)
|
|
||||||
|
|
||||||
if not frames:
|
if not frames:
|
||||||
break # Break the loop when all frames have been read
|
break # Break the loop when all frames have been read
|
||||||
|
|
||||||
xopusfile.write(encoded + b"\\xa")
|
xopus.write(frames)
|
||||||
# Process the frames here, for example, print the number of bytes read
|
|
||||||
|
|
||||||
|
|
||||||
|
xopus.close()
|
||||||
print("encoded")
|
print("encoded")
|
126
libxheopus.py
126
libxheopus.py
@ -1,4 +1,5 @@
|
|||||||
import importlib
|
import importlib
|
||||||
|
import math
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
import pyogg
|
import pyogg
|
||||||
@ -142,9 +143,17 @@ class DualOpusEncoder:
|
|||||||
self.Rencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert))
|
self.Rencoder.CTL(pyogg.opus.OPUS_SET_PHASE_INVERSION_DISABLED_REQUEST, int(phaseinvert))
|
||||||
self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX))
|
self.Rencoder.CTL(pyogg.opus.OPUS_SET_DTX_REQUEST, int(DTX))
|
||||||
|
|
||||||
def encode(self, pcmbytes):
|
def encode(self, pcmbytes, directpcm=False):
|
||||||
"""input: pcm bytes accept float32/int16 only"""
|
"""input: pcm bytes accept float32/int16 only"""
|
||||||
pcm = np.frombuffer(pcmbytes, dtype=np.int16)
|
if directpcm:
|
||||||
|
if pcmbytes.dtype == np.float32:
|
||||||
|
pcm = (pcmbytes * 32767).astype(np.int16)
|
||||||
|
elif pcmbytes.dtype == np.int16:
|
||||||
|
pcm = pcmbytes.astype(np.int16)
|
||||||
|
else:
|
||||||
|
raise TypeError("accept only int16/float32")
|
||||||
|
else:
|
||||||
|
pcm = np.frombuffer(pcmbytes, dtype=np.int16)
|
||||||
|
|
||||||
left_channel = pcm[::2]
|
left_channel = pcm[::2]
|
||||||
right_channel = pcm[1::2]
|
right_channel = pcm[1::2]
|
||||||
@ -189,7 +198,7 @@ class DualOpusDecoder:
|
|||||||
|
|
||||||
return stereo_signal.astype(outputformat).tobytes()
|
return stereo_signal.astype(outputformat).tobytes()
|
||||||
|
|
||||||
class CustomFileContainer:
|
class HeaderContainer:
|
||||||
def __init__(self, capture_pattern, version, metadata):
|
def __init__(self, capture_pattern, version, metadata):
|
||||||
self.capture_pattern = capture_pattern
|
self.capture_pattern = capture_pattern
|
||||||
self.version = version
|
self.version = version
|
||||||
@ -226,4 +235,113 @@ class CustomFileContainer:
|
|||||||
value = struct.unpack(f'<{value_length}s', metadata_bytes[4:4+value_length])[0].decode('utf-8')
|
value = struct.unpack(f'<{value_length}s', metadata_bytes[4:4+value_length])[0].decode('utf-8')
|
||||||
metadata_bytes = metadata_bytes[4+value_length:]
|
metadata_bytes = metadata_bytes[4+value_length:]
|
||||||
metadata[key] = value
|
metadata[key] = value
|
||||||
return metadata
|
return metadata
|
||||||
|
|
||||||
|
class FooterContainer:
|
||||||
|
def __init__(self, loudness_avg, length):
|
||||||
|
self.loudness_avg = loudness_avg
|
||||||
|
self.length = length
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
metadata_bytes = self.serialize_metadata()
|
||||||
|
return metadata_bytes
|
||||||
|
|
||||||
|
def serialize_metadata(self):
|
||||||
|
metadata_bytes = b''
|
||||||
|
metadata_bytes += struct.pack('<f', self.loudness_avg)
|
||||||
|
metadata_bytes += struct.pack('<I', self.length)
|
||||||
|
return metadata_bytes
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def deserialize(cls, data):
|
||||||
|
loudness_avg, length = cls.deserialize_metadata(data)
|
||||||
|
return cls(loudness_avg, length)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def deserialize_metadata(metadata_bytes):
|
||||||
|
loudness_avg = struct.unpack('<f', metadata_bytes[:4])[0]
|
||||||
|
length = struct.unpack('<I', metadata_bytes[4:8])[0]
|
||||||
|
return loudness_avg, length
|
||||||
|
|
||||||
|
class XopusWriter:
|
||||||
|
def __init__(self, file, encoder: DualOpusEncoder, metadata={}):
|
||||||
|
self.file = file
|
||||||
|
self.encoder = encoder
|
||||||
|
|
||||||
|
systemmetadata = {
|
||||||
|
"format": "Xopus",
|
||||||
|
"audio": {
|
||||||
|
"encoder": "libxheopus",
|
||||||
|
"format": "xHE-Opus",
|
||||||
|
"format/info": "Extended High Efficiency Opus Audio Codec"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
open(file, 'wb').write(b"") # clear
|
||||||
|
self.xopusfile = open(file, 'ab')
|
||||||
|
self.xopusfile.write(HeaderContainer(b'OpuS', 1, metadata | systemmetadata).serialize() + b"\\xa")
|
||||||
|
|
||||||
|
self.loudnessperframe = []
|
||||||
|
self.length = 0
|
||||||
|
|
||||||
|
def write(self, pcmbytes):
|
||||||
|
pcm = np.frombuffer(pcmbytes, dtype=np.int16)
|
||||||
|
# Convert int16 audio data to floating-point values in range [-1, 1]
|
||||||
|
normalized_audio = pcm / 32767.0
|
||||||
|
|
||||||
|
# Calculate RMS value
|
||||||
|
rms = np.sqrt(np.mean(np.square(normalized_audio)))
|
||||||
|
|
||||||
|
# Calculate dBFS
|
||||||
|
dbfs = 20 * math.log10(rms)
|
||||||
|
self.loudnessperframe.append(dbfs)
|
||||||
|
|
||||||
|
encoded = self.encoder.encode(pcm, directpcm=True)
|
||||||
|
self.xopusfile.write(encoded + b"\\xa")
|
||||||
|
self.length += 1
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
loudnessavgs = sum(self.loudnessperframe) / len(self.loudnessperframe)
|
||||||
|
|
||||||
|
self.xopusfile.write(b"\\xeof\\xeof")
|
||||||
|
self.xopusfile.write(FooterContainer(loudnessavgs, self.length).serialize())
|
||||||
|
self.loudnessperframe = []
|
||||||
|
self.length = 0
|
||||||
|
|
||||||
|
class XopusReader:
|
||||||
|
def __init__(self, file):
|
||||||
|
file = open(file, 'rb')
|
||||||
|
self.xopusline = file.read().split(b"\\xa")
|
||||||
|
|
||||||
|
def readmetadata(self):
|
||||||
|
header = HeaderContainer.deserialize(self.xopusline[0])
|
||||||
|
|
||||||
|
if self.xopusline[-1].startswith(b"\\xeof\\xeof"):
|
||||||
|
footer = FooterContainer.deserialize(self.xopusline[-1].split(b"\\xeof\\xeof")[1])
|
||||||
|
else:
|
||||||
|
raise EOFError("can't find EOF")
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"header": header.metadata,
|
||||||
|
"footer": {
|
||||||
|
"contentloudness": footer.loudness_avg,
|
||||||
|
"length": footer.length
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
|
||||||
|
def decode(self, decoder, play=False):
|
||||||
|
if play:
|
||||||
|
for data in self.xopusline[1:]:
|
||||||
|
if data.startswith(b"\\xeof\\xeof"):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
yield decoder.decode(data)
|
||||||
|
else:
|
||||||
|
decodedlist = []
|
||||||
|
for data in self.xopusline[1:]:
|
||||||
|
if data.startswith(b"\\xeof\\xeof"):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
decodedlist.append(decoder.decode(data))
|
||||||
|
return decodedlist
|
15
player.py
15
player.py
@ -1,5 +1,5 @@
|
|||||||
import pyaudio
|
import pyaudio
|
||||||
from libxheopus import DualOpusDecoder, CustomFileContainer
|
from libxheopus import DualOpusDecoder, XopusReader
|
||||||
|
|
||||||
# Initialize PyAudio
|
# Initialize PyAudio
|
||||||
p = pyaudio.PyAudio()
|
p = pyaudio.PyAudio()
|
||||||
@ -8,18 +8,13 @@ decoder = DualOpusDecoder()
|
|||||||
|
|
||||||
streamoutput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, output=True)
|
streamoutput = p.open(format=pyaudio.paInt16, channels=2, rate=48000, output=True)
|
||||||
|
|
||||||
file = open(r"test.xopus", 'rb')
|
xopusdecoder = XopusReader(r"test.xopus")
|
||||||
|
|
||||||
line = file.read().split(b"\\xa")
|
print(xopusdecoder.readmetadata())
|
||||||
|
|
||||||
deserialized_container = CustomFileContainer.deserialize(line[0])
|
|
||||||
print(deserialized_container.metadata)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for data in line[1:]:
|
for data in xopusdecoder.decode(decoder, True):
|
||||||
if data:
|
streamoutput.write(data)
|
||||||
streamoutput.write(decoder.decode(data))
|
|
||||||
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("Interrupted by user")
|
print("Interrupted by user")
|
||||||
|
BIN
test.xopus
BIN
test.xopus
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user