ZMQ update

This commit is contained in:
dharm pimsen 2024-01-19 16:07:01 +07:00
parent 0e55b036ec
commit 291ecb6622
4 changed files with 252 additions and 183 deletions

View File

@ -7,6 +7,7 @@ import socket
import numpy as np import numpy as np
import pickle import pickle
import pyaudio import pyaudio
import zmq
from pyogg import OpusDecoder from pyogg import OpusDecoder
from Crypto.Cipher import AES from Crypto.Cipher import AES
from Crypto.Protocol.KDF import scrypt from Crypto.Protocol.KDF import scrypt
@ -92,18 +93,27 @@ class App:
self.ccisdecrypt = None self.ccisdecrypt = None
self.ccisdecryptpassword = None self.ccisdecryptpassword = None
self.paudio = pyaudio.PyAudio() self.paudio = pyaudio.PyAudio()
self.cprotocol = None
def connecttoserver(self, sender, data): def connecttoserver(self, sender, data):
dpg.configure_item("connectservergroup", show=False) dpg.configure_item("connectservergroup", show=False)
#protocol = dpg.get_value("serverprotocol") protocol = dpg.get_value("serverprotocol")
self.cprotocol = protocol
dpg.configure_item("serverstatus", default_value='connecting...', color=(255, 255, 0)) dpg.configure_item("serverstatus", default_value='connecting...', color=(255, 255, 0))
ip = dpg.get_value("serverip") ip = dpg.get_value("serverip")
port = dpg.get_value("serverport") port = dpg.get_value("serverport")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if protocol == "TCP":
try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ip, port)) try:
except: s.connect((ip, port))
dpg.configure_item("connectbutton", show=True) except:
dpg.configure_item("connectbutton", show=True)
elif protocol == "ZeroMQ":
context = zmq.Context()
s = context.socket(zmq.SUB)
s.connect(f"tcp://{ip}:{port}")
s.setsockopt_string(zmq.SUBSCRIBE, "")
self.working = True self.working = True
self.device_index_output = 0 self.device_index_output = 0
@ -196,14 +206,19 @@ class App:
while True: while True:
try: try:
if self.working: if self.working:
data = b'' if self.cprotocol == "TCP":
#data = socket.recv(1580152) data = b''
while True: #data = socket.recv(1580152)
part = socket.recv(1024) while True:
data += part part = socket.recv(1024)
if len(part) < 1024: data += part
# either 0 or end of data if len(part) < 1024:
break # either 0 or end of data
break
elif self.cprotocol == "ZeroMQ":
data = socket.recv()
else:
data = b""
bytesconunt += len(data) bytesconunt += len(data)
@ -363,11 +378,11 @@ class App:
dpg.add_spacer() dpg.add_spacer()
dpg.add_image("station_logo", show=False, tag="station_logo_config") dpg.add_image("station_logo", show=False, tag="station_logo_config")
dpg.add_text("", tag="RDSinfo", show=False) dpg.add_text("", tag="RDSinfo", show=False)
with dpg.child_window(tag="connectservergroup", label="Server", use_internal_label=True, height=105): with dpg.child_window(tag="connectservergroup", label="Server", use_internal_label=True, height=130):
dpg.add_button(label="select server", tag="selectserverbutton") dpg.add_button(label="select server", tag="selectserverbutton")
dpg.add_input_text(label="server ip", tag="serverip", default_value="localhost") dpg.add_input_text(label="server ip", tag="serverip", default_value="localhost")
dpg.add_input_int(label="port", tag="serverport", max_value=65535, default_value=6980) dpg.add_input_int(label="port", tag="serverport", max_value=65535, default_value=6980)
#dpg.add_combo(["TCP", "Websocket"], label="protocol", tag="serverprotocol", default_value="TCP") dpg.add_combo(["TCP", "ZeroMQ"], label="protocol", tag="serverprotocol", default_value="TCP")
dpg.add_button(label="connect", callback=self.connecttoserver, tag="connectbutton") dpg.add_button(label="connect", callback=self.connecttoserver, tag="connectbutton")
dpg.add_spacer() dpg.add_spacer()
dpg.add_button(label="More RDS info", callback=lambda: dpg.configure_item("RDSwindow", show=True), tag="morerdsbutton", show=False) dpg.add_button(label="More RDS info", callback=lambda: dpg.configure_item("RDSwindow", show=True), tag="morerdsbutton", show=False)

129
Server/RDS.py Normal file
View File

