diff --git a/Client/client.py b/Client/client.py index 23949bd..64941ca 100644 --- a/Client/client.py +++ b/Client/client.py @@ -7,6 +7,7 @@ import socket import numpy as np import pickle import pyaudio +import zmq from pyogg import OpusDecoder from Crypto.Cipher import AES from Crypto.Protocol.KDF import scrypt @@ -92,18 +93,27 @@ class App: self.ccisdecrypt = None self.ccisdecryptpassword = None self.paudio = pyaudio.PyAudio() + self.cprotocol = None def connecttoserver(self, sender, data): dpg.configure_item("connectservergroup", show=False) - #protocol = dpg.get_value("serverprotocol") + protocol = dpg.get_value("serverprotocol") + self.cprotocol = protocol dpg.configure_item("serverstatus", default_value='connecting...', color=(255, 255, 0)) ip = dpg.get_value("serverip") port = dpg.get_value("serverport") - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - s.connect((ip, port)) - except: - dpg.configure_item("connectbutton", show=True) + if protocol == "TCP": + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.connect((ip, port)) + except: + dpg.configure_item("connectbutton", show=True) + elif protocol == "ZeroMQ": + context = zmq.Context() + s = context.socket(zmq.SUB) + s.connect(f"tcp://{ip}:{port}") + s.setsockopt_string(zmq.SUBSCRIBE, "") + self.working = True self.device_index_output = 0 @@ -196,14 +206,19 @@ class App: while True: try: if self.working: - data = b'' - #data = socket.recv(1580152) - while True: - part = socket.recv(1024) - data += part - if len(part) < 1024: - # either 0 or end of data - break + if self.cprotocol == "TCP": + data = b'' + #data = socket.recv(1580152) + while True: + part = socket.recv(1024) + data += part + if len(part) < 1024: + # either 0 or end of data + break + elif self.cprotocol == "ZeroMQ": + data = socket.recv() + else: + data = b"" bytesconunt += len(data) @@ -363,11 +378,11 @@ class App: dpg.add_spacer() dpg.add_image("station_logo", show=False, tag="station_logo_config") dpg.add_text("", tag="RDSinfo", show=False) - with dpg.child_window(tag="connectservergroup", label="Server", use_internal_label=True, height=105): + with dpg.child_window(tag="connectservergroup", label="Server", use_internal_label=True, height=130): dpg.add_button(label="select server", tag="selectserverbutton") dpg.add_input_text(label="server ip", tag="serverip", default_value="localhost") dpg.add_input_int(label="port", tag="serverport", max_value=65535, default_value=6980) - #dpg.add_combo(["TCP", "Websocket"], label="protocol", tag="serverprotocol", default_value="TCP") + dpg.add_combo(["TCP", "ZeroMQ"], label="protocol", tag="serverprotocol", default_value="TCP") dpg.add_button(label="connect", callback=self.connecttoserver, tag="connectbutton") dpg.add_spacer() dpg.add_button(label="More RDS info", callback=lambda: dpg.configure_item("RDSwindow", show=True), tag="morerdsbutton", show=False) diff --git a/Server/RDS.py b/Server/RDS.py new file mode 100644 index 0000000..083f51d --- /dev/null +++ b/Server/RDS.py @@ -0,0 +1,129 @@ +import time +from datetime import datetime + +import cv2 +import numpy as np + +from damp11113 import scrollTextBySteps + + +def encodelogoimage(path, quality=50): + image = cv2.resize(cv2.imread(path), (128, 128)) + # Encode the image as JPEG with higher compression (lower quality) + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] # Adjust quality (50 is just an example) + result, encoded_image = cv2.imencode('.jpg', image, encode_param) + encoded_bytes = np.array(encoded_image).tobytes() + return encoded_bytes + +RDS = { + "PS": "DPRadio", + "RT": "Testing internet radio", + "PI": 0x27C8, # static + "PTY": 0, + "PTY+": "Testing", + "Country": "TH", + "Coverage": "All", + "CT": { + "Local": None, + "UTC": None, + }, + "PIN": 12345, + "TMC": { + "TP": False, + "TA": False, + "Messages": None + }, + "ECC": None, + "LIC": None, + "AudioMode": "Stereo", # mono, stereo, surround 5.1/7.1, HRTF + "ArtificialHead": False, + "Compressed": False, + "DyPTY": False, + "EPG": None, + "AS": [ # AS = Alternative Server + # can add more server here + ], + "EON": [ + # can add more here + ], + "ContentInfo": { + "Codec": "opus", + "bitrate": 64000, + "channel": 2, + "samplerates": 48000 + }, + "images": { + "logo": encodelogoimage(r"C:\Users\sansw\3D Objects\dpstream iptv logo.png") + } +} + +RDS2 = { + "PS": "DPTest", + "RT": "Testing internet radio", + "PI": 0x27C6, + "PTY": 0, + "PTY+": "Testing", + "Country": "TH", + "Coverage": "All", + "CT": { + "Local": None, + "UTC": None, + }, + "PIN": 12345, + "TMC": { + "TP": False, + "TA": False, + "Messages": None + }, + "ECC": None, + "LIC": None, + "AudioMode": "Stereo", # mono, stereo, surround 5.1/7.1, HRTF + "ArtificialHead": False, + "Compressed": False, + "DyPTY": False, + "EPG": None, + "AS": [ # AS = Alternative Server + # can add more server here + ], + "EON": [ + # can add more server here + ], + "ContentInfo": { + "Codec": "Opus", + "bitrate": 8000, + "channel": 2, + "samplerates": 48000 + }, + "images": { + "logo": None + } +} + + +def update_RDS(): + global RDS + while True: + pstext = "DPRadio Testing Broadcasting " + for i in range(0, len(pstext)): + RDS["PS"] = scrollTextBySteps(pstext, i) + time.sleep(1) + +def update_RDS_time(): + global RDS + while True: + RDS["CT"]["Local"] = datetime.now().timestamp() + RDS["CT"]["UTC"] = datetime.utcnow().timestamp() + RDS2["CT"]["Local"] = datetime.now().timestamp() + RDS2["CT"]["UTC"] = datetime.utcnow().timestamp() + time.sleep(1) + +def update_RDS_images(): + global RDS + while True: + RDS["images"]["logo"] = encodelogoimage(r"C:\Users\sansw\3D Objects\dpstream iptv logo.png", 25) + time.sleep(10) + RDS["images"]["logo"] = encodelogoimage(r"C:\Users\sansw\3D Objects\140702_hi-res-logo.jpg", 25) + time.sleep(10) + RDS["images"]["logo"] = encodelogoimage(r"IDRBfavicon.jpg", 25) + time.sleep(10) + diff --git a/Server/__pycache__/RDS.cpython-310.pyc b/Server/__pycache__/RDS.cpython-310.pyc new file mode 100644 index 0000000..c204398 Binary files /dev/null and b/Server/__pycache__/RDS.cpython-310.pyc differ diff --git a/Server/server.py b/Server/server.py index c832440..52b1e8d 100644 --- a/Server/server.py +++ b/Server/server.py @@ -5,13 +5,12 @@ from pyogg import OpusBufferedEncoder import numpy as np import pickle import threading -from damp11113 import scrollTextBySteps +import RDS as _RDS from queue import Queue -from datetime import datetime, timezone -import cv2 from Crypto.Cipher import AES from Crypto.Protocol.KDF import scrypt from Crypto.Random import get_random_bytes +import zmq def pad_message(message_bytes): block_size = AES.block_size @@ -40,13 +39,22 @@ def encrypt_data(message_bytes, password): return encrypted_message, salt, iv -# create tcp -s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -# wait for connection -server_port = ('localhost', 6980) -s.bind(server_port) +protocol = "ZMQ" +server_port = ('*', 6980) -s.listen(1) +if protocol == "TCP": + # create tcp + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # wait for connection + s.bind(server_port) + s.listen(1) +elif protocol == "ZMQ": + context = zmq.Context() + s = context.socket(zmq.PUB) + s.bind(f"tcp://{server_port[0]}:{server_port[1]}") +else: + print(f"{protocol} not supported") + exit() p = pyaudio.PyAudio() @@ -74,135 +82,13 @@ streaminput = p.open(format=pyaudio.paInt16, channels=2, rate=sample_rate, input streaminput2 = p.open(format=pyaudio.paInt16, channels=2, rate=sample_rate, input=True, input_device_index=device_index_input2) -def encodelogoimage(path, quality=50): - image = cv2.resize(cv2.imread(path), (128, 128)) - # Encode the image as JPEG with higher compression (lower quality) - encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] # Adjust quality (50 is just an example) - result, encoded_image = cv2.imencode('.jpg', image, encode_param) - encoded_bytes = np.array(encoded_image).tobytes() - return encoded_bytes - - -RDS = { - "PS": "DPRadio", - "RT": "Testing internet radio", - "PI": 0x27C8, # static - "PTY": 0, - "PTY+": "Testing", - "Country": "TH", - "Coverage": "All", - "CT": { - "Local": None, - "UTC": None, - }, - "PIN": 12345, - "TMC": { - "TP": False, - "TA": False, - "Messages": None - }, - "ECC": None, - "LIC": None, - "AudioMode": "Stereo", # mono, stereo, surround 5.1/7.1, HRTF - "ArtificialHead": False, - "Compressed": False, - "DyPTY": False, - "EPG": None, - "AS": [ # AS = Alternative Server - # can add more server here - ], - "EON": [ - # can add more here - ], - "ContentInfo": { - "Codec": "opus", - "bitrate": 64000, - "channel": 2, - "samplerates": sample_rate - }, - "images": { - "logo": encodelogoimage(r"C:\Users\sansw\3D Objects\dpstream iptv logo.png") - } -} - -RDS2 = { - "PS": "DPTest", - "RT": "Testing internet radio", - "PI": 0x27C6, - "PTY": 0, - "PTY+": "Testing", - "Country": "TH", - "Coverage": "All", - "CT": { - "Local": None, - "UTC": None, - }, - "PIN": 12345, - "TMC": { - "TP": False, - "TA": False, - "Messages": None - }, - "ECC": None, - "LIC": None, - "AudioMode": "Stereo", # mono, stereo, surround 5.1/7.1, HRTF - "ArtificialHead": False, - "Compressed": False, - "DyPTY": False, - "EPG": None, - "AS": [ # AS = Alternative Server - # can add more server here - ], - "EON": [ - # can add more server here - ], - "ContentInfo": { - "Codec": "Opus", - "bitrate": 8000, - "channel": 2, - "samplerates": sample_rate - }, - "images": { - "logo": None - } -} - - -lock = threading.Lock() - -def update_RDS(): - global RDS - with lock: - while True: - pstext = "DPRadio Testing Broadcasting " - for i in range(0, len(pstext)): - RDS["PS"] = scrollTextBySteps(pstext, i) - time.sleep(1) - -def update_RDS_time(): - global RDS - while True: - RDS["CT"]["Local"] = datetime.now().timestamp() - RDS["CT"]["UTC"] = datetime.utcnow().timestamp() - time.sleep(1) - -def update_RDS_images(): - global RDS - while True: - RDS["images"]["logo"] = encodelogoimage(r"C:\Users\sansw\3D Objects\dpstream iptv logo.png", 25) - time.sleep(10) - RDS["images"]["logo"] = encodelogoimage(r"C:\Users\sansw\3D Objects\140702_hi-res-logo.jpg", 25) - time.sleep(10) - RDS["images"]["logo"] = encodelogoimage(r"IDRBfavicon.jpg", 25) - time.sleep(10) - -thread = threading.Thread(target=update_RDS) +thread = threading.Thread(target=_RDS.update_RDS) thread.start() -thread2 = threading.Thread(target=update_RDS_time) +thread2 = threading.Thread(target=_RDS.update_RDS_time) thread2.start() -thread4 = threading.Thread(target=update_RDS_images) +thread4 = threading.Thread(target=_RDS.update_RDS_images) thread4.start() # Create a shared queue for encoded audio packets @@ -215,8 +101,8 @@ def encode_audio(): encoder = OpusBufferedEncoder() encoder.set_application("audio") encoder.set_sampling_frequency(sample_rate) - encoder.set_channels(2) - encoder.set_bitrates(64000) + encoder.set_channels(_RDS.RDS["ContentInfo"]["channel"]) + encoder.set_bitrates(_RDS.RDS["ContentInfo"]["bitrate"]) encoder.set_frame_size(60) while True: @@ -232,8 +118,8 @@ def encode_audio2(): encoder2 = OpusBufferedEncoder() encoder2.set_application("audio") encoder2.set_sampling_frequency(sample_rate) - encoder2.set_channels(2) - encoder2.set_bitrates(8000) + encoder2.set_channels(_RDS.RDS2["ContentInfo"]["channel"]) + encoder2.set_bitrates(_RDS.RDS2["ContentInfo"]["bitrate"]) encoder2.set_frame_size(60) while True: @@ -253,10 +139,38 @@ audio_thread2 = threading.Thread(target=encode_audio2) audio_thread2.start() connectionlist = [] -connected_users = 0 + +if protocol == "TCP": + connected_users = 0 +elif protocol == "ZMQ": + connected_users = "Unknown" +else: + print(f"{protocol} not supported") + exit() +timestart = time.time() first = True +firstcontent = { + "first": True, + "mainchannel": 1, + "channel": { + 1: { + "Station": "DPRadio+", + "RDS": _RDS.RDS + }, + 2: { + "Station": "DPTest", + "RDS": _RDS.RDS2 + } + }, + "serverinfo": { + "Listener": connected_users, + "Country": "TH", + "Startat": timestart + } +} + def handle_client(): global connected_users, first try: @@ -271,6 +185,7 @@ def handle_client(): ENchannel2 = channel2.get() content = { + "first": False, "mainchannel": 1, "channel": { 1: { @@ -278,51 +193,61 @@ def handle_client(): "Encrypt": b'|||||' in ENchannel1, # check if encrypt "ContentSize": len(ENchannel1), "Content": ENchannel1, - "RDS": RDS + "RDS": _RDS.RDS }, 2: { "Station": "DPTest", "Encrypt": b'|||||' in ENchannel2, "ContentSize": len(ENchannel2), "Content": ENchannel2, - "RDS": RDS2 + "RDS": _RDS.RDS2 } }, "serverinfo": { "Listener": connected_users, "Country": "TH", - "Startat": time.time() + "Startat": timestart } } #connection.sendall(pickle.dumps(content)) - for i in connectionlist: - try: - i.sendall(pickle.dumps(content)) - except Exception as e: - #print(f'Error sending data to {i.getpeername()}: {e}') - # Remove disconnected client from the list - if i in connectionlist: - i.close() - connectionlist.remove(i) - connected_users -= 1 - # check if no user - if not connectionlist: - first = True - break + if protocol == "TCP": + for i in connectionlist: + try: + i.sendall(pickle.dumps(content)) + except Exception as e: + #print(f'Error sending data to {i.getpeername()}: {e}') + # Remove disconnected client from the list + if i in connectionlist: + i.close() + connectionlist.remove(i) + connected_users -= 1 + # check if no user + if not connectionlist: + first = True + break + elif protocol == "ZMQ": + s.send(pickle.dumps(content)) except Exception as e: print(f'Error: {e}') # Your main server logic using threading for handling connections -while True: - print("Waiting for a connection...") - connection, client_address = s.accept() - print(f"Connected to {client_address}") +if __name__ == "__main__": + print("server is running") + if protocol == "TCP": + while True: + print("Waiting for a connection...") + connection, client_address = s.accept() + print(f"Connected to {client_address}") - connectionlist.append(connection) - connected_users += 1 - if first: - # Start a: new thread to handle the client + connectionlist.append(connection) + connected_users += 1 + if first: + # Start a: new thread to handle the client + client_thread = threading.Thread(target=handle_client) + # client_thread.daemon = True # Set the thread as a daemon so it exits when the main thread exits + client_thread.start() + first = False + elif protocol == "ZMQ": client_thread = threading.Thread(target=handle_client) - #client_thread.daemon = True # Set the thread as a daemon so it exits when the main thread exits - client_thread.start() - first = False \ No newline at end of file + # client_thread.daemon = True # Set the thread as a daemon so it exits when the main thread exits + client_thread.start() \ No newline at end of file