update 4.2 (no pypi)

This commit is contained in:
dharm pimsen 2024-03-31 14:04:53 +07:00
parent 527094f2bb
commit c9ffdf5201
16 changed files with 956 additions and 189 deletions

View File

@ -2,16 +2,21 @@ import os
import socket
import time
import shlex
from damp11113 import TextFormatter
from damp11113 import TextFormatter, sort_files, allfiles
import cv2
import traceback
import requests
from bs4 import BeautifulSoup
from PyserSSH import Server, AccountManager, Send, wait_input, wait_inputkey
from PyserSSH import Server, AccountManager, Send, wait_input, wait_inputkey, wait_choose, Clear, Title
from PyserSSH.system.info import system_banner
from PyserSSH.extensions.processbar import indeterminateStatus, LoadingProgress
from PyserSSH.extensions.dialog import MenuDialog, TextDialog, TextInputDialog
from PyserSSH.extensions.moredisplay import clickable_url
useraccount = AccountManager()
useraccount.add_account("admin", "") # create user without password
useraccount.add_account("test", "test") # create user without password
ssh = Server(useraccount, system_commands=True, system_message=False)
@ -22,89 +27,115 @@ Please ensure you have proper authorization before proceeding."""
Authorizedmessage = """You have successfully connected to the server.
Enjoy your session and remember to follow security protocols."""
loading = ["PyserSSH", "Extensions"]
@ssh.on_user("connect")
def connect(channel, client):
def connect(client):
Title(client, "PyserSSH")
#print(client["windowsize"])
if client['current_user'] == "":
warningmessage = nonamewarning
else:
warningmessage = Authorizedmessage
wm = f"""*********************************************************************************************
Hello {client['current_user']},
{warningmessage}
Visit: {clickable_url("https://damp11113.xyz", "DPCloudev")}
{system_banner}
*********************************************************************************************"""
if client['current_user'] != "test":
for i in loading:
P = indeterminateStatus(client, f"Starting {i}", f"[ OK ] Started {i}")
P.start()
time.sleep(len(i) / 20)
P.stop()
Di1 = TextDialog(client, "PyserSSH Extension", "Welcome!\n to PyserSSH test server")
Di1.render()
for char in wm:
Send(channel, char, ln=False)
time.sleep(0.005) # Adjust the delay as needed
Send(channel, '\n') # Send newline after each line
Send(client, char, ln=False)
# time.sleep(0.005) # Adjust the delay as needed
Send(client, '\n') # Send newline after each line
@ssh.on_user("error")
def error(channel, error, client):
def error(client, error):
if isinstance(error, socket.error):
pass
else:
Send(channel, traceback.format_exc())
Send(client, traceback.format_exc())
#@ssh.on_user("onrawtype")
#def onrawtype(client, key):
# print(key)
@ssh.on_user("command")
def command(channel, command: str, client):
def command(client, command: str):
if command == "passtest":
user = wait_input(channel, "username: ")
password = wait_input(channel, "password: ", password=True)
Send(channel, f"username: {user} | password: {password}")
user = wait_input(client, "username: ")
password = wait_input(client, "password: ", password=True)
Send(client, f"username: {user} | password: {password}")
elif command == "colortest":
for i in range(0, 255, 5):
Send(channel, TextFormatter.format_text_truecolor(" ", background=f"{i};0;0"), ln=False)
Send(channel, "")
Send(client, TextFormatter.format_text_truecolor(" ", background=f"{i};0;0"), ln=False)
Send(client, "")
for i in range(0, 255, 5):
Send(channel, TextFormatter.format_text_truecolor(" ", background=f"0;{i};0"), ln=False)
Send(channel, "")
Send(client, TextFormatter.format_text_truecolor(" ", background=f"0;{i};0"), ln=False)
Send(client, "")
for i in range(0, 255, 5):
Send(channel, TextFormatter.format_text_truecolor(" ", background=f"0;0;{i}"), ln=False)
Send(channel, "")
Send(channel, "TrueColors 24-Bit")
Send(client, TextFormatter.format_text_truecolor(" ", background=f"0;0;{i}"), ln=False)
Send(client, "")
Send(client, "TrueColors 24-Bit")
elif command == "keytest":
user = wait_inputkey(channel, "press any key", raw=True)
Send(channel, "")
Send(channel, f"key: {user}")
user = wait_inputkey(client, "press any key", raw=True)
Send(client, "")
Send(client, f"key: {user}")
for i in range(10):
user = wait_inputkey(channel, "press any key", raw=True)
Send(channel, "")
Send(channel, f"key: {user}")
user = wait_inputkey(client, "press any key", raw=True)
Send(client, "")
Send(client, f"key: {user}")
elif command.startswith("typing"):
args = shlex.split(command)
messages = args[1]
speed = float(args[2])
for w in messages:
Send(channel, w, ln=False)
Send(client, w, ln=False)
time.sleep(speed)
Send(channel, "")
elif command == "renimtest":
image = cv2.imread(r"opensource.png", cv2.IMREAD_COLOR)
Send(client, "")
elif command.startswith("renimtest"):
args = shlex.split(command)
Clear(client)
image = cv2.imread(f"opensource.png", cv2.IMREAD_COLOR)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
width, height = client['windowsize']["width"], client['windowsize']["height"]
width, height = client['windowsize']["width"]-5, client['windowsize']["height"]-5
# resize image
resized = cv2.resize(image, (width, height))
t = ""
# Scan all pixels
for y in range(0, height):
for x in range(0, width):
pixel_color = resized[y, x]
#PyserSSH.Send(channel, f"Pixel color at ({x}, {y}): {pixel_color}")
# PyserSSH.Send(channel, f"Pixel color at ({x}, {y}): {pixel_color}")
if pixel_color.tolist() != [0, 0, 0]:
Send(channel, TextFormatter.format_text_truecolor(" ", background=f"{pixel_color[0]};{pixel_color[1]};{pixel_color[2]}"), ln=False)
t += TextFormatter.format_text_truecolor(" ", background=f"{pixel_color[0]};{pixel_color[1]};{pixel_color[2]}")
else:
Send(channel, " ", ln=False)
t += " "
Send(client, t, ln=False)
Send(client, "")
t = ""
Send(channel, "")
elif command == "errortest":
raise Exception("hello error")
elif command == "inloadtest":
@ -117,8 +148,50 @@ def command(channel, command: str, client):
l.start()
for i in range(101):
l.current = i
l.desc = "loading..."
l.status = f"loading {i}"
time.sleep(0.05)
l.stop()
elif command == "dialogtest":
Di1 = TextDialog(client, "PyserSSH Extension", "Hello Dialog!")
Di1.render()
elif command == "dialogtest2":
Di2 = MenuDialog(client, ["H1", "H2", "H3"], "PyserSSH Extension", "Hello world")
Di2.render()
Send(client, f"selected index: {Di2.output()}")
elif command == "dialogtest3":
Di3 = TextInputDialog(client, "PyserSSH Extension")
Di3.render()
Send(client, f"input: {Di3.output()}")
elif command == "passdialogtest3":
Di3 = TextInputDialog(client, "PyserSSH Extension", inputtitle="Password Here", password=True)
Di3.render()
Send(client, f"password: {Di3.output()}")
elif command == "choosetest":
cindex = wait_choose(client, ["H1", "H2", "H3"], "select: ")
Send(client, f"selected index: {cindex}")
elif command.startswith("vieweb"):
args = shlex.split(command)
url = args[1]
loading = indeterminateStatus(client, desc=f"requesting {url}...")
loading.start()
try:
content = requests.get(url).content
except:
loading.stopfail()
return
loading.stop()
loading = indeterminateStatus(client, desc=f"parsing html {url}...")
loading.start()
try:
soup = BeautifulSoup(content, 'html.parser')
# Extract only the text content
text_content = soup.get_text()
except:
loading.stopfail()
return
loading.stop()
Di1 = TextDialog(client, url, text_content)
Di1.render()
ssh.run(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'private_key.pem'))

View File

@ -5,7 +5,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
setup(
name='PyserSSH',
version='4.0',
version='4.2.1', # update pypi (no update for 4.3)
license='MIT',
author='damp11113',
author_email='damp51252@gmail.com',
@ -17,5 +17,8 @@ setup(
long_description_content_type='text/markdown',
install_requires=[
"paramiko"
]
],
extras_require={
"fullsyscom": ["damp11113"]
}
)

View File

@ -25,6 +25,44 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
"""
note
ansi cursor arrow
up - \x1b[A
down - \x1b[B
left - \x1b[D
right - \x1b[C
https://en.wikipedia.org/wiki/ANSI_escape_code
"""
import os
import logging
from .interactive import *
from .server import Server
from .account import AccountManager
from .system.info import system_banner
try:
os.environ["pyserssh_systemmessage"]
except:
os.environ["pyserssh_systemmessage"] = "YES"
try:
os.environ["pyserssh_enable_damp11113"]
except:
os.environ["pyserssh_enable_damp11113"] = "YES"
try:
os.environ["pyserssh_log"]
except:
os.environ["pyserssh_log"] = "NO"
if os.environ["pyserssh_log"]:
logger = logging.getLogger("PyserSSH")
logger.disabled = True
if os.environ["pyserssh_systemmessage"] == "YES":
print(system_banner)

View File

@ -111,6 +111,15 @@ class AccountManager:
return self.accounts[username]["sftp_path"]
return ""
def get_user_timeout(self, username):
if username in self.accounts and "timeout" in self.accounts[username]:
return self.accounts[username]["timeout"]
return 0
def set_user_timeout(self, username, timeout=0):
if username in self.accounts:
self.accounts[username]["timeout"] = timeout
def add_history(self, username, command):
if not self.anyuser:
if username in self.accounts:

View File

@ -0,0 +1,178 @@
import inspect
import shlex
from ..interactive import Send
class XHandler:
def __init__(self, enablehelp=True, showusageonworng=True):
self.handlers = {}
self.categories = {}
self.enablehelp = enablehelp
self.showusageonworng = showusageonworng
self.commandnotfound = None
def command(self, category=None, name=None, aliases=None):
def decorator(func):
nonlocal name, category
if name is None:
name = func.__name__
command_name = name
command_description = func.__doc__ # Read the docstring
parameters = inspect.signature(func).parameters
command_args = []
for param in list(parameters.values())[1:]: # Exclude first parameter (client)
if param.default != inspect.Parameter.empty: # Check if parameter has default value
if param.annotation == bool:
command_args.append(f"-{param.name}")
else:
command_args.append((f"{param.name}", param.default))
else:
command_args.append(param.name)
if category is None:
category = 'No Category'
if category not in self.categories:
self.categories[category] = {}
self.categories[category][command_name] = {
'description': command_description.strip() if command_description else "",
'args': command_args
}
self.handlers[command_name] = func
if aliases:
for alias in aliases:
self.handlers[alias] = func
return func
return decorator
def call(self, client, command_string):
tokens = shlex.split(command_string)
command_name = tokens[0]
args = tokens[1:]
if command_name == "help" and self.enablehelp:
if args:
Send(client, self.get_help_command_info(args[0]))
else:
Send(client, self.get_help_message())
Send(client, "Type 'help <command>' for more info on a command.")
else:
if command_name in self.handlers:
command_func = self.handlers[command_name]
command_args = inspect.signature(command_func).parameters
if len(args) % 2 != 0 and not args[0].startswith("--"):
if self.showusageonworng:
Send(client, self.get_help_command_info(command_name))
else:
Send(client, f"Invalid number of arguments for command '{command_name}'.")
return
# Parse arguments
final_args = {}
for i in range(0, len(args), 2):
if args[i].startswith("--"):
arg_name = args[i].lstrip('--')
if arg_name not in command_args:
if self.showusageonworng:
Send(client, self.get_help_command_info(command_name))
else:
Send(client, f"Invalid flag '{arg_name}' for command '{command_name}'.")
return
try:
args[i + 1]
except:
pass
else:
if self.showusageonworng:
Send(client, self.get_help_command_info(command_name))
else:
Send(client, f"value '{args[i + 1]}' not available for '{arg_name}' flag for command '{command_name}'.")
return
final_args[arg_name] = True
else:
arg_name = args[i].lstrip('-')
if arg_name not in command_args:
if self.showusageonworng:
Send(client, self.get_help_command_info(command_name))
else:
Send(client, f"Invalid argument '{arg_name}' for command '{command_name}'.")
return
arg_value = args[i + 1]
final_args[arg_name] = arg_value
# Match parsed arguments to function parameters
final_args_list = []
for param in list(command_args.values())[1:]: # Skip client argument
if param.name in final_args:
final_args_list.append(final_args[param.name])
elif param.default != inspect.Parameter.empty:
final_args_list.append(param.default)
else:
if self.showusageonworng:
Send(client, self.get_help_command_info(command_name))
else:
Send(client, f"Missing required argument '{param.name}' for command '{command_name}'")
return
return command_func(client, *final_args_list)
else:
if self.commandnotfound:
self.commandnotfound(client, command_name)
return
else:
Send(client, f"{command_name} not found")
return
def get_command_info(self, command_name):
found_command = None
for category, commands in self.categories.items():
if command_name in commands:
found_command = commands[command_name]
break
else:
for cmd, cmd_info in commands.items():
if 'aliases' in cmd_info and command_name in cmd_info['aliases']:
found_command = cmd_info
break
if found_command:
break
if found_command:
return {
'name': command_name,
'description': found_command['description'].strip() if found_command['description'] else "",
'args': found_command['args'],
'category': category
}
def get_help_command_info(self, command):
command_info = self.get_command_info(command)
aliases = command_info.get('aliases', [])
help_message = f"{command_info['name']}"
if aliases:
help_message += f" ({', '.join(aliases)})"
help_message += "\n"
help_message += f"{command_info['description']}\n"
help_message += f"Usage: {command_info['name']}"
for arg in command_info['args']:
if isinstance(arg, tuple):
if isinstance(arg[1], bool):
help_message += f" [--{arg[0]}]"
else:
help_message += f" [-{arg[0]} {arg[1]}]"
else:
help_message += f" <{arg}>"
return help_message
def get_help_message(self):
help_message = ""
for category, commands in self.categories.items():
help_message += f"{category}:\n"
for command_name, command_info in commands.items():
help_message += f" {command_name}"
if command_info['description']:
help_message += f" - {command_info['description']}"
help_message += "\n"
return help_message
def get_all_commands(self):
all_commands = {}
for category, commands in self.categories.items():
all_commands[category] = commands
return all_commands

View File

@ -0,0 +1,203 @@
"""
PyserSSH - A SSH server. For more info visit https://github.com/damp11113/PyserSSH
Copyright (C) 2023-2024 damp11113 (MIT)
Visit https://github.com/damp11113/PyserSSH
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import re
from ..interactive import Clear, Send, wait_inputkey
from ..system.sysfunc import text_centered_screen
class TextDialog:
def __init__(self, client, title="", content=""):
self.client = client
self.windowsize = client["windowsize"]
self.title = title
self.content = content
def render(self):
Clear(self.client)
Send(self.client, self.title)
Send(self.client, "-" * self.windowsize["width"])
generatedwindow = text_centered_screen(self.content, self.windowsize["width"], self.windowsize["height"]-3, " ")
Send(self.client, generatedwindow)
Send(self.client, "Press 'enter' to continue", ln=False)
self.waituserenter()
def waituserenter(self):
while True:
if wait_inputkey(self.client, raw=True) == b'\r':
Clear(self.client)
break
pass
class MenuDialog:
def __init__(self, client, choose: list, title="", desc=""):
self.client = client
self.title = title
self.choose = choose
self.desc = desc
self.contentallindex = len(choose) - 1
self.selectedindex = 0
self.selectstatus = 0 # 0 none 1 selected 2 cancel
def render(self):
tempcontentlist = self.choose.copy()
Clear(self.client)
Send(self.client, self.title)
Send(self.client, "-" * self.client["windowsize"]["width"])
tempcontentlist[self.selectedindex] = "> " + tempcontentlist[self.selectedindex]
exported = "\n".join(tempcontentlist)
if not self.desc.strip() == "":
contenttoshow = (
f"{self.desc}\n\n"
f"{exported}"
)
else:
contenttoshow = (
f"{exported}"
)
generatedwindow = text_centered_screen(contenttoshow, self.client["windowsize"]["width"], self.client["windowsize"]["height"]-3, " ")
Send(self.client, generatedwindow)
Send(self.client, "Use arrow up/down key to choose and press 'enter' to select or 'c' to cancel", ln=False)
self._waituserinput()
def _waituserinput(self):
keyinput = wait_inputkey(self.client, raw=True)
if keyinput == b'\r': # Enter key
Clear(self.client)
self.selectstatus = 1
elif keyinput == b'c': # 'c' key for cancel
Clear(self.client)
self.selectstatus = 2
elif keyinput == b'\x1b[A': # Up arrow key
self.selectedindex -= 1
if self.selectedindex < 0:
self.selectedindex = 0
elif keyinput == b'\x1b[B': # Down arrow key
self.selectedindex += 1
if self.selectedindex > self.contentallindex:
self.selectedindex = self.contentallindex
if self.selectstatus == 2:
self.output()
elif self.selectstatus == 1:
self.output()
else:
self.render()
def output(self):
if self.selectstatus == 2:
return None
elif self.selectstatus == 1:
return self.selectedindex
class TextInputDialog:
def __init__(self, client, title="", inputtitle="Input Here", password=False):
self.client = client
self.title = title
self.inputtitle = inputtitle
self.ispassword = password
self.inputstatus = 0 # 0 none 1 selected 2 cancel
self.buffer = bytearray()
self.cursor_position = 0
def render(self):
Clear(self.client)
Send(self.client, self.title)
Send(self.client, "-" * self.client["windowsize"]["width"])
if self.ispassword:
texts = (
f"{self.inputtitle}\n\n"
"> " + ("*" * len(self.buffer.decode('utf-8')))
)
else:
texts = (
f"{self.inputtitle}\n\n"
"> " + self.buffer.decode('utf-8')
)
generatedwindow = text_centered_screen(texts, self.client["windowsize"]["width"], self.client["windowsize"]["height"]-3, " ")
Send(self.client, generatedwindow)
Send(self.client, "Press 'enter' to select or 'ctrl+c' to cancel", ln=False)
self._waituserinput()
def _waituserinput(self):
keyinput = wait_inputkey(self.client, raw=True)
if keyinput == b'\r': # Enter key
Clear(self.client)
self.inputstatus = 1
elif keyinput == b'\x03': # 'ctrl + c' key for cancel
Clear(self.client)
self.inputstatus = 2
try:
if keyinput == b'\x7f' or keyinput == b'\x08': # Backspace
if self.cursor_position > 0:
# Move cursor back, erase character, move cursor back again
self.buffer = self.buffer[:self.cursor_position - 1] + self.buffer[self.cursor_position:]
self.cursor_position -= 1
elif bool(re.compile(b'\x1b\[[0-9;]*[mGK]').search(keyinput)):
pass
else: # Regular character
self.buffer = self.buffer[:self.cursor_position] + keyinput + self.buffer[self.cursor_position:]
self.cursor_position += 1
except Exception:
raise
if self.inputstatus == 2:
self.output()
elif self.inputstatus == 1:
self.output()
else:
self.render()
def output(self):
if self.inputstatus == 2:
return None
elif self.inputstatus == 1:
return self.buffer.decode('utf-8')

View File

@ -0,0 +1,91 @@
"""
PyserSSH - A SSH server. For more info visit https://github.com/damp11113/PyserSSH
Copyright (C) 2023-2024 damp11113 (MIT)
Visit https://github.com/damp11113/PyserSSH
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
def clickable_url(url, link_text=""):
return f"\033]8;;{url}\033\\{link_text}\033]8;;\033\\"
class BasicTextFormatter:
RESET = "\033[0m"
TEXT_COLORS = {
"black": "\033[30m",
"red": "\033[31m",
"green": "\033[32m",
"yellow": "\033[33m",
"blue": "\033[34m",
"magenta": "\033[35m",
"cyan": "\033[36m",
"white": "\033[37m"
}
TEXT_COLOR_LEVELS = {
"light": "\033[1;{}m", # Light color prefix
"dark": "\033[2;{}m" # Dark color prefix
}
BACKGROUND_COLORS = {
"black": "\033[40m",
"red": "\033[41m",
"green": "\033[42m",
"yellow": "\033[43m",
"blue": "\033[44m",
"magenta": "\033[45m",
"cyan": "\033[46m",
"white": "\033[47m"
}
TEXT_ATTRIBUTES = {
"bold": "\033[1m",
"italic": "\033[3m",
"underline": "\033[4m",
"blink": "\033[5m",
"reverse": "\033[7m",
"strikethrough": "\033[9m"
}
@staticmethod
def format_text(text, color=None, color_level=None, background=None, attributes=None, target_text=''):
formatted_text = ""
start_index = text.find(target_text)
end_index = start_index + len(target_text) if start_index != -1 else len(text)
if color in BasicTextFormatter.TEXT_COLORS:
if color_level in BasicTextFormatter.TEXT_COLOR_LEVELS:
color_code = BasicTextFormatter.TEXT_COLORS[color]
color_format = BasicTextFormatter.TEXT_COLOR_LEVELS[color_level].format(color_code)
formatted_text += color_format
else:
formatted_text += BasicTextFormatter.TEXT_COLORS[color]
if background in BasicTextFormatter.BACKGROUND_COLORS:
formatted_text += BasicTextFormatter.BACKGROUND_COLORS[background]
if attributes in BasicTextFormatter.TEXT_ATTRIBUTES:
formatted_text += BasicTextFormatter.TEXT_ATTRIBUTES[attributes]
if target_text == "":
formatted_text += text + BasicTextFormatter.RESET
else:
formatted_text += text[:start_index] + text[start_index:end_index] + BasicTextFormatter.RESET + text[end_index:]
return formatted_text

View File

@ -26,13 +26,16 @@ SOFTWARE.
"""
# this file is from damp11113-library
from itertools import cycle, islice
from itertools import cycle
import math
import time
from threading import Thread
from time import sleep
from ..interactive import Print
from ..system.sysfunc import replace_enter_with_crlf
def Print(channel, string, start="", end="\n"):
channel.send(replace_enter_with_crlf(start + string + end))
try:
from damp11113.utils import get_size_unit2, center_string, TextFormatter, insert_string
@ -47,9 +50,8 @@ steps5 = ['[ ]', '[ -]', '[ --]', '[---]', '[-- ]', '[- ]']
steps6 = ['[ ]', '[ -]', '[ - ]', '[- ]']
class indeterminateStatus:
def __init__(self, client, desc="Loading...", end="[ ✔ ]", timeout=0.1, fail='[ ❌ ]', steps=None):
self.channel = client['channel']
self.windowsize = client["windowsize"]
def __init__(self, client, desc="Loading...", end="[ OK ]", timeout=0.1, fail='[FAILED]', steps=None):
self.client = client
self.desc = desc
self.end = end
@ -72,7 +74,7 @@ class indeterminateStatus:
for c in cycle(self.steps):
if self.done:
break
Print(self.channel, f"\r{c} {self.desc}" , end="")
Print(self.client['channel'], f"\r{c} {self.desc}" , end="")
sleep(self.timeout)
def __enter__(self):
@ -80,23 +82,23 @@ class indeterminateStatus:
def stop(self):
self.done = True
cols = self.windowsize["width"]
Print(self.channel, "\r" + " " * cols, end="")
Print(self.channel, f"\r{self.end}")
cols = self.client["windowsize"]["width"]
Print(self.client['channel'], "\r" + " " * cols, end="")
Print(self.client['channel'], f"\r{self.end}")
def stopfail(self):
self.done = True
self.fail = True
cols = self.windowsize["width"]
Print(self.channel, "\r" + " " * cols, end="")
Print(self.channel, f"\r{self.faill}")
cols = self.client["windowsize"]["width"]
Print(self.client['channel'], "\r" + " " * cols, end="")
Print(self.client['channel'], f"\r{self.faill}")
def __exit__(self, exc_type, exc_value, tb):
# handle exceptions with those variables ^
self.stop()
class LoadingProgress:
def __init__(self, client, total=100, totalbuffer=None, length=50, fill='', fillbufferbar='', desc="Loading...", status="", enabuinstatus=True, end="[ ]", timeout=0.1, fail='[]', steps=None, unit="it", barbackground="-", shortnum=False, buffer=False, shortunitsize=1000, currentshortnum=False, show=True, indeterminate=False, barcolor="red", bufferbarcolor="white",barbackgroundcolor="black", color=True):
def __init__(self, client, total=100, totalbuffer=None, length=50, fill='', fillbufferbar='', desc="Loading...", status="", enabuinstatus=True, end="[ OK ]", timeout=0.1, fail='[FAILED]', steps=None, unit="it", barbackground="-", shortnum=False, buffer=False, shortunitsize=1000, currentshortnum=False, show=True, indeterminate=False, barcolor="red", bufferbarcolor="white",barbackgroundcolor="black", color=True):
"""
Simple loading progress bar python
@param client: from ssh client request
@ -116,8 +118,7 @@ class LoadingProgress:
@param barbackgroundcolor: change background color
@param color: enable colorful
"""
self.channel = client["channel"]
self.windowsize = client["windowsize"]
self.client = client
self.desc = desc
self.end = end
@ -278,7 +279,7 @@ class LoadingProgress:
self.currentprint = f"{c} {self.desc} | --%|{bar}| {elapsed_formatted} | {self.status}"
if self.printed:
Print(self.channel, f"\r{self.currentprint}", end="")
Print(self.client["channel"], f"\r{self.currentprint}", end="")
sleep(self.timeout)
@ -287,16 +288,16 @@ class LoadingProgress:
def stop(self):
self.done = True
cols = self.windowsize["width"]
Print(self.channel, "\r" + " " * cols, end="")
Print(self.channel, f"\r{self.end}")
cols = self.client["windowsize"]["width"]
Print(self.client["channel"], "\r" + " " * cols, end="")
Print(self.client["channel"], f"\r{self.end}")
def stopfail(self):
self.done = True
self.fail = True
cols = self.windowsize["width"]
Print(self.channel, "\r" + " " * cols, end="")
Print(self.channel, f"\r{self.faill}")
cols = self.client["windowsize"]["width"]
Print(self.client["channel"], "\r" + " " * cols, end="")
Print(self.client["channel"], f"\r{self.faill}")
def __exit__(self, exc_type, exc_value, tb):
# handle exceptions with those variables ^

View File

@ -0,0 +1,30 @@
"""
PyserSSH - A SSH server. For more info visit https://github.com/damp11113/PyserSSH
Copyright (C) 2023-2024 damp11113 (MIT)
Visit https://github.com/damp11113/PyserSSH
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
class PTOP:
def __init__(self, client, interval=1):
pass # working

View File

@ -24,32 +24,43 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import re
import socket
from .system.sysfunc import replace_enter_with_crlf
def Send(channel, string, ln=True):
if ln:
channel.send(replace_enter_with_crlf(string + "\n"))
else:
channel.send(replace_enter_with_crlf(string))
def Print(channel, string, start="", end="\n"):
channel.send(replace_enter_with_crlf(start + string + end))
def Clear(client):
def Send(client, string, ln=True):
channel = client["channel"]
if ln:
channel.send(replace_enter_with_crlf(str(string) + "\n"))
else:
channel.send(replace_enter_with_crlf(str(string)))
def Clear(client, oldclear=False):
sx, sy = client["windowsize"]["width"], client["windowsize"]["height"]
for x in range(sx):
for y in range(sy):
Send(channel, '\b \b', ln=False) # Send newline after each line
if oldclear:
for x in range(sy):
Send(client, '\b \b' * sx, ln=False) # Send newline after each line
else:
Send(client, "\033[3J", ln=False)
Send(client, "\033[1J", ln=False)
Send(client, "\033[H", ln=False)
def Title(client, title):
Send(client, f"\033]0;{title}\007", ln=False)
def wait_input(client, prompt="", defaultvalue=None, cursor_scroll=False, echo=True, password=False, passwordmask=b"*", noabort=False, timeout=0):
channel = client["channel"]
def wait_input(channel, prompt="", defaultvalue=None, cursor_scroll=False, echo=True, password=False, passwordmask=b"*", noabort=False):
channel.send(replace_enter_with_crlf(prompt))
buffer = bytearray()
cursor_position = 0
if timeout != 0:
channel.settimeout(timeout)
try:
while True:
byte = channel.recv(1)
@ -90,35 +101,104 @@ def wait_input(channel, prompt="", defaultvalue=None, cursor_scroll=False, echo=
channel.sendall(b'\r\n')
except socket.timeout:
output = ""
except Exception:
raise
else:
output = buffer.decode('utf-8')
if timeout != 0:
channel.settimeout(0)
channel.setblocking(False)
channel.sendall(b'\r\n')
# Return default value if specified and no input given
if defaultvalue is not None and not output.strip():
return defaultvalue
else:
return output
def wait_inputkey(channel, prompt="", raw=False):
def wait_inputkey(client, prompt="", raw=False, timeout=0):
channel = client["channel"]
if prompt != "":
channel.send(replace_enter_with_crlf(prompt))
if timeout != 0:
channel.settimeout(timeout)
try:
byte = channel.recv(10)
if not raw:
if not byte or byte == b'\x04':
raise EOFError()
elif byte == b'\t':
if not raw:
if bool(re.compile(b'\x1b\[[0-9;]*[mGK]').search(byte)):
pass
return byte.decode('utf-8')
return byte.decode('utf-8') # only regular character
else:
return byte
except socket.timeout:
channel.settimeout(0)
channel.setblocking(False)
channel.send("\r\n")
return None
except Exception:
if timeout != 0:
channel.settimeout(0)
channel.setblocking(False)
raise
def wait_choose(client, choose, prompt="", timeout=0):
channel = client["channel"]
chooseindex = 0
chooselen = len(choose) - 1
if timeout != 0:
channel.settimeout(timeout)
while True:
try:
tempchooselist = choose.copy()
tempchooselist[chooseindex] = "[" + tempchooselist[chooseindex] + "]"
exported = " ".join(tempchooselist)
if prompt.strip() == "":
Send(channel, f'\r{exported}', ln=False)
else:
Send(channel, f'\r{prompt}{exported}', ln=False)
keyinput = wait_inputkey(channel, raw=True)
if keyinput == b'\r': # Enter key
Send(channel, "\033[K")
return chooseindex
elif keyinput == b'\x03': # ' ctrl+c' key for cancel
Send(channel, "\033[K")
return None
elif keyinput == b'\x1b[D': # Up arrow key
chooseindex -= 1
if chooseindex < 0:
chooseindex = 0
elif keyinput == b'\x1b[C': # Down arrow key
chooseindex += 1
if chooseindex > chooselen:
chooseindex = chooselen
except socket.timeout:
channel.settimeout(0)
channel.setblocking(False)
channel.send("\r\n")
return chooseindex
except Exception:
if timeout != 0:
channel.settimeout(0)
channel.setblocking(False)
raise

View File

@ -28,7 +28,6 @@ SOFTWARE.
import os
import time
import paramiko
import socket
import threading
from functools import wraps
import logging
@ -37,25 +36,16 @@ from .system.SFTP import SSHSFTPServer
from .system.interface import Sinterface
from .interactive import *
from .system.inputsystem import expect
from .system.info import system_banner
try:
os.environ["pyserssh_systemmessage"]
except:
os.environ["pyserssh_systemmessage"] = "YES"
if os.environ["pyserssh_systemmessage"] == "YES":
print(system_banner)
from .system.info import system_banner, __version__
#paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = pow(2, 22)
sftpclient = ["WinSCP", "Xplore"]
logger = logging.getLogger("PyserSSH")
logger.disabled = True
class Server:
def __init__(self, accounts, system_message=True, timeout=0, disable_scroll_with_arrow=True, sftp=True, sftproot=os.getcwd(), system_commands=False, compression=True, usexternalauth=False, history=True):
def __init__(self, accounts, system_message=True, disable_scroll_with_arrow=True, sftp=True, sftproot=os.getcwd(), system_commands=True, compression=True, usexternalauth=False, history=True, inputsystem=True, XHandler=None, title=f"PyserSSH v{__version__}", inspeed=32768):
"""
A simple SSH server
"""
@ -64,7 +54,6 @@ class Server:
self.client_handlers = {} # Dictionary to store event handlers for each client
self.current_users = {} # Dictionary to store current_user for each connected client
self.accounts = accounts
self.timeout = timeout
self.disable_scroll_with_arrow = disable_scroll_with_arrow
self.sftproot = sftproot
self.sftpena = sftp
@ -72,6 +61,10 @@ class Server:
self.compressena = compression
self.usexternalauth = usexternalauth
self.history = history
self.enainputsystem = inputsystem
self.XHandler = XHandler
self.title = title
self.inspeed = inspeed
self.system_banner = system_banner
@ -111,6 +104,7 @@ class Server:
SSHSFTPServer.CLIENTHANDELES = self.client_handlers
bh_session.set_subsystem_handler('sftp', paramiko.SFTPServer, SSHSFTPServer)
if self.compressena:
bh_session.use_compression(True)
else:
@ -120,6 +114,8 @@ class Server:
bh_session.packetizer.REKEY_BYTES = pow(2, 40)
bh_session.packetizer.REKEY_PACKETS = pow(2, 40)
bh_session.default_max_packet_size = self.inspeed
server = Sinterface(self)
bh_session.start_server(server=server)
@ -127,74 +123,78 @@ class Server:
channel = bh_session.accept()
if self.timeout != 0:
channel.settimeout(self.timeout)
if channel is None:
logger.warning("no channel")
try:
logger.info("user authenticated")
client_address = channel.getpeername() # Get client's address to identify the user
if client_address not in self.client_handlers:
peername = channel.getpeername()
if peername not in self.client_handlers:
# Create a new event handler for this client if it doesn't exist
self.client_handlers[client_address] = {
self.client_handlers[peername] = {
"event_handlers": {},
"current_user": None,
"channel": channel, # Associate the channel with the client handler,
"last_activity_time": None,
"connecttype": None,
"last_login_time": None,
"windowsize": {}
"windowsize": {},
"x11": {}
}
client_handler = self.client_handlers[client_address]
client_handler = self.client_handlers[peername]
client_handler["current_user"] = server.current_user
client_handler["channel"] = channel # Update the channel attribute for the client handler
client_handler["last_activity_time"] = time.time()
client_handler["last_login_time"] = time.time()
peername = channel.getpeername()
#byte = channel.recv(1)
#if byte == b'\x00':
#if not any(bh_session.remote_version.split("-")[2].startswith(prefix) for prefix in sftpclient):
if not channel.out_window_size == bh_session.default_window_size:
while self.client_handlers[channel.getpeername()]["windowsize"] == {}:
pass
channel.send(f"\033]0;{self.title}\007".encode())
if self.sysmess:
channel.sendall(replace_enter_with_crlf(self.system_banner))
channel.sendall(replace_enter_with_crlf("\n"))
while self.client_handlers[channel.getpeername()]["windowsize"] == {}:
pass
self._handle_event("connect", channel, self.client_handlers[channel.getpeername()])
try:
self._handle_event("connect", self.client_handlers[channel.getpeername()])
except Exception as e:
self._handle_event("error", self.client_handlers[channel.getpeername()], e)
client_handler["connecttype"] = "ssh"
if self.enainputsystem:
try:
if self.accounts.get_user_timeout(self.client_handlers[channel.getpeername()]["current_user"]) != 0:
channel.settimeout(self.accounts.get_user_timeout(self.client_handlers[channel.getpeername()]["current_user"]))
channel.send(replace_enter_with_crlf(self.accounts.get_prompt(self.client_handlers[channel.getpeername()]["current_user"]) + " ").encode('utf-8'))
while True:
expect(self, channel, peername)
except KeyboardInterrupt:
self._handle_event("disconnected", self.client_handlers[peername]["current_user"])
channel.close()
bh_session.close()
except Exception as e:
self._handle_event("syserror", client_handler, e)
logger.error(e)
finally:
self._handle_event("disconnected", self.client_handlers[peername]["current_user"])
channel.close()
else:
if self.sftpena:
if self.accounts.get_user_sftp_allow(self.client_handlers[channel.getpeername()]["current_user"]):
client_handler["connecttype"] = "sftp"
self._handle_event("connectsftp", channel, self.client_handlers[channel.getpeername()])
self._handle_event("connectsftp", self.client_handlers[channel.getpeername()])
else:
del self.client_handlers[peername]
self._handle_event("disconnected", self.client_handlers[peername]["current_user"])
channel.close()
else:
del self.client_handlers[peername]
self._handle_event("disconnected", self.client_handlers[peername]["current_user"])
channel.close()
except:
raise
pass
def stop_server(self):
logger.info("Stopping the server...")
@ -233,37 +233,37 @@ class Server:
for peername, client_handler in list(self.client_handlers.items()):
if client_handler["current_user"] == username:
channel = client_handler.get("channel")
self._handle_event("disconnected", channel.getpeername(), self.client_handlers[channel.getpeername()]["current_user"])
if reason is None:
if channel:
channel.close()
del self.client_handlers[peername]
logger.info(f"User '{username}' has been kicked.")
else:
if channel:
Send(channel, f"You have been disconnected for {reason}")
channel.close()
del self.client_handlers[peername]
logger.info(f"User '{username}' has been kicked by reason {reason}.")
def kickbypeername(self, peername, reason=None):
client_handler = self.client_handlers.get(peername)
if client_handler:
channel = client_handler.get("channel")
self._handle_event("disconnected", channel.getpeername(), self.client_handlers[channel.getpeername()]["current_user"])
if reason is None:
if channel:
channel.close()
del self.client_handlers[peername]
logger.info(f"peername '{peername}' has been kicked.")
else:
if channel:
Send(channel, f"You have been disconnected for {reason}")
channel.close()
del self.client_handlers[peername]
logger.info(f"peername '{peername}' has been kicked by reason {reason}.")
def kickall(self, reason=None):
for peername, client_handler in self.client_handlers.items():
channel = client_handler.get("channel")
self._handle_event("disconnected", channel.getpeername(), self.client_handlers[channel.getpeername()]["current_user"])
if reason is None:
if channel:
channel.close()
@ -292,6 +292,7 @@ class Server:
for client_handler in self.client_handlers.values():
if client_handler.get("current_user") == username:
channel = client_handler.get("channel")
if channel:
try:
# Send the message to the specific client

View File

@ -25,10 +25,10 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
version = "4.0"
__version__ = "4.2"
system_banner = (
f"\033[36mPyserSSH V{version} \033[0m\n"
f"\033[36mPyserSSH V{__version__} \033[0m\n"
#"\033[33m!!Warning!! This is Testing Version of PyserSSH \033[0m\n"
"\033[35mUse Putty and WinSCP (SFTP) for best experience \033[0m"
)

View File

@ -24,27 +24,28 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import socket
import time
import logging
import shlex
import traceback
from .sysfunc import replace_enter_with_crlf
from .syscom import systemcommand
logger = logging.getLogger("PyserSSH")
logger.disabled = True
def expect(self, chan, peername, echo=True):
buffer = bytearray()
cursor_position = 0
outindexall = 0
history_index_position = 0 # Initialize history index position outside the loop
currentuser = self.client_handlers[chan.getpeername()]
try:
while True:
byte = chan.recv(1)
self._handle_event("onrawtype", chan, byte, self.client_handlers[chan.getpeername()])
self._handle_event("onrawtype", self.client_handlers[chan.getpeername()], byte)
if self.timeout != 0:
self.client_handlers[chan.getpeername()]["last_activity_time"] = time.time()
if not byte or byte == b'\x04':
@ -57,7 +58,10 @@ def expect(self, chan, peername, echo=True):
if cursor_position > 0:
buffer = buffer[:cursor_position - 1] + buffer[cursor_position:]
cursor_position -= 1
outindexall -= 1
chan.sendall(b"\b \b")
else:
chan.sendall(b"\x07")
elif byte == b"\x1b" and chan.recv(1) == b'[':
arrow_key = chan.recv(1)
if not self.disable_scroll_with_arrow:
@ -85,13 +89,13 @@ def expect(self, chan, peername, echo=True):
# Update buffer and cursor position with the new command
buffer = bytearray(command.encode('utf-8'))
cursor_position = len(buffer)
outindexall = cursor_position
# Print the updated buffer
chan.sendall(buffer)
history_index_position += 1
if arrow_key == b'B':
elif arrow_key == b'B':
if history_index_position != -1:
if history_index_position == 0:
command = self.accounts.get_lastcommand(currentuser["current_user"])
@ -105,6 +109,7 @@ def expect(self, chan, peername, echo=True):
# Update buffer and cursor position with the new command
buffer = bytearray(command.encode('utf-8'))
cursor_position = len(buffer)
outindexall = cursor_position
# Print the updated buffer
chan.sendall(buffer)
@ -115,6 +120,7 @@ def expect(self, chan, peername, echo=True):
buffer.clear()
cursor_position = 0
outindexall = 0
history_index_position -= 1
@ -122,36 +128,71 @@ def expect(self, chan, peername, echo=True):
break
else:
history_index_position = -1
self._handle_event("ontype", self.client_handlers[chan.getpeername()], byte)
if echo:
if outindexall != cursor_position:
chan.sendall(byte + buffer[cursor_position:])
chan.sendall(f"\033[{cursor_position}G".encode())
else:
chan.sendall(byte)
#print(buffer[:cursor_position], byte, buffer[cursor_position:])
buffer = buffer[:cursor_position] + byte + buffer[cursor_position:]
cursor_position += 1
self._handle_event("ontype", chan, byte, self.client_handlers[chan.getpeername()])
if echo:
chan.sendall(byte)
outindexall += 1
if echo:
chan.sendall(b'\r\n')
command = str(buffer.decode('utf-8'))
try:
if self.enasyscom:
systemcommand(currentuser, command)
self._handle_event("command", chan, command, currentuser)
except Exception as e:
self._handle_event("error", chan, e, currentuser)
command = str(buffer.decode('utf-8')).strip()
if self.history and command.strip() != "" and self.accounts.get_lastcommand(currentuser["current_user"]) != command:
self.accounts.add_history(currentuser["current_user"], command)
if command.strip() != "":
if self.accounts.get_user_timeout(self.client_handlers[chan.getpeername()]["current_user"]) != 0:
chan.settimeout(0)
chan.setblocking(False)
try:
if self.enasyscom:
sct = systemcommand(currentuser, command)
else:
sct = False
if not sct:
if self.XHandler != None:
self._handle_event("beforexhandler", currentuser, command)
self.XHandler.call(currentuser, command)
self._handle_event("afterxhandler", currentuser, command)
else:
self._handle_event("command", currentuser, command)
except Exception as e:
self._handle_event("error", currentuser, e)
try:
chan.send(replace_enter_with_crlf(self.accounts.get_prompt(currentuser["current_user"]) + " ").encode('utf-8'))
except:
logger.error("Send error")
if self.accounts.get_user_timeout(self.client_handlers[chan.getpeername()]["current_user"]) != 0:
chan.settimeout(self.accounts.get_user_timeout(self.client_handlers[chan.getpeername()]["current_user"]))
except socket.timeout:
chan.settimeout(0)
chan.setblocking(False)
chan.close()
except Exception as e:
logger.error(str(e))
finally:
try:
if not byte:
logger.info(f"{peername} is disconnected")
self._handle_event("disconnected", peername, self.client_handlers[peername]["current_user"])
self._handle_event("disconnected", self.client_handlers[peername]["current_user"])
except:
logger.info(f"{peername} is disconnected by timeout")
self._handle_event("timeout", self.client_handlers[peername]["current_user"])

View File

@ -48,6 +48,7 @@ class Sinterface(paramiko.ServerInterface):
return paramiko.AUTH_SUCCESSFUL
else:
if self.serverself._handle_event("auth", data):
self.current_user = username # Store the current user upon successful authentication
return paramiko.AUTH_SUCCESSFUL
else:
return paramiko.AUTH_FAILED
@ -67,8 +68,11 @@ class Sinterface(paramiko.ServerInterface):
"pixelwidth": pixelwidth,
"pixelheight": pixelheight,
}
try:
self.serverself.client_handlers[channel.getpeername()]["windowsize"] = data2
self.serverself._handle_event("connectpty", channel, data, self.serverself.client_handlers[channel.getpeername()])
self.serverself._handle_event("connectpty", self.serverself.client_handlers[channel.getpeername()], data)
except:
pass
return True
@ -76,6 +80,18 @@ class Sinterface(paramiko.ServerInterface):
return True
def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number):
data = {
"single_connection": single_connection,
"auth_protocol": auth_protocol,
"auth_cookie": auth_cookie,
"screen_number": screen_number
}
try:
self.serverself.client_handlers[channel.getpeername()]["x11"] = data
self.serverself._handle_event("connectx11", self.serverself.client_handlers[channel.getpeername()], data)
except:
pass
return True
def check_channel_window_change_request(self, channel, width: int, height: int, pixelwidth: int, pixelheight: int):
@ -86,4 +102,4 @@ class Sinterface(paramiko.ServerInterface):
"pixelheight": pixelheight
}
self.serverself.client_handlers[channel.getpeername()]["windowsize"] = data
self.serverself._handle_event("resized", channel, data, self.serverself.client_handlers[channel.getpeername()])
self.serverself._handle_event("resized", self.serverself.client_handlers[channel.getpeername()], data)

View File

@ -17,48 +17,33 @@ The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WA3RRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import shlex
from ..interactive import *
from .info import version
try:
from damp11113.info import pyofetch
from damp11113.utils import TextFormatter
damp11113lib = True
except:
damp11113lib = False
def systemcommand(client, command):
channel = client["channel"]
if command == "info":
if damp11113lib:
Send(channel, "Please wait...", ln=False)
pyf = pyofetch().info(f"{TextFormatter.format_text('PyserSSH Version', color='yellow')}: {TextFormatter.format_text(version, color='cyan')}")
Send(channel, " \r", ln=False)
for i in pyf:
Send(channel, i)
else:
Send(channel, "damp11113-library not available for use this command")
elif command == "whoami":
if command == "whoami":
Send(channel, client["current_user"])
return True
elif command.startswith("title"):
args = shlex.split(command)
title = args[1]
Title(client, title)
return True
elif command == "exit":
channel.close()
return True
elif command == "clear":
Clear(client)
elif command == "fullscreentest":
Clear(client)
sx, sy = client["windowsize"]["width"], client["windowsize"]["height"]
for x in range(sx):
for y in range(sy):
Send(channel, 'H', ln=False) # Send newline after each line
return True
else:
return False

View File

@ -29,3 +29,21 @@ def replace_enter_with_crlf(input_string):
if '\n' in input_string:
input_string = input_string.replace('\n', '\r\n')
return input_string
def text_centered_screen(text, screen_width, screen_height, spacecharacter=" "):
screen = []
lines = text.split("\n")
padding_vertical = (screen_height - len(lines)) // 2 # Calculate vertical padding
for y in range(screen_height):
line = ""
if padding_vertical <= y < padding_vertical + len(lines): # Check if it's within the range of the text lines
index = y - padding_vertical # Get the corresponding line index
padding_horizontal = (screen_width - len(lines[index])) // 2 # Calculate horizontal padding for each line
line += spacecharacter * padding_horizontal + lines[index] + spacecharacter * padding_horizontal
else: # Fill other lines with space characters
line += spacecharacter * screen_width
screen.append(line)
return "\n".join(screen)