-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
00460f8
commit 536f165
Showing
1 changed file
with
56 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,15 @@ | ||
# -*- coding: utf-8 -*- | ||
__author__ = 'Michal Ernst, Marcin Usielski, Tomasz Krol' | ||
__copyright__ = 'Copyright (C) 2018-2022, Nokia' | ||
__email__ = '[email protected], [email protected], [email protected]' | ||
__author__ = "Michal Ernst, Marcin Usielski, Tomasz Krol" | ||
__copyright__ = "Copyright (C) 2018-2024, Nokia" | ||
__email__ = "[email protected], [email protected], [email protected]" | ||
|
||
import codecs | ||
import re | ||
import select | ||
import datetime | ||
import logging | ||
from threading import Event | ||
import threading | ||
import time | ||
|
||
from ptyprocess import PtyProcessUnicode # Unix-only | ||
|
||
|
@@ -27,10 +28,19 @@ class ThreadedTerminal(IOConnection): | |
ThreadedTerminal is shell working under Pty | ||
""" | ||
|
||
def __init__(self, moler_connection, cmd="/bin/bash", select_timeout=0.002, | ||
read_buffer_size=4096, first_prompt=r'[%$#]+', target_prompt=r'moler_bash#', | ||
set_prompt_cmd='export PS1="moler_bash# "\n', dimensions=(100, 300), terminal_delayafterclose=0.2): | ||
""" # TODO: # 'export PS1="moler_bash\\$ "\n' would give moler_bash# for root and moler_bash$ for user | ||
def __init__( | ||
self, | ||
moler_connection, | ||
cmd="/bin/bash", | ||
select_timeout=0.002, | ||
read_buffer_size=4096, | ||
first_prompt=r"[%$#]+", | ||
target_prompt=r"moler_bash#", | ||
set_prompt_cmd='export PS1="moler_bash# "\n', | ||
dimensions=(100, 300), | ||
terminal_delayafterclose=0.2, | ||
): | ||
"""# TODO: # 'export PS1="moler_bash\\$ "\n' would give moler_bash# for root and moler_bash$ for user | ||
:param moler_connection: Moler's connection to join with | ||
:param cmd: command to run terminal | ||
:param select_timeout: timeout for reading data from terminal | ||
|
@@ -42,10 +52,12 @@ def __init__(self, moler_connection, cmd="/bin/bash", select_timeout=0.002, | |
:param terminal_delayafterclose: delay for checking if terminal was properly closed | ||
""" | ||
super(ThreadedTerminal, self).__init__(moler_connection=moler_connection) | ||
self.debug_hex_on_non_printable_chars = False # Set True to log incoming non printable chars as hex. | ||
self.debug_hex_on_non_printable_chars = ( | ||
False # Set True to log incoming non printable chars as hex. | ||
) | ||
self.debug_hex_on_all_chars = False # Set True to log incoming data as hex. | ||
self._terminal = None | ||
self._shell_operable = Event() | ||
self._shell_operable: threading.Event = threading.Event() | ||
self._export_sent = False | ||
self.pulling_thread = None | ||
self.read_buffer = "" | ||
|
@@ -57,7 +69,9 @@ def __init__(self, moler_connection, cmd="/bin/bash", select_timeout=0.002, | |
self.target_prompt = target_prompt | ||
self._cmd = [cmd] | ||
self.set_prompt_cmd = set_prompt_cmd | ||
self._re_set_prompt_cmd = re.sub("['\"].*['\"]", "", self.set_prompt_cmd.strip()) | ||
self._re_set_prompt_cmd = re.sub( | ||
"['\"].*['\"]", "", self.set_prompt_cmd.strip() | ||
) | ||
self._terminal_delayafterclose = terminal_delayafterclose | ||
|
||
def open(self): | ||
|
@@ -66,26 +80,34 @@ def open(self): | |
|
||
if not self._terminal: | ||
self.moler_connection.open() | ||
self._terminal = PtyProcessUnicode.spawn(self._cmd, dimensions=self.dimensions) | ||
self._terminal = PtyProcessUnicode.spawn( | ||
self._cmd, dimensions=self.dimensions | ||
) | ||
self._terminal.delayafterclose = self._terminal_delayafterclose | ||
# need to not replace not unicode data instead of raise exception | ||
self._terminal.decoder = codecs.getincrementaldecoder('utf-8')(errors='replace') | ||
|
||
done = Event() | ||
self.pulling_thread = TillDoneThread(target=self.pull_data, | ||
done_event=done, | ||
kwargs={'pulling_done': done}) | ||
self._terminal.decoder = codecs.getincrementaldecoder("utf-8")( | ||
errors="replace" | ||
) | ||
|
||
done = threading.Event() | ||
self.pulling_thread = TillDoneThread( | ||
target=self.pull_data, done_event=done, kwargs={"pulling_done": done} | ||
) | ||
self.pulling_thread.start() | ||
retry = 0 | ||
is_operable = False | ||
|
||
while (retry < 10) and (not is_operable): | ||
timeout = 4 * 60 | ||
start_time = time.monotonic() | ||
|
||
while (time.monotonic() - start_time <= timeout) and (not is_operable): | ||
is_operable = self._shell_operable.wait(timeout=1) | ||
if not is_operable: | ||
buff = self.read_buffer.encode("UTF-8", "replace") | ||
self.logger.warning( | ||
f"Terminal open but not fully operable yet.\nREAD_BUFFER: '{buff}'") | ||
self._terminal.write('\n') | ||
f"Terminal open but not fully operable yet. Try {retry} after {time.monotonic() - start_time:.2f} s\nREAD_BUFFER: '{buff}'" | ||
) | ||
self._terminal.write("\n") | ||
retry += 1 | ||
|
||
return ret | ||
|
@@ -125,7 +147,9 @@ def pull_data(self, pulling_done): | |
if next(heartbeat): | ||
logging.getLogger("moler_threads").debug(f"ALIVE {self}") | ||
try: | ||
reads, _, _ = select.select([self._terminal.fd], [], [], self._select_timeout) | ||
reads, _, _ = select.select( | ||
[self._terminal.fd], [], [], self._select_timeout | ||
) | ||
except ValueError as exc: | ||
self.logger.warning(f"'{exc.__class__}: {exc}'") | ||
self._notify_on_disconnect() | ||
|
@@ -137,7 +161,9 @@ def pull_data(self, pulling_done): | |
if self.debug_hex_on_all_chars: | ||
self.logger.debug(f"incoming data: '{all_chars_to_hex(data)}'.") | ||
if self.debug_hex_on_non_printable_chars: | ||
self.logger.debug(f"incoming data: '{non_printable_chars_to_hex(data)}'.") | ||
self.logger.debug( | ||
f"incoming data: '{non_printable_chars_to_hex(data)}'." | ||
) | ||
|
||
if self._shell_operable.is_set(): | ||
self.data_received(data=data, recv_time=datetime.datetime.now()) | ||
|
@@ -154,11 +180,15 @@ def _verify_shell_is_operable(self, data): | |
|
||
for line in lines: | ||
line = remove_all_known_special_chars(line) | ||
if not re.search(self._re_set_prompt_cmd, line) and re.search(self.target_prompt, line): | ||
if not re.search(self._re_set_prompt_cmd, line) and re.search( | ||
self.target_prompt, line | ||
): | ||
self._notify_on_connect() | ||
self._shell_operable.set() | ||
data = re.sub(self.target_prompt, '', self.read_buffer, re.MULTILINE) | ||
data = re.sub(self.target_prompt, "", self.read_buffer, re.MULTILINE) | ||
self.data_received(data=data, recv_time=datetime.datetime.now()) | ||
elif not self._export_sent and re.search(self.first_prompt, self.read_buffer, re.MULTILINE): | ||
elif not self._export_sent and re.search( | ||
self.first_prompt, self.read_buffer, re.MULTILINE | ||
): | ||
self.send(self.set_prompt_cmd) | ||
self._export_sent = True |