diff --git a/demo/server.py b/demo/server.py index eece326..da61dae 100644 --- a/demo/server.py +++ b/demo/server.py @@ -2,16 +2,21 @@ import os import socket import time import shlex -from damp11113 import TextFormatter +from damp11113 import TextFormatter, sort_files, allfiles import cv2 import traceback +import requests +from bs4 import BeautifulSoup -from PyserSSH import Server, AccountManager, Send, wait_input, wait_inputkey +from PyserSSH import Server, AccountManager, Send, wait_input, wait_inputkey, wait_choose, Clear, Title from PyserSSH.system.info import system_banner from PyserSSH.extensions.processbar import indeterminateStatus, LoadingProgress +from PyserSSH.extensions.dialog import MenuDialog, TextDialog, TextInputDialog +from PyserSSH.extensions.moredisplay import clickable_url useraccount = AccountManager() useraccount.add_account("admin", "") # create user without password +useraccount.add_account("test", "test") # create user without password ssh = Server(useraccount, system_commands=True, system_message=False) @@ -22,89 +27,115 @@ Please ensure you have proper authorization before proceeding.""" Authorizedmessage = """You have successfully connected to the server. Enjoy your session and remember to follow security protocols.""" +loading = ["PyserSSH", "Extensions"] + @ssh.on_user("connect") -def connect(channel, client): +def connect(client): + Title(client, "PyserSSH") #print(client["windowsize"]) if client['current_user'] == "": warningmessage = nonamewarning else: warningmessage = Authorizedmessage - wm = f"""********************************************************************************************* Hello {client['current_user']}, {warningmessage} +Visit: {clickable_url("https://damp11113.xyz", "DPCloudev")} + {system_banner} *********************************************************************************************""" - for char in wm: - Send(channel, char, ln=False) - time.sleep(0.005) # Adjust the delay as needed - Send(channel, '\n') # Send newline after each line + if client['current_user'] != "test": + for i in loading: + P = indeterminateStatus(client, f"Starting {i}", f"[ OK ] Started {i}") + P.start() + + time.sleep(len(i) / 20) + + P.stop() + + Di1 = TextDialog(client, "PyserSSH Extension", "Welcome!\n to PyserSSH test server") + Di1.render() + + for char in wm: + Send(client, char, ln=False) + # time.sleep(0.005) # Adjust the delay as needed + Send(client, '\n') # Send newline after each line @ssh.on_user("error") -def error(channel, error, client): +def error(client, error): if isinstance(error, socket.error): pass else: - Send(channel, traceback.format_exc()) + Send(client, traceback.format_exc()) + + +#@ssh.on_user("onrawtype") +#def onrawtype(client, key): +# print(key) @ssh.on_user("command") -def command(channel, command: str, client): +def command(client, command: str): if command == "passtest": - user = wait_input(channel, "username: ") - password = wait_input(channel, "password: ", password=True) - Send(channel, f"username: {user} | password: {password}") + user = wait_input(client, "username: ") + password = wait_input(client, "password: ", password=True) + Send(client, f"username: {user} | password: {password}") elif command == "colortest": for i in range(0, 255, 5): - Send(channel, TextFormatter.format_text_truecolor(" ", background=f"{i};0;0"), ln=False) - Send(channel, "") + Send(client, TextFormatter.format_text_truecolor(" ", background=f"{i};0;0"), ln=False) + Send(client, "") for i in range(0, 255, 5): - Send(channel, TextFormatter.format_text_truecolor(" ", background=f"0;{i};0"), ln=False) - Send(channel, "") + Send(client, TextFormatter.format_text_truecolor(" ", background=f"0;{i};0"), ln=False) + Send(client, "") for i in range(0, 255, 5): - Send(channel, TextFormatter.format_text_truecolor(" ", background=f"0;0;{i}"), ln=False) - Send(channel, "") - - Send(channel, "TrueColors 24-Bit") + Send(client, TextFormatter.format_text_truecolor(" ", background=f"0;0;{i}"), ln=False) + Send(client, "") + Send(client, "TrueColors 24-Bit") elif command == "keytest": - user = wait_inputkey(channel, "press any key", raw=True) - Send(channel, "") - Send(channel, f"key: {user}") + user = wait_inputkey(client, "press any key", raw=True) + Send(client, "") + Send(client, f"key: {user}") for i in range(10): - user = wait_inputkey(channel, "press any key", raw=True) - Send(channel, "") - Send(channel, f"key: {user}") + user = wait_inputkey(client, "press any key", raw=True) + Send(client, "") + Send(client, f"key: {user}") elif command.startswith("typing"): args = shlex.split(command) messages = args[1] speed = float(args[2]) for w in messages: - Send(channel, w, ln=False) + Send(client, w, ln=False) time.sleep(speed) - Send(channel, "") - elif command == "renimtest": - image = cv2.imread(r"opensource.png", cv2.IMREAD_COLOR) + Send(client, "") + elif command.startswith("renimtest"): + args = shlex.split(command) + Clear(client) + image = cv2.imread(f"opensource.png", cv2.IMREAD_COLOR) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - width, height = client['windowsize']["width"], client['windowsize']["height"] + width, height = client['windowsize']["width"]-5, client['windowsize']["height"]-5 # resize image resized = cv2.resize(image, (width, height)) + t = "" # Scan all pixels for y in range(0, height): for x in range(0, width): pixel_color = resized[y, x] - #PyserSSH.Send(channel, f"Pixel color at ({x}, {y}): {pixel_color}") + # PyserSSH.Send(channel, f"Pixel color at ({x}, {y}): {pixel_color}") if pixel_color.tolist() != [0, 0, 0]: - Send(channel, TextFormatter.format_text_truecolor(" ", background=f"{pixel_color[0]};{pixel_color[1]};{pixel_color[2]}"), ln=False) + t += TextFormatter.format_text_truecolor(" ", background=f"{pixel_color[0]};{pixel_color[1]};{pixel_color[2]}") else: - Send(channel, " ", ln=False) + t += " " + + Send(client, t, ln=False) + Send(client, "") + t = "" - Send(channel, "") elif command == "errortest": raise Exception("hello error") elif command == "inloadtest": @@ -117,8 +148,50 @@ def command(channel, command: str, client): l.start() for i in range(101): l.current = i - l.desc = "loading..." + l.status = f"loading {i}" time.sleep(0.05) l.stop() + elif command == "dialogtest": + Di1 = TextDialog(client, "PyserSSH Extension", "Hello Dialog!") + Di1.render() + elif command == "dialogtest2": + Di2 = MenuDialog(client, ["H1", "H2", "H3"], "PyserSSH Extension", "Hello world") + Di2.render() + Send(client, f"selected index: {Di2.output()}") + elif command == "dialogtest3": + Di3 = TextInputDialog(client, "PyserSSH Extension") + Di3.render() + Send(client, f"input: {Di3.output()}") + elif command == "passdialogtest3": + Di3 = TextInputDialog(client, "PyserSSH Extension", inputtitle="Password Here", password=True) + Di3.render() + Send(client, f"password: {Di3.output()}") + elif command == "choosetest": + cindex = wait_choose(client, ["H1", "H2", "H3"], "select: ") + Send(client, f"selected index: {cindex}") + elif command.startswith("vieweb"): + args = shlex.split(command) + url = args[1] + loading = indeterminateStatus(client, desc=f"requesting {url}...") + loading.start() + try: + content = requests.get(url).content + except: + loading.stopfail() + return + loading.stop() + loading = indeterminateStatus(client, desc=f"parsing html {url}...") + loading.start() + try: + soup = BeautifulSoup(content, 'html.parser') + # Extract only the text content + text_content = soup.get_text() + except: + loading.stopfail() + return + loading.stop() + Di1 = TextDialog(client, url, text_content) + Di1.render() -ssh.run(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'private_key.pem')) \ No newline at end of file + +ssh.run(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'private_key.pem')) diff --git a/setup.py b/setup.py index a6aac67..23dce92 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open('README.md', 'r', encoding='utf-8') as f: setup( name='PyserSSH', - version='4.0', + version='4.2.1', # update pypi (no update for 4.3) license='MIT', author='damp11113', author_email='damp51252@gmail.com', @@ -17,5 +17,8 @@ setup( long_description_content_type='text/markdown', install_requires=[ "paramiko" - ] + ], + extras_require={ + "fullsyscom": ["damp11113"] + } ) \ No newline at end of file diff --git a/src/PyserSSH/__init__.py b/src/PyserSSH/__init__.py index 982647d..2f66e1b 100644 --- a/src/PyserSSH/__init__.py +++ b/src/PyserSSH/__init__.py @@ -25,6 +25,44 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +""" +note + +ansi cursor arrow +up - \x1b[A +down - \x1b[B +left - \x1b[D +right - \x1b[C + +https://en.wikipedia.org/wiki/ANSI_escape_code +""" +import os +import logging + from .interactive import * from .server import Server from .account import AccountManager +from .system.info import system_banner + + +try: + os.environ["pyserssh_systemmessage"] +except: + os.environ["pyserssh_systemmessage"] = "YES" + +try: + os.environ["pyserssh_enable_damp11113"] +except: + os.environ["pyserssh_enable_damp11113"] = "YES" + +try: + os.environ["pyserssh_log"] +except: + os.environ["pyserssh_log"] = "NO" + +if os.environ["pyserssh_log"]: + logger = logging.getLogger("PyserSSH") + logger.disabled = True + +if os.environ["pyserssh_systemmessage"] == "YES": + print(system_banner) diff --git a/src/PyserSSH/account.py b/src/PyserSSH/account.py index c9f52a5..f84701b 100644 --- a/src/PyserSSH/account.py +++ b/src/PyserSSH/account.py @@ -111,6 +111,15 @@ class AccountManager: return self.accounts[username]["sftp_path"] return "" + def get_user_timeout(self, username): + if username in self.accounts and "timeout" in self.accounts[username]: + return self.accounts[username]["timeout"] + return 0 + + def set_user_timeout(self, username, timeout=0): + if username in self.accounts: + self.accounts[username]["timeout"] = timeout + def add_history(self, username, command): if not self.anyuser: if username in self.accounts: diff --git a/src/PyserSSH/extensions/XHandler.py b/src/PyserSSH/extensions/XHandler.py new file mode 100644 index 0000000..6098262 --- /dev/null +++ b/src/PyserSSH/extensions/XHandler.py @@ -0,0 +1,178 @@ +import inspect +import shlex + +from ..interactive import Send + +class XHandler: + def __init__(self, enablehelp=True, showusageonworng=True): + self.handlers = {} + self.categories = {} + self.enablehelp = enablehelp + self.showusageonworng = showusageonworng + + self.commandnotfound = None + + def command(self, category=None, name=None, aliases=None): + def decorator(func): + nonlocal name, category + if name is None: + name = func.__name__ + command_name = name + command_description = func.__doc__ # Read the docstring + parameters = inspect.signature(func).parameters + command_args = [] + for param in list(parameters.values())[1:]: # Exclude first parameter (client) + if param.default != inspect.Parameter.empty: # Check if parameter has default value + if param.annotation == bool: + command_args.append(f"-{param.name}") + else: + command_args.append((f"{param.name}", param.default)) + else: + command_args.append(param.name) + if category is None: + category = 'No Category' + if category not in self.categories: + self.categories[category] = {} + self.categories[category][command_name] = { + 'description': command_description.strip() if command_description else "", + 'args': command_args + } + self.handlers[command_name] = func + if aliases: + for alias in aliases: + self.handlers[alias] = func + return func + + return decorator + + def call(self, client, command_string): + tokens = shlex.split(command_string) + command_name = tokens[0] + args = tokens[1:] + if command_name == "help" and self.enablehelp: + if args: + Send(client, self.get_help_command_info(args[0])) + else: + Send(client, self.get_help_message()) + Send(client, "Type 'help ' for more info on a command.") + else: + if command_name in self.handlers: + command_func = self.handlers[command_name] + command_args = inspect.signature(command_func).parameters + if len(args) % 2 != 0 and not args[0].startswith("--"): + if self.showusageonworng: + Send(client, self.get_help_command_info(command_name)) + else: + Send(client, f"Invalid number of arguments for command '{command_name}'.") + return + # Parse arguments + final_args = {} + for i in range(0, len(args), 2): + if args[i].startswith("--"): + arg_name = args[i].lstrip('--') + if arg_name not in command_args: + if self.showusageonworng: + Send(client, self.get_help_command_info(command_name)) + else: + Send(client, f"Invalid flag '{arg_name}' for command '{command_name}'.") + return + try: + args[i + 1] + except: + pass + else: + if self.showusageonworng: + Send(client, self.get_help_command_info(command_name)) + else: + Send(client, f"value '{args[i + 1]}' not available for '{arg_name}' flag for command '{command_name}'.") + return + final_args[arg_name] = True + else: + arg_name = args[i].lstrip('-') + if arg_name not in command_args: + if self.showusageonworng: + Send(client, self.get_help_command_info(command_name)) + else: + Send(client, f"Invalid argument '{arg_name}' for command '{command_name}'.") + return + arg_value = args[i + 1] + final_args[arg_name] = arg_value + # Match parsed arguments to function parameters + final_args_list = [] + for param in list(command_args.values())[1:]: # Skip client argument + if param.name in final_args: + final_args_list.append(final_args[param.name]) + elif param.default != inspect.Parameter.empty: + final_args_list.append(param.default) + else: + if self.showusageonworng: + Send(client, self.get_help_command_info(command_name)) + else: + Send(client, f"Missing required argument '{param.name}' for command '{command_name}'") + return + return command_func(client, *final_args_list) + else: + if self.commandnotfound: + self.commandnotfound(client, command_name) + return + else: + Send(client, f"{command_name} not found") + return + + def get_command_info(self, command_name): + found_command = None + for category, commands in self.categories.items(): + if command_name in commands: + found_command = commands[command_name] + break + else: + for cmd, cmd_info in commands.items(): + if 'aliases' in cmd_info and command_name in cmd_info['aliases']: + found_command = cmd_info + break + if found_command: + break + + if found_command: + return { + 'name': command_name, + 'description': found_command['description'].strip() if found_command['description'] else "", + 'args': found_command['args'], + 'category': category + } + + def get_help_command_info(self, command): + command_info = self.get_command_info(command) + aliases = command_info.get('aliases', []) + help_message = f"{command_info['name']}" + if aliases: + help_message += f" ({', '.join(aliases)})" + help_message += "\n" + help_message += f"{command_info['description']}\n" + help_message += f"Usage: {command_info['name']}" + for arg in command_info['args']: + if isinstance(arg, tuple): + if isinstance(arg[1], bool): + help_message += f" [--{arg[0]}]" + else: + help_message += f" [-{arg[0]} {arg[1]}]" + else: + help_message += f" <{arg}>" + return help_message + + def get_help_message(self): + help_message = "" + for category, commands in self.categories.items(): + help_message += f"{category}:\n" + for command_name, command_info in commands.items(): + help_message += f" {command_name}" + if command_info['description']: + help_message += f" - {command_info['description']}" + help_message += "\n" + return help_message + + def get_all_commands(self): + all_commands = {} + for category, commands in self.categories.items(): + all_commands[category] = commands + return all_commands \ No newline at end of file diff --git a/src/PyserSSH/extensions/dialog.py b/src/PyserSSH/extensions/dialog.py new file mode 100644 index 0000000..6fe78ad --- /dev/null +++ b/src/PyserSSH/extensions/dialog.py @@ -0,0 +1,203 @@ +""" +PyserSSH - A SSH server. For more info visit https://github.com/damp11113/PyserSSH +Copyright (C) 2023-2024 damp11113 (MIT) + +Visit https://github.com/damp11113/PyserSSH + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import re + +from ..interactive import Clear, Send, wait_inputkey +from ..system.sysfunc import text_centered_screen + +class TextDialog: + def __init__(self, client, title="", content=""): + self.client = client + + self.windowsize = client["windowsize"] + self.title = title + self.content = content + + def render(self): + Clear(self.client) + Send(self.client, self.title) + Send(self.client, "-" * self.windowsize["width"]) + + generatedwindow = text_centered_screen(self.content, self.windowsize["width"], self.windowsize["height"]-3, " ") + + Send(self.client, generatedwindow) + + Send(self.client, "Press 'enter' to continue", ln=False) + + self.waituserenter() + + def waituserenter(self): + while True: + if wait_inputkey(self.client, raw=True) == b'\r': + Clear(self.client) + break + pass + +class MenuDialog: + def __init__(self, client, choose: list, title="", desc=""): + self.client = client + + self.title = title + self.choose = choose + self.desc = desc + self.contentallindex = len(choose) - 1 + self.selectedindex = 0 + self.selectstatus = 0 # 0 none 1 selected 2 cancel + + def render(self): + tempcontentlist = self.choose.copy() + + Clear(self.client) + Send(self.client, self.title) + Send(self.client, "-" * self.client["windowsize"]["width"]) + + tempcontentlist[self.selectedindex] = "> " + tempcontentlist[self.selectedindex] + + exported = "\n".join(tempcontentlist) + + if not self.desc.strip() == "": + contenttoshow = ( + f"{self.desc}\n\n" + f"{exported}" + ) + else: + contenttoshow = ( + f"{exported}" + ) + + generatedwindow = text_centered_screen(contenttoshow, self.client["windowsize"]["width"], self.client["windowsize"]["height"]-3, " ") + + Send(self.client, generatedwindow) + + Send(self.client, "Use arrow up/down key to choose and press 'enter' to select or 'c' to cancel", ln=False) + + self._waituserinput() + + def _waituserinput(self): + keyinput = wait_inputkey(self.client, raw=True) + + if keyinput == b'\r': # Enter key + Clear(self.client) + self.selectstatus = 1 + elif keyinput == b'c': # 'c' key for cancel + Clear(self.client) + self.selectstatus = 2 + elif keyinput == b'\x1b[A': # Up arrow key + self.selectedindex -= 1 + if self.selectedindex < 0: + self.selectedindex = 0 + elif keyinput == b'\x1b[B': # Down arrow key + self.selectedindex += 1 + if self.selectedindex > self.contentallindex: + self.selectedindex = self.contentallindex + + if self.selectstatus == 2: + self.output() + elif self.selectstatus == 1: + self.output() + else: + self.render() + + def output(self): + if self.selectstatus == 2: + return None + elif self.selectstatus == 1: + return self.selectedindex + +class TextInputDialog: + def __init__(self, client, title="", inputtitle="Input Here", password=False): + self.client = client + + self.title = title + self.inputtitle = inputtitle + self.ispassword = password + + self.inputstatus = 0 # 0 none 1 selected 2 cancel + self.buffer = bytearray() + self.cursor_position = 0 + + def render(self): + Clear(self.client) + Send(self.client, self.title) + Send(self.client, "-" * self.client["windowsize"]["width"]) + + if self.ispassword: + texts = ( + f"{self.inputtitle}\n\n" + "> " + ("*" * len(self.buffer.decode('utf-8'))) + ) + else: + texts = ( + f"{self.inputtitle}\n\n" + "> " + self.buffer.decode('utf-8') + ) + + generatedwindow = text_centered_screen(texts, self.client["windowsize"]["width"], self.client["windowsize"]["height"]-3, " ") + + Send(self.client, generatedwindow) + + Send(self.client, "Press 'enter' to select or 'ctrl+c' to cancel", ln=False) + + self._waituserinput() + + def _waituserinput(self): + keyinput = wait_inputkey(self.client, raw=True) + + if keyinput == b'\r': # Enter key + Clear(self.client) + self.inputstatus = 1 + elif keyinput == b'\x03': # 'ctrl + c' key for cancel + Clear(self.client) + self.inputstatus = 2 + + try: + if keyinput == b'\x7f' or keyinput == b'\x08': # Backspace + if self.cursor_position > 0: + # Move cursor back, erase character, move cursor back again + self.buffer = self.buffer[:self.cursor_position - 1] + self.buffer[self.cursor_position:] + self.cursor_position -= 1 + elif bool(re.compile(b'\x1b\[[0-9;]*[mGK]').search(keyinput)): + pass + else: # Regular character + self.buffer = self.buffer[:self.cursor_position] + keyinput + self.buffer[self.cursor_position:] + self.cursor_position += 1 + except Exception: + raise + + if self.inputstatus == 2: + self.output() + elif self.inputstatus == 1: + self.output() + else: + self.render() + + def output(self): + if self.inputstatus == 2: + return None + elif self.inputstatus == 1: + return self.buffer.decode('utf-8') diff --git a/src/PyserSSH/extensions/moredisplay.py b/src/PyserSSH/extensions/moredisplay.py new file mode 100644 index 0000000..e3ad84f --- /dev/null +++ b/src/PyserSSH/extensions/moredisplay.py @@ -0,0 +1,91 @@ +""" +PyserSSH - A SSH server. For more info visit https://github.com/damp11113/PyserSSH +Copyright (C) 2023-2024 damp11113 (MIT) + +Visit https://github.com/damp11113/PyserSSH + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +def clickable_url(url, link_text=""): + return f"\033]8;;{url}\033\\{link_text}\033]8;;\033\\" + +class BasicTextFormatter: + RESET = "\033[0m" + TEXT_COLORS = { + "black": "\033[30m", + "red": "\033[31m", + "green": "\033[32m", + "yellow": "\033[33m", + "blue": "\033[34m", + "magenta": "\033[35m", + "cyan": "\033[36m", + "white": "\033[37m" + } + TEXT_COLOR_LEVELS = { + "light": "\033[1;{}m", # Light color prefix + "dark": "\033[2;{}m" # Dark color prefix + } + BACKGROUND_COLORS = { + "black": "\033[40m", + "red": "\033[41m", + "green": "\033[42m", + "yellow": "\033[43m", + "blue": "\033[44m", + "magenta": "\033[45m", + "cyan": "\033[46m", + "white": "\033[47m" + } + TEXT_ATTRIBUTES = { + "bold": "\033[1m", + "italic": "\033[3m", + "underline": "\033[4m", + "blink": "\033[5m", + "reverse": "\033[7m", + "strikethrough": "\033[9m" + } + + @staticmethod + def format_text(text, color=None, color_level=None, background=None, attributes=None, target_text=''): + formatted_text = "" + start_index = text.find(target_text) + end_index = start_index + len(target_text) if start_index != -1 else len(text) + + if color in BasicTextFormatter.TEXT_COLORS: + if color_level in BasicTextFormatter.TEXT_COLOR_LEVELS: + color_code = BasicTextFormatter.TEXT_COLORS[color] + color_format = BasicTextFormatter.TEXT_COLOR_LEVELS[color_level].format(color_code) + formatted_text += color_format + else: + formatted_text += BasicTextFormatter.TEXT_COLORS[color] + + if background in BasicTextFormatter.BACKGROUND_COLORS: + formatted_text += BasicTextFormatter.BACKGROUND_COLORS[background] + + if attributes in BasicTextFormatter.TEXT_ATTRIBUTES: + formatted_text += BasicTextFormatter.TEXT_ATTRIBUTES[attributes] + + if target_text == "": + formatted_text += text + BasicTextFormatter.RESET + else: + formatted_text += text[:start_index] + text[start_index:end_index] + BasicTextFormatter.RESET + text[end_index:] + + return formatted_text diff --git a/src/PyserSSH/extensions/processbar.py b/src/PyserSSH/extensions/processbar.py index 0c15f0b..b23c6b7 100644 --- a/src/PyserSSH/extensions/processbar.py +++ b/src/PyserSSH/extensions/processbar.py @@ -26,13 +26,16 @@ SOFTWARE. """ # this file is from damp11113-library -from itertools import cycle, islice +from itertools import cycle import math import time from threading import Thread from time import sleep -from ..interactive import Print +from ..system.sysfunc import replace_enter_with_crlf + +def Print(channel, string, start="", end="\n"): + channel.send(replace_enter_with_crlf(start + string + end)) try: from damp11113.utils import get_size_unit2, center_string, TextFormatter, insert_string @@ -47,9 +50,8 @@ steps5 = ['[ ]', '[ -]', '[ --]', '[---]', '[-- ]', '[- ]'] steps6 = ['[ ]', '[ -]', '[ - ]', '[- ]'] class indeterminateStatus: - def __init__(self, client, desc="Loading...", end="[ ✔ ]", timeout=0.1, fail='[ ❌ ]', steps=None): - self.channel = client['channel'] - self.windowsize = client["windowsize"] + def __init__(self, client, desc="Loading...", end="[ OK ]", timeout=0.1, fail='[FAILED]', steps=None): + self.client = client self.desc = desc self.end = end @@ -72,7 +74,7 @@ class indeterminateStatus: for c in cycle(self.steps): if self.done: break - Print(self.channel, f"\r{c} {self.desc}" , end="") + Print(self.client['channel'], f"\r{c} {self.desc}" , end="") sleep(self.timeout) def __enter__(self): @@ -80,23 +82,23 @@ class indeterminateStatus: def stop(self): self.done = True - cols = self.windowsize["width"] - Print(self.channel, "\r" + " " * cols, end="") - Print(self.channel, f"\r{self.end}") + cols = self.client["windowsize"]["width"] + Print(self.client['channel'], "\r" + " " * cols, end="") + Print(self.client['channel'], f"\r{self.end}") def stopfail(self): self.done = True self.fail = True - cols = self.windowsize["width"] - Print(self.channel, "\r" + " " * cols, end="") - Print(self.channel, f"\r{self.faill}") + cols = self.client["windowsize"]["width"] + Print(self.client['channel'], "\r" + " " * cols, end="") + Print(self.client['channel'], f"\r{self.faill}") def __exit__(self, exc_type, exc_value, tb): # handle exceptions with those variables ^ self.stop() class LoadingProgress: - def __init__(self, client, total=100, totalbuffer=None, length=50, fill='█', fillbufferbar='█', desc="Loading...", status="", enabuinstatus=True, end="[ ✔ ]", timeout=0.1, fail='[ ❌ ]', steps=None, unit="it", barbackground="-", shortnum=False, buffer=False, shortunitsize=1000, currentshortnum=False, show=True, indeterminate=False, barcolor="red", bufferbarcolor="white",barbackgroundcolor="black", color=True): + def __init__(self, client, total=100, totalbuffer=None, length=50, fill='█', fillbufferbar='█', desc="Loading...", status="", enabuinstatus=True, end="[ OK ]", timeout=0.1, fail='[FAILED]', steps=None, unit="it", barbackground="-", shortnum=False, buffer=False, shortunitsize=1000, currentshortnum=False, show=True, indeterminate=False, barcolor="red", bufferbarcolor="white",barbackgroundcolor="black", color=True): """ Simple loading progress bar python @param client: from ssh client request @@ -116,8 +118,7 @@ class LoadingProgress: @param barbackgroundcolor: change background color @param color: enable colorful """ - self.channel = client["channel"] - self.windowsize = client["windowsize"] + self.client = client self.desc = desc self.end = end @@ -278,7 +279,7 @@ class LoadingProgress: self.currentprint = f"{c} {self.desc} | --%|{bar}| {elapsed_formatted} | {self.status}" if self.printed: - Print(self.channel, f"\r{self.currentprint}", end="") + Print(self.client["channel"], f"\r{self.currentprint}", end="") sleep(self.timeout) @@ -287,16 +288,16 @@ class LoadingProgress: def stop(self): self.done = True - cols = self.windowsize["width"] - Print(self.channel, "\r" + " " * cols, end="") - Print(self.channel, f"\r{self.end}") + cols = self.client["windowsize"]["width"] + Print(self.client["channel"], "\r" + " " * cols, end="") + Print(self.client["channel"], f"\r{self.end}") def stopfail(self): self.done = True self.fail = True - cols = self.windowsize["width"] - Print(self.channel, "\r" + " " * cols, end="") - Print(self.channel, f"\r{self.faill}") + cols = self.client["windowsize"]["width"] + Print(self.client["channel"], "\r" + " " * cols, end="") + Print(self.client["channel"], f"\r{self.faill}") def __exit__(self, exc_type, exc_value, tb): # handle exceptions with those variables ^ diff --git a/src/PyserSSH/extensions/ptop.py b/src/PyserSSH/extensions/ptop.py new file mode 100644 index 0000000..3c97851 --- /dev/null +++ b/src/PyserSSH/extensions/ptop.py @@ -0,0 +1,30 @@ +""" +PyserSSH - A SSH server. For more info visit https://github.com/damp11113/PyserSSH +Copyright (C) 2023-2024 damp11113 (MIT) + +Visit https://github.com/damp11113/PyserSSH + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +class PTOP: + def __init__(self, client, interval=1): + pass # working \ No newline at end of file diff --git a/src/PyserSSH/interactive.py b/src/PyserSSH/interactive.py index a777620..f7522ab 100644 --- a/src/PyserSSH/interactive.py +++ b/src/PyserSSH/interactive.py @@ -24,32 +24,43 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import re +import socket from .system.sysfunc import replace_enter_with_crlf -def Send(channel, string, ln=True): - if ln: - channel.send(replace_enter_with_crlf(string + "\n")) - else: - channel.send(replace_enter_with_crlf(string)) - -def Print(channel, string, start="", end="\n"): - channel.send(replace_enter_with_crlf(start + string + end)) - -def Clear(client): +def Send(client, string, ln=True): channel = client["channel"] + if ln: + channel.send(replace_enter_with_crlf(str(string) + "\n")) + else: + channel.send(replace_enter_with_crlf(str(string))) + +def Clear(client, oldclear=False): sx, sy = client["windowsize"]["width"], client["windowsize"]["height"] - for x in range(sx): - for y in range(sy): - Send(channel, '\b \b', ln=False) # Send newline after each line + if oldclear: + for x in range(sy): + Send(client, '\b \b' * sx, ln=False) # Send newline after each line + else: + Send(client, "\033[3J", ln=False) + Send(client, "\033[1J", ln=False) + Send(client, "\033[H", ln=False) + +def Title(client, title): + Send(client, f"\033]0;{title}\007", ln=False) + +def wait_input(client, prompt="", defaultvalue=None, cursor_scroll=False, echo=True, password=False, passwordmask=b"*", noabort=False, timeout=0): + channel = client["channel"] -def wait_input(channel, prompt="", defaultvalue=None, cursor_scroll=False, echo=True, password=False, passwordmask=b"*", noabort=False): channel.send(replace_enter_with_crlf(prompt)) buffer = bytearray() cursor_position = 0 + if timeout != 0: + channel.settimeout(timeout) + try: while True: byte = channel.recv(1) @@ -90,10 +101,17 @@ def wait_input(channel, prompt="", defaultvalue=None, cursor_scroll=False, echo= channel.sendall(b'\r\n') + except socket.timeout: + output = "" except Exception: raise + else: + output = buffer.decode('utf-8') - output = buffer.decode('utf-8') + if timeout != 0: + channel.settimeout(0) + channel.setblocking(False) + channel.sendall(b'\r\n') # Return default value if specified and no input given if defaultvalue is not None and not output.strip(): @@ -101,24 +119,86 @@ def wait_input(channel, prompt="", defaultvalue=None, cursor_scroll=False, echo= else: return output -def wait_inputkey(channel, prompt="", raw=False): +def wait_inputkey(client, prompt="", raw=False, timeout=0): + channel = client["channel"] + if prompt != "": channel.send(replace_enter_with_crlf(prompt)) + if timeout != 0: + channel.settimeout(timeout) + try: byte = channel.recv(10) - if not raw: - if not byte or byte == b'\x04': - raise EOFError() + if not byte or byte == b'\x04': + raise EOFError() - elif byte == b'\t': + if not raw: + if bool(re.compile(b'\x1b\[[0-9;]*[mGK]').search(byte)): pass - return byte.decode('utf-8') + return byte.decode('utf-8') # only regular character else: return byte + except socket.timeout: + channel.settimeout(0) + channel.setblocking(False) + channel.send("\r\n") + return None except Exception: - raise \ No newline at end of file + if timeout != 0: + channel.settimeout(0) + channel.setblocking(False) + raise + +def wait_choose(client, choose, prompt="", timeout=0): + channel = client["channel"] + + chooseindex = 0 + chooselen = len(choose) - 1 + + if timeout != 0: + channel.settimeout(timeout) + + while True: + try: + tempchooselist = choose.copy() + + tempchooselist[chooseindex] = "[" + tempchooselist[chooseindex] + "]" + + exported = " ".join(tempchooselist) + + if prompt.strip() == "": + Send(channel, f'\r{exported}', ln=False) + else: + Send(channel, f'\r{prompt}{exported}', ln=False) + + keyinput = wait_inputkey(channel, raw=True) + + if keyinput == b'\r': # Enter key + Send(channel, "\033[K") + return chooseindex + elif keyinput == b'\x03': # ' ctrl+c' key for cancel + Send(channel, "\033[K") + return None + elif keyinput == b'\x1b[D': # Up arrow key + chooseindex -= 1 + if chooseindex < 0: + chooseindex = 0 + elif keyinput == b'\x1b[C': # Down arrow key + chooseindex += 1 + if chooseindex > chooselen: + chooseindex = chooselen + except socket.timeout: + channel.settimeout(0) + channel.setblocking(False) + channel.send("\r\n") + return chooseindex + except Exception: + if timeout != 0: + channel.settimeout(0) + channel.setblocking(False) + raise \ No newline at end of file diff --git a/src/PyserSSH/server.py b/src/PyserSSH/server.py index 52a46ba..16ee3da 100644 --- a/src/PyserSSH/server.py +++ b/src/PyserSSH/server.py @@ -28,7 +28,6 @@ SOFTWARE. import os import time import paramiko -import socket import threading from functools import wraps import logging @@ -37,25 +36,16 @@ from .system.SFTP import SSHSFTPServer from .system.interface import Sinterface from .interactive import * from .system.inputsystem import expect -from .system.info import system_banner - -try: - os.environ["pyserssh_systemmessage"] -except: - os.environ["pyserssh_systemmessage"] = "YES" - -if os.environ["pyserssh_systemmessage"] == "YES": - print(system_banner) +from .system.info import system_banner, __version__ #paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = pow(2, 22) sftpclient = ["WinSCP", "Xplore"] logger = logging.getLogger("PyserSSH") -logger.disabled = True class Server: - def __init__(self, accounts, system_message=True, timeout=0, disable_scroll_with_arrow=True, sftp=True, sftproot=os.getcwd(), system_commands=False, compression=True, usexternalauth=False, history=True): + def __init__(self, accounts, system_message=True, disable_scroll_with_arrow=True, sftp=True, sftproot=os.getcwd(), system_commands=True, compression=True, usexternalauth=False, history=True, inputsystem=True, XHandler=None, title=f"PyserSSH v{__version__}", inspeed=32768): """ A simple SSH server """ @@ -64,7 +54,6 @@ class Server: self.client_handlers = {} # Dictionary to store event handlers for each client self.current_users = {} # Dictionary to store current_user for each connected client self.accounts = accounts - self.timeout = timeout self.disable_scroll_with_arrow = disable_scroll_with_arrow self.sftproot = sftproot self.sftpena = sftp @@ -72,6 +61,10 @@ class Server: self.compressena = compression self.usexternalauth = usexternalauth self.history = history + self.enainputsystem = inputsystem + self.XHandler = XHandler + self.title = title + self.inspeed = inspeed self.system_banner = system_banner @@ -111,6 +104,7 @@ class Server: SSHSFTPServer.CLIENTHANDELES = self.client_handlers bh_session.set_subsystem_handler('sftp', paramiko.SFTPServer, SSHSFTPServer) + if self.compressena: bh_session.use_compression(True) else: @@ -120,6 +114,8 @@ class Server: bh_session.packetizer.REKEY_BYTES = pow(2, 40) bh_session.packetizer.REKEY_PACKETS = pow(2, 40) + bh_session.default_max_packet_size = self.inspeed + server = Sinterface(self) bh_session.start_server(server=server) @@ -127,74 +123,78 @@ class Server: channel = bh_session.accept() - if self.timeout != 0: - channel.settimeout(self.timeout) - if channel is None: logger.warning("no channel") try: logger.info("user authenticated") - client_address = channel.getpeername() # Get client's address to identify the user - if client_address not in self.client_handlers: + peername = channel.getpeername() + if peername not in self.client_handlers: # Create a new event handler for this client if it doesn't exist - self.client_handlers[client_address] = { + self.client_handlers[peername] = { "event_handlers": {}, "current_user": None, "channel": channel, # Associate the channel with the client handler, "last_activity_time": None, "connecttype": None, "last_login_time": None, - "windowsize": {} + "windowsize": {}, + "x11": {} } - client_handler = self.client_handlers[client_address] + client_handler = self.client_handlers[peername] client_handler["current_user"] = server.current_user client_handler["channel"] = channel # Update the channel attribute for the client handler client_handler["last_activity_time"] = time.time() client_handler["last_login_time"] = time.time() - peername = channel.getpeername() - - - #byte = channel.recv(1) - #if byte == b'\x00': - #if not any(bh_session.remote_version.split("-")[2].startswith(prefix) for prefix in sftpclient): if not channel.out_window_size == bh_session.default_window_size: + while self.client_handlers[channel.getpeername()]["windowsize"] == {}: + pass + + channel.send(f"\033]0;{self.title}\007".encode()) + if self.sysmess: channel.sendall(replace_enter_with_crlf(self.system_banner)) channel.sendall(replace_enter_with_crlf("\n")) - while self.client_handlers[channel.getpeername()]["windowsize"] == {}: - pass - - self._handle_event("connect", channel, self.client_handlers[channel.getpeername()]) + try: + self._handle_event("connect", self.client_handlers[channel.getpeername()]) + except Exception as e: + self._handle_event("error", self.client_handlers[channel.getpeername()], e) client_handler["connecttype"] = "ssh" - try: - channel.send(replace_enter_with_crlf(self.accounts.get_prompt(self.client_handlers[channel.getpeername()]["current_user"]) + " ").encode('utf-8')) - while True: - expect(self, channel, peername) - except KeyboardInterrupt: - channel.close() - bh_session.close() - except Exception as e: - logger.error(e) - finally: - channel.close() + if self.enainputsystem: + try: + if self.accounts.get_user_timeout(self.client_handlers[channel.getpeername()]["current_user"]) != 0: + channel.settimeout(self.accounts.get_user_timeout(self.client_handlers[channel.getpeername()]["current_user"])) + + channel.send(replace_enter_with_crlf(self.accounts.get_prompt(self.client_handlers[channel.getpeername()]["current_user"]) + " ").encode('utf-8')) + while True: + expect(self, channel, peername) + except KeyboardInterrupt: + self._handle_event("disconnected", self.client_handlers[peername]["current_user"]) + channel.close() + bh_session.close() + except Exception as e: + self._handle_event("syserror", client_handler, e) + logger.error(e) + finally: + self._handle_event("disconnected", self.client_handlers[peername]["current_user"]) + channel.close() else: if self.sftpena: if self.accounts.get_user_sftp_allow(self.client_handlers[channel.getpeername()]["current_user"]): client_handler["connecttype"] = "sftp" - self._handle_event("connectsftp", channel, self.client_handlers[channel.getpeername()]) + self._handle_event("connectsftp", self.client_handlers[channel.getpeername()]) else: - del self.client_handlers[peername] + self._handle_event("disconnected", self.client_handlers[peername]["current_user"]) channel.close() else: - del self.client_handlers[peername] + self._handle_event("disconnected", self.client_handlers[peername]["current_user"]) channel.close() except: - raise + pass def stop_server(self): logger.info("Stopping the server...") @@ -233,37 +233,37 @@ class Server: for peername, client_handler in list(self.client_handlers.items()): if client_handler["current_user"] == username: channel = client_handler.get("channel") + self._handle_event("disconnected", channel.getpeername(), self.client_handlers[channel.getpeername()]["current_user"]) if reason is None: if channel: channel.close() - del self.client_handlers[peername] logger.info(f"User '{username}' has been kicked.") else: if channel: Send(channel, f"You have been disconnected for {reason}") channel.close() - del self.client_handlers[peername] logger.info(f"User '{username}' has been kicked by reason {reason}.") def kickbypeername(self, peername, reason=None): client_handler = self.client_handlers.get(peername) if client_handler: channel = client_handler.get("channel") + self._handle_event("disconnected", channel.getpeername(), self.client_handlers[channel.getpeername()]["current_user"]) if reason is None: if channel: channel.close() - del self.client_handlers[peername] logger.info(f"peername '{peername}' has been kicked.") else: if channel: Send(channel, f"You have been disconnected for {reason}") channel.close() - del self.client_handlers[peername] logger.info(f"peername '{peername}' has been kicked by reason {reason}.") def kickall(self, reason=None): for peername, client_handler in self.client_handlers.items(): channel = client_handler.get("channel") + self._handle_event("disconnected", channel.getpeername(), self.client_handlers[channel.getpeername()]["current_user"]) + if reason is None: if channel: channel.close() @@ -292,6 +292,7 @@ class Server: for client_handler in self.client_handlers.values(): if client_handler.get("current_user") == username: channel = client_handler.get("channel") + if channel: try: # Send the message to the specific client diff --git a/src/PyserSSH/system/info.py b/src/PyserSSH/system/info.py index 9ee469c..84edfcd 100644 --- a/src/PyserSSH/system/info.py +++ b/src/PyserSSH/system/info.py @@ -25,10 +25,10 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ -version = "4.0" +__version__ = "4.2" system_banner = ( - f"\033[36mPyserSSH V{version} \033[0m\n" + f"\033[36mPyserSSH V{__version__} \033[0m\n" #"\033[33m!!Warning!! This is Testing Version of PyserSSH \033[0m\n" "\033[35mUse Putty and WinSCP (SFTP) for best experience \033[0m" ) diff --git a/src/PyserSSH/system/inputsystem.py b/src/PyserSSH/system/inputsystem.py index 5213275..6b23479 100644 --- a/src/PyserSSH/system/inputsystem.py +++ b/src/PyserSSH/system/inputsystem.py @@ -24,28 +24,29 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ - +import socket import time import logging +import shlex +import traceback from .sysfunc import replace_enter_with_crlf from .syscom import systemcommand logger = logging.getLogger("PyserSSH") -logger.disabled = True def expect(self, chan, peername, echo=True): buffer = bytearray() cursor_position = 0 + outindexall = 0 history_index_position = 0 # Initialize history index position outside the loop currentuser = self.client_handlers[chan.getpeername()] try: while True: byte = chan.recv(1) - self._handle_event("onrawtype", chan, byte, self.client_handlers[chan.getpeername()]) + self._handle_event("onrawtype", self.client_handlers[chan.getpeername()], byte) - if self.timeout != 0: - self.client_handlers[chan.getpeername()]["last_activity_time"] = time.time() + self.client_handlers[chan.getpeername()]["last_activity_time"] = time.time() if not byte or byte == b'\x04': raise EOFError() @@ -57,7 +58,10 @@ def expect(self, chan, peername, echo=True): if cursor_position > 0: buffer = buffer[:cursor_position - 1] + buffer[cursor_position:] cursor_position -= 1 + outindexall -= 1 chan.sendall(b"\b \b") + else: + chan.sendall(b"\x07") elif byte == b"\x1b" and chan.recv(1) == b'[': arrow_key = chan.recv(1) if not self.disable_scroll_with_arrow: @@ -85,13 +89,13 @@ def expect(self, chan, peername, echo=True): # Update buffer and cursor position with the new command buffer = bytearray(command.encode('utf-8')) cursor_position = len(buffer) + outindexall = cursor_position # Print the updated buffer chan.sendall(buffer) history_index_position += 1 - - if arrow_key == b'B': + elif arrow_key == b'B': if history_index_position != -1: if history_index_position == 0: command = self.accounts.get_lastcommand(currentuser["current_user"]) @@ -105,6 +109,7 @@ def expect(self, chan, peername, echo=True): # Update buffer and cursor position with the new command buffer = bytearray(command.encode('utf-8')) cursor_position = len(buffer) + outindexall = cursor_position # Print the updated buffer chan.sendall(buffer) @@ -115,6 +120,7 @@ def expect(self, chan, peername, echo=True): buffer.clear() cursor_position = 0 + outindexall = 0 history_index_position -= 1 @@ -122,36 +128,71 @@ def expect(self, chan, peername, echo=True): break else: history_index_position = -1 + + self._handle_event("ontype", self.client_handlers[chan.getpeername()], byte) + if echo: + if outindexall != cursor_position: + chan.sendall(byte + buffer[cursor_position:]) + chan.sendall(f"\033[{cursor_position}G".encode()) + else: + chan.sendall(byte) + + #print(buffer[:cursor_position], byte, buffer[cursor_position:]) buffer = buffer[:cursor_position] + byte + buffer[cursor_position:] cursor_position += 1 - self._handle_event("ontype", chan, byte, self.client_handlers[chan.getpeername()]) - if echo: - chan.sendall(byte) + outindexall += 1 if echo: chan.sendall(b'\r\n') - command = str(buffer.decode('utf-8')) - - try: - if self.enasyscom: - systemcommand(currentuser, command) - - self._handle_event("command", chan, command, currentuser) - except Exception as e: - self._handle_event("error", chan, e, currentuser) + command = str(buffer.decode('utf-8')).strip() if self.history and command.strip() != "" and self.accounts.get_lastcommand(currentuser["current_user"]) != command: self.accounts.add_history(currentuser["current_user"], command) + if command.strip() != "": + if self.accounts.get_user_timeout(self.client_handlers[chan.getpeername()]["current_user"]) != 0: + chan.settimeout(0) + chan.setblocking(False) + + try: + if self.enasyscom: + sct = systemcommand(currentuser, command) + else: + sct = False + + if not sct: + if self.XHandler != None: + self._handle_event("beforexhandler", currentuser, command) + + self.XHandler.call(currentuser, command) + + self._handle_event("afterxhandler", currentuser, command) + else: + self._handle_event("command", currentuser, command) + + except Exception as e: + self._handle_event("error", currentuser, e) + try: chan.send(replace_enter_with_crlf(self.accounts.get_prompt(currentuser["current_user"]) + " ").encode('utf-8')) except: logger.error("Send error") + if self.accounts.get_user_timeout(self.client_handlers[chan.getpeername()]["current_user"]) != 0: + chan.settimeout(self.accounts.get_user_timeout(self.client_handlers[chan.getpeername()]["current_user"])) + + except socket.timeout: + chan.settimeout(0) + chan.setblocking(False) + chan.close() except Exception as e: logger.error(str(e)) finally: - if not byte: - logger.info(f"{peername} is disconnected") - self._handle_event("disconnected", peername, self.client_handlers[peername]["current_user"]) \ No newline at end of file + try: + if not byte: + logger.info(f"{peername} is disconnected") + self._handle_event("disconnected", self.client_handlers[peername]["current_user"]) + except: + logger.info(f"{peername} is disconnected by timeout") + self._handle_event("timeout", self.client_handlers[peername]["current_user"]) \ No newline at end of file diff --git a/src/PyserSSH/system/interface.py b/src/PyserSSH/system/interface.py index 53833c0..7a05bda 100644 --- a/src/PyserSSH/system/interface.py +++ b/src/PyserSSH/system/interface.py @@ -48,6 +48,7 @@ class Sinterface(paramiko.ServerInterface): return paramiko.AUTH_SUCCESSFUL else: if self.serverself._handle_event("auth", data): + self.current_user = username # Store the current user upon successful authentication return paramiko.AUTH_SUCCESSFUL else: return paramiko.AUTH_FAILED @@ -67,8 +68,11 @@ class Sinterface(paramiko.ServerInterface): "pixelwidth": pixelwidth, "pixelheight": pixelheight, } - self.serverself.client_handlers[channel.getpeername()]["windowsize"] = data2 - self.serverself._handle_event("connectpty", channel, data, self.serverself.client_handlers[channel.getpeername()]) + try: + self.serverself.client_handlers[channel.getpeername()]["windowsize"] = data2 + self.serverself._handle_event("connectpty", self.serverself.client_handlers[channel.getpeername()], data) + except: + pass return True @@ -76,6 +80,18 @@ class Sinterface(paramiko.ServerInterface): return True def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number): + data = { + "single_connection": single_connection, + "auth_protocol": auth_protocol, + "auth_cookie": auth_cookie, + "screen_number": screen_number + } + try: + self.serverself.client_handlers[channel.getpeername()]["x11"] = data + self.serverself._handle_event("connectx11", self.serverself.client_handlers[channel.getpeername()], data) + except: + pass + return True def check_channel_window_change_request(self, channel, width: int, height: int, pixelwidth: int, pixelheight: int): @@ -86,4 +102,4 @@ class Sinterface(paramiko.ServerInterface): "pixelheight": pixelheight } self.serverself.client_handlers[channel.getpeername()]["windowsize"] = data - self.serverself._handle_event("resized", channel, data, self.serverself.client_handlers[channel.getpeername()]) + self.serverself._handle_event("resized", self.serverself.client_handlers[channel.getpeername()], data) diff --git a/src/PyserSSH/system/syscom.py b/src/PyserSSH/system/syscom.py index e61687b..f1fa82c 100644 --- a/src/PyserSSH/system/syscom.py +++ b/src/PyserSSH/system/syscom.py @@ -17,48 +17,33 @@ The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WA3RRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import shlex from ..interactive import * -from .info import version - -try: - from damp11113.info import pyofetch - from damp11113.utils import TextFormatter - damp11113lib = True -except: - damp11113lib = False def systemcommand(client, command): channel = client["channel"] - if command == "info": - if damp11113lib: - Send(channel, "Please wait...", ln=False) - pyf = pyofetch().info(f"{TextFormatter.format_text('PyserSSH Version', color='yellow')}: {TextFormatter.format_text(version, color='cyan')}") - Send(channel, " \r", ln=False) - for i in pyf: - Send(channel, i) - else: - Send(channel, "damp11113-library not available for use this command") - elif command == "whoami": + if command == "whoami": Send(channel, client["current_user"]) + return True + elif command.startswith("title"): + args = shlex.split(command) + title = args[1] + Title(client, title) + return True elif command == "exit": channel.close() + return True elif command == "clear": Clear(client) - elif command == "fullscreentest": - Clear(client) - sx, sy = client["windowsize"]["width"], client["windowsize"]["height"] - - for x in range(sx): - for y in range(sy): - Send(channel, 'H', ln=False) # Send newline after each line + return True else: return False \ No newline at end of file diff --git a/src/PyserSSH/system/sysfunc.py b/src/PyserSSH/system/sysfunc.py index fbc8bde..ae12805 100644 --- a/src/PyserSSH/system/sysfunc.py +++ b/src/PyserSSH/system/sysfunc.py @@ -28,4 +28,22 @@ SOFTWARE. def replace_enter_with_crlf(input_string): if '\n' in input_string: input_string = input_string.replace('\n', '\r\n') - return input_string \ No newline at end of file + return input_string + + +def text_centered_screen(text, screen_width, screen_height, spacecharacter=" "): + screen = [] + lines = text.split("\n") + padding_vertical = (screen_height - len(lines)) // 2 # Calculate vertical padding + + for y in range(screen_height): + line = "" + if padding_vertical <= y < padding_vertical + len(lines): # Check if it's within the range of the text lines + index = y - padding_vertical # Get the corresponding line index + padding_horizontal = (screen_width - len(lines[index])) // 2 # Calculate horizontal padding for each line + line += spacecharacter * padding_horizontal + lines[index] + spacecharacter * padding_horizontal + else: # Fill other lines with space characters + line += spacecharacter * screen_width + screen.append(line) + + return "\n".join(screen) \ No newline at end of file