no demo folder but it on library
python -m PyserSSH
This commit is contained in:
dharm pimsen 2024-04-19 15:31:11 +07:00
parent 79353cf668
commit f33be3bca1
14 changed files with 232 additions and 124 deletions

View File

@ -6,6 +6,9 @@ This project is part from [damp11113-library](https://github.com/damp11113/damp1
This Server use port **2222** for default port
> [!WARNING]
> For use in product please **generate new private key**! If you still use this demo private key maybe your product getting **hacked**! up to 90%. Please don't use this demo private key for real product.
# Install
Install from pypi
```bash
@ -40,11 +43,25 @@ If you input `hello` the response is `world`
# Demo
https://github.com/damp11113/PyserSSH/assets/64675096/49bef3e2-3b15-4b64-b88e-3ca84a955de7
See [server.py](https://github.com/damp11113/PyserSSH/blob/main/demo/server.py)
For run this demo you can use this command
```
$ python -m PyserSSH
```
then
```
Do you want to run demo? (y/n): y
```
But if no [damp11113-library](https://github.com/damp11113/damp11113-library)
```
No 'damp11113-library'
This demo is require 'damp11113-library' for run
```
you need to install [damp11113-library](https://github.com/damp11113/damp11113-library) for run this demo by choose `y` or `yes` in lowercase or uppercase
```
Do you want to install 'damp11113-library'? (y/n): y
```
For exit demo you can use `ctrl+c` or use `shutdown now` in PyserSSH shell **(not in real terminal)**
I intend to leaked private key because that key i generated new. I recommend to generate new key if you want to use on your host because that key is for demo only.
why i talk about this? because when i push private key into this repo in next 5 min++ i getting new email from GitGuardian. in that email say "
GitGuardian has detected the following RSA Private Key exposed within your GitHub account" i dont knows what is GitGuardian and i not install this app into my account.

View File

@ -1,27 +0,0 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAq7UgQtL4Nv2s8rvaJjQryNxpsKpcSeDIsABnvry6Xkd3KhOi
K3c4dkYJjiAb4w4wfPiJ7sFL/PFP3f/slcpNHz18meWZkktia3rBX8uyJQ3soyNw
Vbxm6mOPntAqC4JBoPaYS4HABSYxJYY6yPU1i0UufvWg5pNRgeZIM8kQSyie4q1C
AEFG1T6sabJ5mOWH8Yw/zu3nTQpz2yIZYSVOsvJxBtaCEHCThhmQk2jPb88Ss0XT
a7uzaX/UIRktDz1FN6ooJbFqHsHxOsZJrC6YdZ3lo7DZYJU+jclG4jy95rGe7KpE
0p9cMAYNO0ya6toJM6GwFzJEk+HD0BTxdi7dKQIDAQABAoIBAFY1ciUa1xSE+LhG
KJjVyMXoJAhXAE73VMtI6M2S499B8kpl4R4BlY+MSm/ZHyc4kI+uGVKOKiCs53SG
cboi/+WXcV+zLw+MWbWsxDncg2ynORAPUu840FMN+aW6zeFJXLn8FSqT0lzDeBlm
80zCEEgES/viRw59GIcnn0igwlV5EzO6zhWzwdeMpBO4XFFDaiEY5idIBQf4jCEY
JcfQOrkPpfPgjLyQmFLyeojyaUVLIOOLUGMsSS8Hk7MJlgEdneEIXX7EhPqLDPyc
1f33WsnlTvbHLGWHE7lMG1LbK1ecsNwWWUFfoVQaQzYUQAVSuoqTYYkAYfGnQmV0
nnsIUAECgYEA7xR8cqu/knx9gFeinYwLx+/BbUw1MAX0WSK4GOx0O7qgbDb+scb2
x4aBCZnDE44KRl1/mbLXxb3wq8GN7W4owIHSud/8gZNqJM0uXbiJS7gMW0XnoJuk
D4hr0ADddPn/vGfQyUf0oUOFg2nP+H99GEuyYpHXr39R4Fh4UljXEkECgYEAt9wG
GRUhW6BoE3t2/mUcgkpXrljI7W2SDtHGgGzNb8Mlxcu/KUC4b6qdNwe83t0w3SaP
+34JHXIqnb2cuvigQj8pCoFxaMT6gH7x1zQWI1cORCA+Vfx/NZ2cCyXpebNOytxu
AwtAVo+r/QZlfs4OlG+TVwKBxYz9huCPFaAfQOkCgYA5iViZ0DN+cW9Sn8SG3dlH
+K84OoriT8yKVwyvEti2Nye8Y0/QQO3K/te3E8Yawqg+XuoCd0PuVtPAwggCB+zO
x2+LRBhkprF4wdhSvcJs8pImtSAVSt+kzVQE7vBc4n1lPibFCggZd0J+acyfJS9Z
1X3MswSRO7bcou3yA2dfAQKBgAHz3Dy39Lq8YV6TmRfqivr3Pyci2j9rQnnV0H3c
qfHd6LDJESanAU5uSW0kL+VOBA7VMgJBvGcLp1g1g0yZB1qswQrThRjPvrlOn9Lh
QrrtWcFvdjoDjHZNTjLwHCKmvNd6r9Bodi51KCZvwvQtzAnXhYEPDcHDVY3xJJPe
N3bBAoGADnEc8G2taL/tq7Skcw1G/cZYUp4CZw+ypCLd1xyus7Lnu9Wl6y3U0HEl
pjgzBGlwvTRkvC5ewz46WIWE+hlmOdSih81Cro48baXR2T1OD8jKQ2pXmHK5Z/wy
0V7t7eHd/k8CcXzIWIk6gmpOYhKkIVvQW5g7ssbwsfsk3qD++Fs=
-----END RSA PRIVATE KEY-----

View File

@ -5,7 +5,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
setup(
name='PyserSSH',
version='4.3',
version='4.4',
license='MIT',
author='damp11113',
author_email='damp51252@gmail.com',

View File

@ -42,9 +42,10 @@ import logging
from .interactive import *
from .server import Server
from .account import AccountManager
from .system.info import system_banner
from .system.info import system_banner
try:
os.environ["pyserssh_systemmessage"]
except:
@ -66,3 +67,10 @@ if os.environ["pyserssh_log"] == "NO":
if os.environ["pyserssh_systemmessage"] == "YES":
print(system_banner)
if __name__ == "__main__":
stadem = input("Do you want to run demo? (y/n): ")
if stadem.upper() in ["Y", "YES"]:
from .demo import demo1
else:
exit()

View File

@ -60,6 +60,8 @@ class AccountManager:
self.save("autosave_session.ses")
self.__autosaveworknexttime = time.time() + self.autosavedelay
time.sleep(1) # fix cpu load
def __saveexit(self):
self.__autosavework = False
self.save("autosave_session.ses")
@ -140,6 +142,15 @@ class AccountManager:
return self.accounts[username]["sftp_path"]
return ""
def set_banner(self, username, banner):
if username in self.accounts:
self.accounts[username]["banner"] = banner
def get_banner(self, username):
if username in self.accounts and "banner" in self.accounts[username]:
return self.accounts[username]["banner"]
return None
def get_user_timeout(self, username):
if username in self.accounts and "timeout" in self.accounts[username]:
return self.accounts[username]["timeout"]

View File

@ -0,0 +1,38 @@
"""
PyserSSH - A Scriptable SSH server. For more info visit https://github.com/damp11113/PyserSSH
Copyright (C) 2023-2024 damp11113 (MIT)
Visit https://github.com/damp11113/PyserSSH
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
"""
note
ansi cursor arrow
up - \x1b[A
down - \x1b[B
left - \x1b[D
right - \x1b[C
https://en.wikipedia.org/wiki/ANSI_escape_code
"""

View File

@ -2,53 +2,57 @@ import os
import socket
import time
import shlex
from damp11113 import TextFormatter, sort_files, allfiles
import cv2
import traceback
import requests
from bs4 import BeautifulSoup
import pyfiglet
from PyserSSH import Server, AccountManager, Send, wait_input, wait_inputkey, wait_choose, Clear, Title
from PyserSSH.system.info import system_banner
from PyserSSH.extensions.processbar import indeterminateStatus, LoadingProgress
from PyserSSH.extensions.dialog import MenuDialog, TextDialog, TextInputDialog
from PyserSSH.extensions.moredisplay import clickable_url
from ..server import Server
from ..account import AccountManager
from ..interactive import Send, Clear, wait_input, wait_inputkey, wait_choose
from ..system.info import system_banner, __version__
from ..extensions.processbar import (indeterminateStatus, LoadingProgress)
from ..extensions.dialog import MenuDialog, TextDialog, TextInputDialog
from ..extensions.moredisplay import clickable_url
try:
from damp11113 import TextFormatter
except:
print("No 'damp11113-library'")
print("This demo is require 'damp11113-library' for run")
ins = input("Do you want to install 'damp11113-library'? (y/n): ")
if ins.upper() in ["Y", "YES"]:
import pip
pip.main(["install", "damp11113"])
from damp11113 import TextFormatter
else:
exit()
useraccount = AccountManager()
useraccount.add_account("admin", "") # create user without password
useraccount.add_account("test", "test") # create user without password
ssh = Server(useraccount, system_commands=True, system_message=False)
nonamewarning = """Connection Warning:
Unauthorized access or improper use of this system is prohibited.
Please ensure you have proper authorization before proceeding."""
Authorizedmessage = """You have successfully connected to the server.
Enjoy your session and remember to follow security protocols."""
ssh = Server(useraccount, system_commands=True, system_message=False, sftp=False)
loading = ["PyserSSH", "Extensions"]
print("you connect to this demo using 'ssh admin@localhost -p 2222' (no password)")
print("command list: passtest, colortest, typing <speed> <text>, renimtest, errortest, inloadtest, loadtest, dialogtest, dialogtest2, dialogtest3, passdialogtest3, choosetest, vieweb <url>, shutdown now")
print("Do not you this demo private key for real production")
@ssh.on_user("connect")
def connect(client):
Title(client, "PyserSSH")
#print(client["windowsize"])
if client['current_user'] == "":
warningmessage = nonamewarning
else:
warningmessage = Authorizedmessage
wm = f"""*********************************************************************************************
wm = f"""{pyfiglet.figlet_format('PyserSSH', font='usaflag', width=client["windowsize"]["width"])}*********************************************************************************************
Hello {client['current_user']},
{warningmessage}
This is the testing server of PyserSSH v{__version__}.
For use in product please use new private key.
Visit: {clickable_url("https://damp11113.xyz", "DPCloudev")}
{system_banner}
*********************************************************************************************"""
if client['current_user'] != "test":
for i in loading:
P = indeterminateStatus(client, f"Starting {i}", f"[ OK ] Started {i}")
P.start()
@ -72,7 +76,6 @@ def error(client, error):
else:
Send(client, traceback.format_exc())
#@ssh.on_user("onrawtype")
#def onrawtype(client, key):
# print(key)
@ -95,11 +98,11 @@ def command(client, command: str):
Send(client, "")
Send(client, "TrueColors 24-Bit")
elif command == "keytest":
user = wait_inputkey(client, "press any key", raw=True)
user = wait_inputkey(client, "press any key", raw=True, timeout=1)
Send(client, "")
Send(client, f"key: {user}")
for i in range(10):
user = wait_inputkey(client, "press any key", raw=True)
user = wait_inputkey(client, "press any key", raw=True, timeout=1)
Send(client, "")
Send(client, f"key: {user}")
elif command.startswith("typing"):
@ -110,10 +113,9 @@ def command(client, command: str):
Send(client, w, ln=False)
time.sleep(speed)
Send(client, "")
elif command.startswith("renimtest"):
args = shlex.split(command)
elif command == "renimtest":
Clear(client)
image = cv2.imread(f"opensource.png", cv2.IMREAD_COLOR)
image = cv2.imread(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'opensource.png'), cv2.IMREAD_COLOR)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
width, height = client['windowsize']["width"]-5, client['windowsize']["height"]-5
@ -126,7 +128,6 @@ def command(client, command: str):
for y in range(0, height):
for x in range(0, width):
pixel_color = resized[y, x]
# PyserSSH.Send(channel, f"Pixel color at ({x}, {y}): {pixel_color}")
if pixel_color.tolist() != [0, 0, 0]:
t += TextFormatter.format_text_truecolor(" ", background=f"{pixel_color[0]};{pixel_color[1]};{pixel_color[2]}")
else:
@ -192,6 +193,7 @@ def command(client, command: str):
loading.stop()
Di1 = TextDialog(client, url, text_content)
Di1.render()
elif command == "shutdown now":
ssh.stop_server()
ssh.run(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'private_key.pem'))

View File

Before

Width:  |  Height:  |  Size: 78 KiB

After

Width:  |  Height:  |  Size: 78 KiB

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAwNfkia91HNrpyqlHwjYrVKDV5SkDt5P27MxKZDjwOokGBX7E
g5cMXb1wxQeCm+zptg680qIXHfSaaOi1E/DAutaTIQa3GI+gDMphlWMxrEWFuZOB
ylvTuFAxLB8xKcuBjelQX4TYlcgA1WgyeI6LFPNdJPekVHnzkLCZnW+y05PkT6a0
QY1Eoa6DY2TtY8w4NZmnyCy1ZPYV5qLKN/P7aVSU52AD8u25St1WprvxpM4TtZiG
2O9X1Unx+wtco2P8G1M4qcuWqPDdITn4n19DcR7rhuACjUo2poFTlnl9lfEsW11R
5sDfYlgc3n8a4Iw49Ea4GaLkSEMluOfB9eOLUQIDAQABAoIBAAeZmpVTN7uFjyLg
YrEZ6cGXPcbJw9k8zhr3/tM4q+hf/7+WBuWkEtCGR5xO7Ev73XFs3u6IL9QLkKL4
z4YefgypqeO/0YB4zJckdLhqpTRZCxOiEhfpCuI1MDLrycgQD/uJSenHIQgKI/+a
cH7Ffgq7Kp0V22vu4HVVLcCsJxvlIxFd92xCKFl8zRHBdyKikfvZAEidbMu9xdsW
S9DzFCveCGrE8g6HWQyXiCpq2xb4b2C37O+0iZRtYfJQSCrnG99Y/KfWIVbb+3gU
5WbIlYm57TKzMGgKc3LWtGCWxfB/NNP5wOxR+4y78oWDzTibrT5OZDsX2S+mbgNB
wAo/0U8CgYEAxHAOrlz9Ae2kYfyUgx9JTonElIFlDmDVdYcRW8Go7xpeMZ+XG2sR
f/za6t6jiCxI9FSD5gl4nDyOVhx5zRpu2QZvZBHICaWDwEmZC+d3suYtQY/ixR3K
3sdDKK6wzOtta+OBVNPQWAW2rmTr/J1JobguflM0NBm+YZC02gQyL9sCgYEA+1DU
llDGDaU08WQNTLRgW+1RAbzsBTFd+DhvbYM8+mgmlFzHKHJP3jCpwLZmqdBzLl0R
wUZBwpZ5MnkiQV0e9AW4/tnqBw8n9pf+NgNqcssw8MEMXHPbLNwr7OVS/LG8VNOm
LbuLjxq8O8wfbS87eBj2D18c1x4voEIw1AWYn0MCgYEAnPBF2moyPMMmjJ5l7Ggn
ghaxNlA2c4lLoOz7IkqTdAul65FsARzGS3GxWOnsztNKqeGHy1YPxQrgUM3JReLz
YnIwtks6fPJ+Uza5jngr+oLI71NMQl1uAhRChJMkb2M79XE6l5HuJxTRgXzhyN3E
wO5MPuKsl19l6b7ZrkCh8/cCgYEAjIL6+TgcI9D0suo/zV0kawFaw1//jj+1zGyx
UEeKNm848saUy3ZuVUpb/tV8vQFBBPEgVjGT3toG1UOI9Ya9Ia55anQoNt4wd90v
Ur/CKoCU0mb9JEvahVBsdr0ZExPEuqDDTtqHAvHtwHk2MPOxikpaeOmy1EuaUT3w
0vp2BMUCgYEAsLL592l8pclhxk2b0lmgvhPLOmZuQ7QkcnMMyYCeUr9Kt95VN40J
N/LK9LIbf/l9CUN4eO1JqCJkAiMIW2Gvumw3g+TMj+nqcfsufSHJCG1EZNYMUftG
aL7KtccPyFwotMD/P+OaAeJimwuC5247hCep1SSf1A41gbdmutiirM4=
-----END RSA PRIVATE KEY-----

View File

@ -36,12 +36,15 @@ def Send(client, string, ln=True):
else:
channel.send(replace_enter_with_crlf(str(string)))
def Clear(client, oldclear=False):
def Clear(client, oldclear=False, keep=False):
sx, sy = client["windowsize"]["width"], client["windowsize"]["height"]
if oldclear:
for x in range(sy):
Send(client, '\b \b' * sx, ln=False) # Send newline after each line
elif keep:
Send(client, "\033[2J", ln=False)
Send(client, "\033[H", ln=False)
else:
Send(client, "\033[3J", ln=False)
Send(client, "\033[1J", ln=False)

View File

@ -36,7 +36,7 @@ from .system.SFTP import SSHSFTPServer
from .system.interface import Sinterface
from .interactive import *
from .system.inputsystem import expect
from .system.info import system_banner, __version__
from .system.info import __version__
# paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = pow(2, 22)
@ -45,14 +45,13 @@ sftpclient = ["WinSCP", "Xplore"]
logger = logging.getLogger("PyserSSH")
class Server:
def __init__(self, accounts, system_message=True, disable_scroll_with_arrow=True, sftp=True, sftproot=os.getcwd(), system_commands=True, compression=True, usexternalauth=False, history=True, inputsystem=True, XHandler=None, title=f"PyserSSH v{__version__}", inspeed=32768):
def __init__(self, accounts, system_message=True, disable_scroll_with_arrow=True, sftp=False, sftproot=os.getcwd(), system_commands=True, compression=True, usexternalauth=False, history=True, inputsystem=True, XHandler=None, title=f"PyserSSH v{__version__}", inspeed=32768):
"""
A simple SSH server
"""
self._event_handlers = {}
self.sysmess = system_message
self.client_handlers = {} # Dictionary to store event handlers for each client
self.current_users = {} # Dictionary to store current_user for each connected client
self.accounts = accounts
self.disable_scroll_with_arrow = disable_scroll_with_arrow
self.sftproot = sftproot
@ -66,7 +65,9 @@ class Server:
self.title = title
self.inspeed = inspeed
self.system_banner = system_banner
self.__processmode = None
self.__serverisrunning = False
self.__server_stopped = threading.Event() # Event to signal server stop
if self.enasyscom:
print("\033[33m!!Warning!! System commands is enable! \033[0m")
@ -104,7 +105,6 @@ class Server:
SSHSFTPServer.CLIENTHANDELES = self.client_handlers
bh_session.set_subsystem_handler('sftp', paramiko.SFTPServer, SSHSFTPServer)
if self.compressena:
bh_session.use_compression(True)
else:
@ -139,13 +139,17 @@ class Server:
"connecttype": None,
"last_login_time": None,
"windowsize": {},
"x11": {}
"x11": {},
"prompt": None,
"inputbuffer": None,
"peername": peername
}
client_handler = self.client_handlers[peername]
client_handler["current_user"] = server.current_user
client_handler["channel"] = channel # Update the channel attribute for the client handler
client_handler["last_activity_time"] = time.time()
client_handler["last_login_time"] = time.time()
client_handler["prompt"] = self.accounts.get_prompt(server.current_user)
self.accounts.set_user_last_login(self.client_handlers[channel.getpeername()]["current_user"], peername[0])
@ -154,10 +158,11 @@ class Server:
while self.client_handlers[channel.getpeername()]["windowsize"] == {}:
pass
channel.send(f"\033]0;{self.title}\007".encode())
userbanner = self.accounts.get_banner(self.client_handlers[channel.getpeername()]["current_user"])
if self.sysmess:
channel.sendall(replace_enter_with_crlf(self.system_banner))
if self.sysmess or userbanner != None:
channel.send(f"\033]0;{self.title}\007".encode())
channel.sendall(replace_enter_with_crlf(userbanner))
channel.sendall(replace_enter_with_crlf("\n"))
try:
@ -172,9 +177,9 @@ class Server:
channel.setblocking(False)
channel.settimeout(self.accounts.get_user_timeout(self.client_handlers[channel.getpeername()]["current_user"]))
channel.send(replace_enter_with_crlf(self.accounts.get_prompt(self.client_handlers[channel.getpeername()]["current_user"]) + " ").encode('utf-8'))
channel.send(replace_enter_with_crlf(self.client_handlers[channel.getpeername()]["prompt"] + " ").encode('utf-8'))
while True:
expect(self, channel, peername)
expect(self, self.client_handlers[channel.getpeername()])
except KeyboardInterrupt:
self._handle_event("disconnected", self.client_handlers[peername]["current_user"])
channel.close()
@ -206,6 +211,7 @@ class Server:
channel = client_handler.get("channel")
if channel:
channel.close()
self.__serverisrunning = True
self.server.close()
logger.info("Server stopped.")
except Exception as e:
@ -213,23 +219,34 @@ class Server:
def _start_listening_thread(self):
try:
self.server.listen(10)
logger.info("Start Listening for connections...")
while True:
while self.__serverisrunning:
client, addr = self.server.accept()
if self.__processmode == "thread":
client_thread = threading.Thread(target=self.handle_client, args=(client, addr))
client_thread.start()
else:
self.handle_client(client, addr)
except Exception as e:
logger.error(e)
def run(self, private_key_path, host="0.0.0.0", port=2222):
def run(self, private_key_path, host="0.0.0.0", port=2222, mode="thread", maxuser=0, daemon=False):
"""mode: single, thread"""
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.server.bind((host, port))
self.private_key = paramiko.RSAKey(filename=private_key_path)
if maxuser == 0:
self.server.listen()
else:
self.server.listen(maxuser)
self.__processmode = mode.lower()
self.__serverisrunning = True
client_thread = threading.Thread(target=self._start_listening_thread)
client_thread.daemon = daemon
client_thread.start()
def kickbyusername(self, username, reason=None):

View File

@ -25,7 +25,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
__version__ = "4.2"
__version__ = "4.4"
system_banner = (
f"\033[36mPyserSSH V{__version__} \033[0m\n"

View File

@ -35,12 +35,14 @@ from .syscom import systemcommand
logger = logging.getLogger("PyserSSH")
def expect(self, chan, peername, echo=True):
def expect(self, client, echo=True):
buffer = bytearray()
cursor_position = 0
outindexall = 0
history_index_position = 0 # Initialize history index position outside the loop
currentuser = self.client_handlers[chan.getpeername()]
chan = client["channel"]
peername = client["peername"]
try:
while True:
try:
@ -66,6 +68,10 @@ def expect(self, chan, peername, echo=True):
buffer = buffer[:cursor_position - 1] + buffer[cursor_position:]
cursor_position -= 1
outindexall -= 1
if cursor_position != outindexall:
chan.sendall(b"\b \b")
chan.sendall(buffer[cursor_position:])
else:
chan.sendall(b"\b \b")
else:
chan.sendall(b"\x07")
@ -76,18 +82,22 @@ def expect(self, chan, peername, echo=True):
# Right arrow key, move cursor right if not at the end
if cursor_position < len(buffer):
chan.sendall(b'\x1b[C')
cursor_position += 1
# cursor_position += 1
cursor_position = min(len(buffer), cursor_position + 1)
elif arrow_key == b'D':
# Left arrow key, move cursor left if not at the beginning
if cursor_position > 0:
chan.sendall(b'\x1b[D')
cursor_position -= 1
elif self.history:
# cursor_position -= 1
cursor_position = max(0, cursor_position - 1)
if self.history:
if arrow_key == b'A':
if history_index_position == 0:
command = self.accounts.get_lastcommand(currentuser["current_user"])
command = self.accounts.get_lastcommand(client["current_user"])
else:
command = self.accounts.get_history(currentuser["current_user"], history_index_position)
command = self.accounts.get_history(client["current_user"], history_index_position)
# Clear the buffer
for i in range(cursor_position):
@ -105,9 +115,9 @@ def expect(self, chan, peername, echo=True):
elif arrow_key == b'B':
if history_index_position != -1:
if history_index_position == 0:
command = self.accounts.get_lastcommand(currentuser["current_user"])
command = self.accounts.get_lastcommand(client["current_user"])
else:
command = self.accounts.get_history(currentuser["current_user"], history_index_position)
command = self.accounts.get_history(client["current_user"], history_index_position)
# Clear the buffer
for i in range(cursor_position):
@ -139,8 +149,10 @@ def expect(self, chan, peername, echo=True):
self._handle_event("ontype", self.client_handlers[chan.getpeername()], byte)
if echo:
if outindexall != cursor_position:
chan.sendall(b" ")
chan.sendall(b'\033[s')
chan.sendall(byte + buffer[cursor_position:])
chan.sendall(f"\033[{cursor_position}G".encode())
chan.sendall(b'\033[u')
else:
chan.sendall(byte)
@ -149,13 +161,15 @@ def expect(self, chan, peername, echo=True):
cursor_position += 1
outindexall += 1
client["inputbuffer"] = buffer
if echo:
chan.sendall(b'\r\n')
command = str(buffer.decode('utf-8')).strip()
if self.history and command.strip() != "" and self.accounts.get_lastcommand(currentuser["current_user"]) != command:
self.accounts.add_history(currentuser["current_user"], command)
if self.history and command.strip() != "" and self.accounts.get_lastcommand(client["current_user"]) != command:
self.accounts.add_history(client["current_user"], command)
if command.strip() != "":
if self.accounts.get_user_timeout(self.client_handlers[chan.getpeername()]["current_user"]) != None:
@ -164,25 +178,25 @@ def expect(self, chan, peername, echo=True):
try:
if self.enasyscom:
sct = systemcommand(currentuser, command)
sct = systemcommand(client, command)
else:
sct = False
if not sct:
if self.XHandler != None:
self._handle_event("beforexhandler", currentuser, command)
self._handle_event("beforexhandler", client, command)
self.XHandler.call(currentuser, command)
self.XHandler.call(client, command)
self._handle_event("afterxhandler", currentuser, command)
self._handle_event("afterxhandler", client, command)
else:
self._handle_event("command", currentuser, command)
self._handle_event("command", client, command)
except Exception as e:
self._handle_event("error", currentuser, e)
self._handle_event("error", client, e)
try:
chan.send(replace_enter_with_crlf(self.accounts.get_prompt(currentuser["current_user"]) + " ").encode('utf-8'))
chan.send(replace_enter_with_crlf(client["prompt"] + " ").encode('utf-8'))
except:
logger.error("Send error")

View File

@ -26,13 +26,11 @@ SOFTWARE.
"""
import shlex
from ..interactive import *
from ..interactive import Send, Clear, Title
def systemcommand(client, command):
channel = client["channel"]
if command == "whoami":
Send(channel, client["current_user"])
Send(client, client["current_user"])
return True
elif command.startswith("title"):
args = shlex.split(command)
@ -40,7 +38,7 @@ def systemcommand(client, command):
Title(client, title)
return True
elif command == "exit":
channel.close()
client["channel"].close()
return True
elif command == "clear":
Clear(client)