PyserSSH/demo/demo1.py
damp11113 78a6459d26 update 5.1
New ServerManager for manage multiple server with multiple protocol. ProWrapper for translate protocol from many protocol to one protocol (function).
2024-12-06 23:33:15 +07:00

422 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
os.environ["damp11113_load_all_module"] = "NO"
from damp11113.utils import TextFormatter
from damp11113.file import sort_files, allfiles
import socket
import time
import cv2
import traceback
import requests
from bs4 import BeautifulSoup
import numpy as np
import logging
# Configure logging
logging.basicConfig(format='[{asctime}] [{levelname}] {name}: {message}', datefmt='%Y-%m-%d %H:%M:%S', style='{', level=logging.DEBUG)
from PyserSSH import Server, AccountManager
from PyserSSH.interactive import Send, wait_input, wait_inputkey, wait_choose, Clear, wait_inputmouse
from PyserSSH.system.info import version, Flag_TH
from PyserSSH.extensions.processbar import indeterminateStatus, LoadingProgress
from PyserSSH.extensions.dialog import MenuDialog, TextDialog, TextInputDialog
from PyserSSH.extensions.moredisplay import clickable_url, Send_karaoke_effect
from PyserSSH.extensions.moreinteractive import ShowCursor
from PyserSSH.extensions.remodesk import RemoDesk
from PyserSSH.extensions.XHandler import XHandler
from PyserSSH.system.clientype import Client
from PyserSSH.system.RemoteStatus import remotestatus
from PyserSSH.utils.ServerManager import ServerManager
useraccount = AccountManager(allow_guest=True, autoload=True, autosave=True)
if not os.path.isfile("autosave_session.ses"):
useraccount.add_account("admin", "", sudo=True) # create user without password
useraccount.add_account("test", "test") # create user without password
useraccount.add_account("demo")
useraccount.add_account("remote", "12345", permissions=["remote_desktop"])
useraccount.set_user_enable_inputsystem_echo("remote", False)
useraccount.set_user_sftp_allow("admin", True)
XH = XHandler()
ssh = Server(useraccount,
system_commands=True,
system_message=False,
sftp=True,
enable_preauth_banner=True,
XHandler=XH)
remotedesktopserver = RemoDesk()
servername = "PyserSSH"
loading = ["PyserSSH", "openRemoDesk", "XHandler", "RemoteStatus"]
@ssh.on_user("pre-shell")
def guestauth(client):
if client.get_name() == "remote":
return
if not useraccount.has_user(client.get_name()):
while True:
Clear(client)
Send(client, f"You are currently logged in as a guest. To access, please login or register.\nYour current account: {client.get_name()}\n")
method = wait_choose(client, ["Login", "Register", "Exit"], prompt="Action: ")
Clear(client)
if method == 0: # login
username = wait_input(client, "Username: ", noabort=True)
if not username:
Send(client, "Please Enter username")
wait_inputkey(client, "Press any key to continue...")
continue
password = wait_input(client, "Password: ", password=True, noabort=True)
Send(client, "Please wait...")
if not useraccount.has_user(username):
Send(client, f"Username isn't exist. Please try again")
wait_inputkey(client, "Press any key to continue...")
continue
if not useraccount.validate_credentials(username, password):
Send(client, f"Password incorrect. Please try again")
wait_inputkey(client, "Press any key to continue...")
continue
Clear(client)
client.switch_user(username)
break
elif method == 1: # register
username = wait_input(client, "Please choose a username: ", noabort=True)
if not username:
Send(client, "Please Enter username")
wait_inputkey(client, "Press any key to continue...")
continue
if useraccount.has_user(username):
Send(client, f"Username is exist. Please try again")
wait_inputkey(client, "Press any key to continue...")
continue
password = wait_input(client, "Password: ", password=True, noabort=True)
if not password:
Send(client, "Please Enter password")
wait_inputkey(client, "Press any key to continue...")
continue
confirmpassword = wait_input(client, "Confirm Password: ", password=True, noabort=True)
if not password:
Send(client, "Please Enter confirm password")
wait_inputkey(client, "Press any key to continue...")
continue
if password != confirmpassword:
Send(client, "Password do not matching the confirm password. Please try again.")
wait_inputkey(client, "Press any key to continue...")
continue
Send(client, "Please wait...")
useraccount.add_account(username, password, ["user"])
client.switch_user(username)
Clear(client)
break
else:
client.close()
@ssh.on_user("connect")
def connect(client):
if client.get_name() == "remote":
return
client.set_prompt(client["current_user"] + "@" + servername + ":~$")
wm = f"""{Flag_TH()}{''*50}
Hello {client['current_user']},
This is testing server of PyserSSH v{version}.
Visit: {clickable_url("https://damp11113.xyz", "DPCloudev")}
{''*50}"""
for i in loading:
P = indeterminateStatus(client, f"Starting {i}", f"[ OK ] Started {i}")
P.start()
time.sleep(len(i) / 20)
P.stop()
Di1 = TextDialog(client, "Welcome!\n to PyserSSH test server", "PyserSSH Extension")
Di1.render()
for char in wm:
Send(client, char, ln=False)
#time.sleep(0.005) # Adjust the delay as needed
Send(client, '\n') # Send newline after each line
@ssh.on_user("authbanner")
def banner(tmp):
return "Hello World!\n", "en"
@ssh.on_user("error")
def error(client, error):
if isinstance(error, socket.error):
pass
else:
Send(client, traceback.format_exc())
@XH.command(name="startremotedesktop", category="Remote", permissions=["remote_desktop"])
def remotedesktop(client):
remotedesktopserver.handle_new_client(client)
@XH.command(name="passtest", category="Test Function")
def xh_passtest(client):
user = wait_input(client, "username: ")
password = wait_input(client, "password: ", password=True)
Send(client, f"username: {user} | password: {password}")
@XH.command(name="colortest", category="Test Function")
def xh_colortest(client):
for i in range(0, 255, 5):
Send(client, TextFormatter.format_text_truecolor(" ", background=f"{i};0;0"), ln=False)
Send(client, "")
for i in range(0, 255, 5):
Send(client, TextFormatter.format_text_truecolor(" ", background=f"0;{i};0"), ln=False)
Send(client, "")
for i in range(0, 255, 5):
Send(client, TextFormatter.format_text_truecolor(" ", background=f"0;0;{i}"), ln=False)
Send(client, "")
Send(client, "TrueColors 24-Bit")
@XH.command(name="keytest", category="Test Function")
def xh_keytest(client: Client):
user = wait_inputkey(client, "press any key", raw=True, timeout=1)
Send(client, "")
Send(client, f"key: {user}")
for i in range(10):
user = wait_inputkey(client, "press any key", raw=True, timeout=1)
Send(client, "")
Send(client, f"key: {user}")
@XH.command(name="typing")
def xh_typing(client: Client, messages, speed = 1):
for w in messages:
Send(client, w, ln=False)
time.sleep(float(speed))
Send(client, "")
@XH.command(name="renimtest")
def xh_renimtest(client: Client):
Clear(client)
image = cv2.imread("opensource.png", cv2.IMREAD_COLOR)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
width, height = client['windowsize']["width"] - 5, client['windowsize']["height"] - 5
# resize image
resized = cv2.resize(image, (width, height))
t = ""
# Scan all pixels
for y in range(0, height):
for x in range(0, width):
pixel_color = resized[y, x]
if pixel_color.tolist() != [0, 0, 0]:
t += TextFormatter.format_text_truecolor(" ",
background=f"{pixel_color[0]};{pixel_color[1]};{pixel_color[2]}")
else:
t += " "
Send(client, t, ln=False)
Send(client, "")
t = ""
@XH.command(name="errortest", category="Test Function")
def xh_errortest(client: Client):
raise Exception("hello error")
@XH.command(name="inloadtest", category="Test Function")
def xh_inloadtest(client: Client):
loading = indeterminateStatus(client)
loading.start()
time.sleep(5)
loading.stop()
@XH.command(name="loadtest", category="Test Function")
def xh_loadtest(client: Client):
l = LoadingProgress(client, total=100, color=True)
l.start()
for i in range(101):
l.current = i
l.status = f"loading {i}"
time.sleep(0.05)
l.stop()
@XH.command(name="dialogtest", category="Test Function")
def xh_dialogtest(client: Client):
Di1 = TextDialog(client, "Hello Dialog!", "PyserSSH Extension")
Di1.render()
@XH.command(name="dialogtest2", category="Test Function")
def xh_dialogtest2(client: Client):
Di2 = MenuDialog(client, ["H1", "H2", "H3"], "PyserSSH Extension", "Hello world")
Di2.render()
Send(client, f"selected index: {Di2.output()}")
@XH.command(name="dialogtest3", category="Test Function")
def xh_dialogtest3(client: Client):
Di3 = TextInputDialog(client, "PyserSSH Extension")
Di3.render()
Send(client, f"input: {Di3.output()}")
@XH.command(name="passdialogtest3", category="Test Function")
def xh_passdialogtest3(client: Client):
Di3 = TextInputDialog(client, "PyserSSH Extension", inputtitle="Password Here", password=True)
Di3.render()
Send(client, f"password: {Di3.output()}")
@XH.command(name="choosetest", category="Test Function")
def xh_choosetest(client: Client):
mylist = ["H1", "H2", "H3"]
cindex = wait_choose(client, mylist, "select: ")
Send(client, f"selected: {mylist[cindex]}")
@XH.command(name="vieweb")
def xh_vieweb(client: Client, url: str):
loading = indeterminateStatus(client, desc=f"requesting {url}...")
loading.start()
try:
content = requests.get(url).content
except:
loading.stopfail()
return
loading.stop()
loading = indeterminateStatus(client, desc=f"parsing html {url}...")
loading.start()
try:
soup = BeautifulSoup(content, 'html.parser')
# Extract only the text content
text_content = soup.get_text()
except:
loading.stopfail()
return
loading.stop()
Di1 = TextDialog(client, text_content, url)
Di1.render()
@XH.command(name="shutdown")
def xh_shutdown(client: Client, at: str):
if at == "now":
ssh.stop_server()
@XH.command(name="mouseinput", category="Test Function")
def xh_mouseinput(client: Client):
for i in range(10):
button, x, y = wait_inputmouse(client)
if button == 0:
Send(client, "Left Button")
elif button == 1:
Send(client, "Middle Button")
elif button == 2:
Send(client, "Right Button")
elif button == 3:
Send(client, "Button Up")
Send(client, f"Current POS: X {x} | Y {y} with button {button}")
@XH.command(name="karaoke")
def xh_karaoke(client: Client):
ShowCursor(client, False)
Send_karaoke_effect(client, "Python can print like karaoke!")
ShowCursor(client)
R1 = 1
R2 = 2
K2 = 5
theta_spacing = 0.07
phi_spacing = 0.02
illumination = np.fromiter(".,-~:;=!*#$@", dtype="<U1")
def render_frame(A: float, B: float, screen_size) -> np.ndarray:
K1 = screen_size * K2 * 3 / (8 * (R1 + R2))
"""
Returns a frame of the spinning 3D donut.
Based on the pseudocode from: https://www.a1k0n.net/2011/07/20/donut-math.html
"""
cos_A = np.cos(A)
sin_A = np.sin(A)
cos_B = np.cos(B)
sin_B = np.sin(B)
output = np.full((screen_size, screen_size), " ") # (40, 40)
zbuffer = np.zeros((screen_size, screen_size)) # (40, 40)
cos_phi = np.cos(phi := np.arange(0, 2 * np.pi, phi_spacing)) # (315,)
sin_phi = np.sin(phi) # (315,)
cos_theta = np.cos(theta := np.arange(0, 2 * np.pi, theta_spacing)) # (90,)
sin_theta = np.sin(theta) # (90,)
circle_x = R2 + R1 * cos_theta # (90,)
circle_y = R1 * sin_theta # (90,)
x = (np.outer(cos_B * cos_phi + sin_A * sin_B * sin_phi, circle_x) - circle_y * cos_A * sin_B).T # (90, 315)
y = (np.outer(sin_B * cos_phi - sin_A * cos_B * sin_phi, circle_x) + circle_y * cos_A * cos_B).T # (90, 315)
z = ((K2 + cos_A * np.outer(sin_phi, circle_x)) + circle_y * sin_A).T # (90, 315)
ooz = np.reciprocal(z) # Calculates 1/z
xp = (screen_size / 2 + K1 * ooz * x).astype(int) # (90, 315)
yp = (screen_size / 2 - K1 * ooz * y).astype(int) # (90, 315)
L1 = (((np.outer(cos_phi, cos_theta) * sin_B) - cos_A * np.outer(sin_phi, cos_theta)) - sin_A * sin_theta) # (315, 90)
L2 = cos_B * (cos_A * sin_theta - np.outer(sin_phi, cos_theta * sin_A)) # (315, 90)
L = np.around(((L1 + L2) * 8)).astype(int).T # (90, 315)
mask_L = L >= 0 # (90, 315)
chars = illumination[L] # (90, 315)
for i in range(90):
mask = mask_L[i] & (ooz[i] > zbuffer[xp[i], yp[i]]) # (315,)
zbuffer[xp[i], yp[i]] = np.where(mask, ooz[i], zbuffer[xp[i], yp[i]])
output[xp[i], yp[i]] = np.where(mask, chars[i], output[xp[i], yp[i]])
return output
@XH.command()
def donut(client, screen_size=40):
screen_size = int(screen_size)
A = 1
B = 1
for _ in range(screen_size * screen_size):
A += theta_spacing
B += phi_spacing
Clear(client)
array = render_frame(A, B, screen_size)
for row in array:
Send(client, " ".join(row))
Send(client, "\n")
if wait_inputkey(client, raw=True, timeout=0.01) == "\x03": break
@XH.command(name="status")
def xh_status(client: Client):
remotestatus(ssh, client.channel, True)
#@ssh.on_user("command")
#def command(client: Client, command: str):
#ssh.run(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'private_key.pem'))
manager = ServerManager()
# Add servers to the manager
manager.add_server("ssh", ssh, os.path.join(os.path.dirname(os.path.realpath(__file__)), 'private_key.pem'))
manager.add_server("telnet", ssh, "", protocol="telnet")
# Start a specific server
manager.start_server("ssh")
manager.start_server("telnet")