@ -0,0 +1,129 @@
import time
from datetime import datetime
import cv2
import numpy as np
from damp11113 import scrollTextBySteps
def encodelogoimage(path, quality=50):
image = cv2.resize(cv2.imread(path), (128, 128))
# Encode the image as JPEG with higher compression (lower quality)
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] # Adjust quality (50 is just an example)
result, encoded_image = cv2.imencode('.jpg', image, encode_param)
encoded_bytes = np.array(encoded_image).tobytes()
return encoded_bytes
RDS = {
"PS": "DPRadio",
"RT": "Testing internet radio",
"PI": 0x27C8, # static
"PTY": 0,
"PTY+": "Testing",
"Country": "TH",
"Coverage": "All",
"CT": {
"Local": None,
"UTC": None,
},
"PIN": 12345,
"TMC": {
"TP": False,
"TA": False,
"Messages": None
},
"ECC": None,
"LIC": None,
"AudioMode": "Stereo", # mono, stereo, surround 5.1/7.1, HRTF
"ArtificialHead": False,
"Compressed": False,
"DyPTY": False,
"EPG": None,
"AS": [ # AS = Alternative Server
# can add more server here
],
"EON": [
# can add more here
],
"ContentInfo": {
"Codec": "opus",
"bitrate": 64000,
"channel": 2,
"samplerates": 48000
},
"images": {
"logo": encodelogoimage(r"C:\Users\sansw\3D Objects\dpstream iptv logo.png")
}
}
RDS2 = {
"PS": "DPTest",
"RT": "Testing internet radio",
"PI": 0x27C6,
"PTY": 0,
"PTY+": "Testing",
"Country": "TH",
"Coverage": "All",
"CT": {
"Local": None,
"UTC": None,
},
"PIN": 12345,
"TMC": {
"TP": False,
"TA": False,
"Messages": None
},
"ECC": None,
"LIC": None,
"AudioMode": "Stereo", # mono, stereo, surround 5.1/7.1, HRTF
"ArtificialHead": False,
"Compressed": False,
"DyPTY": False,
"EPG": None,
"AS": [ # AS = Alternative Server
# can add more server here
],
"EON": [
# can add more server here
],
"ContentInfo": {
"Codec": "Opus",
"bitrate": 8000,
"channel": 2,
"samplerates": 48000
},
"images": {
"logo": None
}
}
def update_RDS():
global RDS
while True:
pstext = "DPRadio Testing Broadcasting "
for i in range(0, len(pstext)):
RDS["PS"] = scrollTextBySteps(pstext, i)
time.sleep(1)
def update_RDS_time():
global RDS
while True:
RDS["CT"]["Local"] = datetime.now().timestamp()
RDS["CT"]["UTC"] = datetime.utcnow().timestamp()
RDS2["CT"]["Local"] = datetime.now().timestamp()
RDS2["CT"]["UTC"] = datetime.utcnow().timestamp()
time.sleep(1)
def update_RDS_images():
global RDS
while True:
RDS["images"]["logo"] = encodelogoimage(r"C:\Users\sansw\3D Objects\dpstream iptv logo.png", 25)
time.sleep(10)
RDS["images"]["logo"] = encodelogoimage(r"C:\Users\sansw\3D Objects\140702_hi-res-logo.jpg", 25)
time.sleep(10)
RDS["images"]["logo"] = encodelogoimage(r"IDRBfavicon.jpg", 25)
time.sleep(10)

Binary file not shown.

View File

@ -5,13 +5,12 @@ from pyogg import OpusBufferedEncoder
import numpy as np import numpy as np
import pickle import pickle
import threading import threading
from damp11113 import scrollTextBySteps import RDS as _RDS
from queue import Queue from queue import Queue
from datetime import datetime, timezone
import cv2
from Crypto.Cipher import AES from Crypto.Cipher import AES
from Crypto.Protocol.KDF import scrypt from Crypto.Protocol.KDF import scrypt
from Crypto.Random import get_random_bytes from Crypto.Random import get_random_bytes
import zmq
def pad_message(message_bytes): def pad_message(message_bytes):
block_size = AES.block_size block_size = AES.block_size
@ -40,13 +39,22 @@ def encrypt_data(message_bytes, password):
return encrypted_message, salt, iv return encrypted_message, salt, iv
# create tcp protocol = "ZMQ"
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_port = ('*', 6980)
# wait for connection
server_port = ('localhost', 6980)
s.bind(server_port)
s.listen(1) if protocol == "TCP":
# create tcp
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# wait for connection
s.bind(server_port)
s.listen(1)
elif protocol == "ZMQ":
context = zmq.Context()
s = context.socket(zmq.PUB)
s.bind(f"tcp://{server_port[0]}:{server_port[1]}")
else:
print(f"{protocol} not supported")
exit()
p = pyaudio.PyAudio() p = pyaudio.PyAudio()
@ -74,135 +82,13 @@ streaminput = p.open(format=pyaudio.paInt16, channels=2, rate=sample_rate, input
streaminput2 = p.open(format=pyaudio.paInt16, channels=2, rate=sample_rate, input=True, input_device_index=device_index_input2) streaminput2 = p.open(format=pyaudio.paInt16, channels=2, rate=sample_rate, input=True, input_device_index=device_index_input2)
def encodelogoimage(path, quality=50): thread = threading.Thread(target=_RDS.update_RDS)
image = cv2.resize(cv2.imread(path), (128, 128))
# Encode the image as JPEG with higher compression (lower quality)
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] # Adjust quality (50 is just an example)
result, encoded_image = cv2.imencode('.jpg', image, encode_param)
encoded_bytes = np.array(encoded_image).tobytes()
return encoded_bytes
RDS = {
"PS": "DPRadio",
"RT": "Testing internet radio",
"PI": 0x27C8, # static
"PTY": 0,
"PTY+": "Testing",
"Country": "TH",
"Coverage": "All",
"CT": {
"Local": None,
"UTC": None,
},
"PIN": 12345,
"TMC": {
"TP": False,
"TA": False,
"Messages": None
},
"ECC": None,
"LIC": None,
"AudioMode": "Stereo", # mono, stereo, surround 5.1/7.1, HRTF
"ArtificialHead": False,
"Compressed": False,
"DyPTY": False,
"EPG": None,
"AS": [ # AS = Alternative Server
# can add more server here
],
"EON": [
# can add more here
],
"ContentInfo": {
"Codec": "opus",
"bitrate": 64000,
"channel": 2,
"samplerates": sample_rate
},
"images": {
"logo": encodelogoimage(r"C:\Users\sansw\3D Objects\dpstream iptv logo.png")
}
}
RDS2 = {
"PS": "DPTest",
"RT": "Testing internet radio",
"PI": 0x27C6,
"PTY": 0,
"PTY+": "Testing",
"Country": "TH",
"Coverage": "All",
"CT": {
"Local": None,
"UTC": None,
},
"PIN": 12345,
"TMC": {
"TP": False,
"TA": False,
"Messages": None
},
"ECC": None,
"LIC": None,
"AudioMode": "Stereo", # mono, stereo, surround 5.1/7.1, HRTF
"ArtificialHead": False,
"Compressed": False,
"DyPTY": False,
"EPG": None,
"AS": [ # AS = Alternative Server
# can add more server here
],
"EON": [
# can add more server here
],
"ContentInfo": {
"Codec": "Opus",
"bitrate": 8000,
"channel": 2,
"samplerates": sample_rate
},
"images": {
"logo": None
}
}
lock = threading.Lock()
def update_RDS():
global RDS
with lock:
while True:
pstext = "DPRadio Testing Broadcasting "
for i in range(0, len(pstext)):
RDS["PS"] = scrollTextBySteps(pstext, i)
time.sleep(1)
def update_RDS_time():
global RDS
while True:
RDS["CT"]["Local"] = datetime.now().timestamp()
RDS["CT"]["UTC"] = datetime.utcnow().timestamp()
time.sleep(1)
def update_RDS_images():
global RDS
while True:
RDS["images"]["logo"] = encodelogoimage(r"C:\Users\sansw\3D Objects\dpstream iptv logo.png", 25)
time.sleep(10)
RDS["images"]["logo"] = encodelogoimage(r"C:\Users\sansw\3D Objects\140702_hi-res-logo.jpg", 25)
time.sleep(10)
RDS["images"]["logo"] = encodelogoimage(r"IDRBfavicon.jpg", 25)
time.sleep(10)
thread = threading.Thread(target=update_RDS)
thread.start() thread.start()
thread2 = threading.Thread(target=update_RDS_time) thread2 = threading.Thread(target=_RDS.update_RDS_time)
thread2.start() thread2.start()
thread4 = threading.Thread(target=update_RDS_images) thread4 = threading.Thread(target=_RDS.update_RDS_images)
thread4.start() thread4.start()
# Create a shared queue for encoded audio packets # Create a shared queue for encoded audio packets
@ -215,8 +101,8 @@ def encode_audio():
encoder = OpusBufferedEncoder() encoder = OpusBufferedEncoder()
encoder.set_application("audio") encoder.set_application("audio")
encoder.set_sampling_frequency(sample_rate) encoder.set_sampling_frequency(sample_rate)
encoder.set_channels(2) encoder.set_channels(_RDS.RDS["ContentInfo"]["channel"])
encoder.set_bitrates(64000) encoder.set_bitrates(_RDS.RDS["ContentInfo"]["bitrate"])
encoder.set_frame_size(60) encoder.set_frame_size(60)
while True: while True:
@ -232,8 +118,8 @@ def encode_audio2():
encoder2 = OpusBufferedEncoder() encoder2 = OpusBufferedEncoder()
encoder2.set_application("audio") encoder2.set_application("audio")
encoder2.set_sampling_frequency(sample_rate) encoder2.set_sampling_frequency(sample_rate)
encoder2.set_channels(2) encoder2.set_channels(_RDS.RDS2["ContentInfo"]["channel"])
encoder2.set_bitrates(8000) encoder2.set_bitrates(_RDS.RDS2["ContentInfo"]["bitrate"])
encoder2.set_frame_size(60) encoder2.set_frame_size(60)
while True: while True:
@ -253,10 +139,38 @@ audio_thread2 = threading.Thread(target=encode_audio2)
audio_thread2.start() audio_thread2.start()
connectionlist = [] connectionlist = []
connected_users = 0
if protocol == "TCP":
connected_users = 0
elif protocol == "ZMQ":
connected_users = "Unknown"
else:
print(f"{protocol} not supported")
exit()
timestart = time.time()
first = True first = True
firstcontent = {
"first": True,
"mainchannel": 1,
"channel": {
1: {
"Station": "DPRadio+",
"RDS": _RDS.RDS
},
2: {
"Station": "DPTest",
"RDS": _RDS.RDS2
}
},
"serverinfo": {
"Listener": connected_users,
"Country": "TH",
"Startat": timestart
}
}
def handle_client(): def handle_client():
global connected_users, first global connected_users, first
try: try:
@ -271,6 +185,7 @@ def handle_client():
ENchannel2 = channel2.get() ENchannel2 = channel2.get()
content = { content = {
"first": False,
"mainchannel": 1, "mainchannel": 1,
"channel": { "channel": {
1: { 1: {
@ -278,51 +193,61 @@ def handle_client():
"Encrypt": b'|||||' in ENchannel1, # check if encrypt "Encrypt": b'|||||' in ENchannel1, # check if encrypt
"ContentSize": len(ENchannel1), "ContentSize": len(ENchannel1),
"Content": ENchannel1, "Content": ENchannel1,
"RDS": RDS "RDS": _RDS.RDS
}, },
2: { 2: {
"Station": "DPTest", "Station": "DPTest",
"Encrypt": b'|||||' in ENchannel2, "Encrypt": b'|||||' in ENchannel2,
"ContentSize": len(ENchannel2), "ContentSize": len(ENchannel2),
"Content": ENchannel2, "Content": ENchannel2,
"RDS": RDS2 "RDS": _RDS.RDS2
} }
}, },
"serverinfo": { "serverinfo": {
"Listener": connected_users, "Listener": connected_users,
"Country": "TH", "Country": "TH",
"Startat": time.time() "Startat": timestart
} }
} }
#connection.sendall(pickle.dumps(content)) #connection.sendall(pickle.dumps(content))
for i in connectionlist: if protocol == "TCP":
try: for i in connectionlist:
i.sendall(pickle.dumps(content)) try:
except Exception as e: i.sendall(pickle.dumps(content))
#print(f'Error sending data to {i.getpeername()}: {e}') except Exception as e:
# Remove disconnected client from the list #print(f'Error sending data to {i.getpeername()}: {e}')
if i in connectionlist: # Remove disconnected client from the list
i.close() if i in connectionlist:
connectionlist.remove(i) i.close()
connected_users -= 1 connectionlist.remove(i)
# check if no user connected_users -= 1
if not connectionlist: # check if no user
first = True if not connectionlist:
break first = True
break
elif protocol == "ZMQ":
s.send(pickle.dumps(content))
except Exception as e: except Exception as e:
print(f'Error: {e}') print(f'Error: {e}')
# Your main server logic using threading for handling connections # Your main server logic using threading for handling connections
while True: if __name__ == "__main__":
print("Waiting for a connection...") print("server is running")
connection, client_address = s.accept() if protocol == "TCP":
print(f"Connected to {client_address}") while True:
print("Waiting for a connection...")
connection, client_address = s.accept()
print(f"Connected to {client_address}")
connectionlist.append(connection) connectionlist.append(connection)
connected_users += 1 connected_users += 1
if first: if first:
# Start a: new thread to handle the client # Start a: new thread to handle the client
client_thread = threading.Thread(target=handle_client)
# client_thread.daemon = True # Set the thread as a daemon so it exits when the main thread exits
client_thread.start()
first = False
elif protocol == "ZMQ":
client_thread = threading.Thread(target=handle_client) client_thread = threading.Thread(target=handle_client)
#client_thread.daemon = True # Set the thread as a daemon so it exits when the main thread exits # client_thread.daemon = True # Set the thread as a daemon so it exits when the main thread exits
client_thread.start() client_thread.start()
first = False