From f82f523305ab306681727c985f72dfb655438422 Mon Sep 17 00:00:00 2001 From: elicn Date: Sun, 22 Aug 2021 20:10:19 +0300 Subject: [PATCH 1/4] Implement standard streams as QlOs properties instead of Qiling's --- qiling/core.py | 44 +----------------------- qiling/os/os.py | 65 +++++++++++++++++++++++++++--------- qiling/os/posix/posix.py | 30 +++++++++++++---- qiling/os/windows/windows.py | 9 ----- 4 files changed, 74 insertions(+), 74 deletions(-) diff --git a/qiling/core.py b/qiling/core.py index 9036fb3a2..039217a78 100644 --- a/qiling/core.py +++ b/qiling/core.py @@ -5,7 +5,7 @@ from configparser import ConfigParser import ntpath, os, pickle, platform -import io + # See https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports from typing import Dict, List, Union from typing import TYPE_CHECKING @@ -494,48 +494,6 @@ def internal_exception(self) -> Exception: """ return self._internal_exception - @property - def stdin(self) -> io.IOBase: - """ Stdin of the program. Can be any object which implements (even part of) io.IOBase. - - Type: io.Base - Example: - ql = Qiling(stdin=sys.stdin) - - ql.stdin = sys.stdin - """ - return self._stdin - - @stdin.setter - def stdin(self, s): - self._stdin = s - - @property - def stdout(self) -> io.IOBase: - """ Stdout of the program. Can be any object which implements (even part of) io.IOBase. - - Type: io.Base - Example: - ql = Qiling(stdout=sys.stdout) - - ql.stdout = sys.stdout - """ - return self._stdout - - @stdout.setter - def stdout(self, s): - self._stdout = s - - @property - def stderr(self) -> io.IOBase: - """ Stdout of the program. Can be any object which implements (even part of) io.IOBase. - - Type: io.Base - Example: - ql = Qiling(stderr=sys.stderr) - - ql.stderr = sys.stderr - """ - return self._stderr - - @stderr.setter - def stderr(self, s): - self._stderr = s - @property def libcache(self) -> bool: """ Whether cache dll files. Only take effect in Windows emulation. diff --git a/qiling/os/os.py b/qiling/os/os.py index 607292c48..7aedfe628 100644 --- a/qiling/os/os.py +++ b/qiling/os/os.py @@ -4,7 +4,7 @@ # import sys -from typing import Any, Iterable, Optional, Callable, Mapping, Sequence, Tuple +from typing import Any, Iterable, Optional, Callable, Mapping, Sequence, TextIO, Tuple from qiling import Qiling from qiling.const import QL_OS, QL_INTERCEPT, QL_OS_POSIX @@ -21,6 +21,12 @@ class QlOs: def __init__(self, ql: Qiling, resolvers: Mapping[Any, Resolver] = {}): self.ql = ql + + # standard streams overrides (elicn: should they be io.IOBase ?) + self._stdin: TextIO + self._stdout: TextIO + self._stderr: TextIO + self.utils = QlOsUtils(ql) self.fcall: QlFunctionCall self.fs_mapper = QlFsMapper(ql) @@ -40,22 +46,13 @@ def __init__(self, ql: Qiling, resolvers: Mapping[Any, Resolver] = {}): try: import ida_idaapi except ImportError: - self.stdin = ql_file('stdin', sys.stdin.fileno()) - self.stdout = ql_file('stdout', sys.stdout.fileno()) - self.stderr = ql_file('stderr', sys.stderr.fileno()) + self._stdin = ql_file('stdin', sys.stdin.fileno()) + self._stdout = ql_file('stdout', sys.stdout.fileno()) + self._stderr = ql_file('stderr', sys.stderr.fileno()) else: - self.stdin = sys.stdin.buffer if hasattr(sys.stdin, "buffer") else sys.stdin - self.stdout = sys.stdout.buffer if hasattr(sys.stdout, "buffer") else sys.stdout - self.stderr = sys.stderr.buffer if hasattr(sys.stderr, "buffer") else sys.stderr - - if self.ql.stdin != 0: - self.stdin = self.ql.stdin - - if self.ql.stdout != 0: - self.stdout = self.ql.stdout - - if self.ql.stderr != 0: - self.stderr = self.ql.stderr + self._stdin = getattr(sys.stdin, 'buffer', sys.stdin) + self._stdout = getattr(sys.stdout, 'buffer', sys.stdout) + self._stderr = getattr(sys.stderr, 'buffer', sys.stderr) # defult exit point self.exit_point = { @@ -88,6 +85,42 @@ def save(self): def restore(self, saved_state): pass + @property + def stdin(self) -> TextIO: + """Program's standard input stream. May be replaced by any object that implements + the `io.IOBase` interface, either fully or partially. + """ + + return self._stdin + + @property + def stdout(self) -> TextIO: + """Program's standard output stream. May be replaced by any object that implements + the `io.IOBase` interface, either fully or partially. + """ + + return self._stdout + + @property + def stderr(self) -> TextIO: + """Program's standard error stream. May be replaced by any object that implements + the `io.IOBase` interface, either fully or partially. + """ + + return self._stderr + + @stdin.setter + def stdin(self, stream: TextIO) -> None: + self._stdin = stream + + @stdout.setter + def stdout(self, stream: TextIO) -> None: + self._stdout = stream + + @stderr.setter + def stderr(self, stream: TextIO) -> None: + self._stderr = stream + def resolve_fcall_params(self, params: Mapping[str, Any]) -> Mapping[str, Any]: """Transform function call raw parameters values into meaningful ones, according to their assigned type. diff --git a/qiling/os/posix/posix.py b/qiling/os/posix/posix.py index 073951571..b0fb37573 100644 --- a/qiling/os/posix/posix.py +++ b/qiling/os/posix/posix.py @@ -4,7 +4,7 @@ # from inspect import signature, Parameter -from typing import Union, Callable +from typing import TextIO, Union, Callable from unicorn.arm64_const import UC_ARM64_REG_X8, UC_ARM64_REG_X16 from unicorn.arm_const import UC_ARM_REG_R7 @@ -54,7 +54,7 @@ def setReturnValue(self, value: int): class QlOsPosix(QlOs): def __init__(self, ql: Qiling): - super(QlOsPosix, self).__init__(ql) + super().__init__(ql) self.ql = ql self.sigaction_act = [0] * 256 @@ -100,13 +100,31 @@ def __init__(self, ql: Qiling): }[self.ql.archtype](ql) self._fd = QlFileDes([0] * NR_OPEN) - self._fd[0] = self.stdin - self._fd[1] = self.stdout - self._fd[2] = self.stderr + + # the QlOs constructor cannot assign the standard streams using their designated properties since + # it runs before the _fd array is declared. instead, it assigns them to the private members and here + # we force _fd to update manually. + self.stdin = self._stdin + self.stdout = self._stdout + self.stderr = self._stderr self._shms = {} - # ql.syscall - get syscall for all posix series + @QlOs.stdin.setter + def stdin(self, stream: TextIO) -> None: + self._stdin = stream + self._fd[0] = stream + + @QlOs.stdout.setter + def stdout(self, stream: TextIO) -> None: + self._stdout = stream + self._fd[1] = stream + + @QlOs.stderr.setter + def stderr(self, stream: TextIO) -> None: + self._stderr = stream + self._fd[2] = stream + @property def syscall(self): return self.get_syscall() diff --git a/qiling/os/windows/windows.py b/qiling/os/windows/windows.py index 6ca60d0cf..c7558bde6 100644 --- a/qiling/os/windows/windows.py +++ b/qiling/os/windows/windows.py @@ -172,15 +172,6 @@ def run(self): if self.ql.entry_point is not None: self.ql.loader.entry_point = self.ql.entry_point - if self.ql.stdin != 0: - self.stdin = self.ql.stdin - - if self.ql.stdout != 0: - self.stdout = self.ql.stdout - - if self.ql.stderr != 0: - self.stderr = self.ql.stderr - try: if self.ql.code: self.ql.emu_start(self.ql.loader.entry_point, (self.ql.loader.entry_point + len(self.ql.code)), self.ql.timeout, self.ql.count) From 9ae58618e86907f289d60caaab45068cca86461c Mon Sep 17 00:00:00 2001 From: elicn Date: Sun, 22 Aug 2021 20:10:54 +0300 Subject: [PATCH 2/4] Let Qiling constructor override standard streams --- qiling/core.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/qiling/core.py b/qiling/core.py index 039217a78..33a06a9e6 100644 --- a/qiling/core.py +++ b/qiling/core.py @@ -48,9 +48,9 @@ def __init__( filter = None, stop_on_stackpointer = False, stop_on_exit_trap = False, - stdin=0, - stdout=0, - stderr=0, + stdin=None, + stdout=None, + stderr=None, ): """ Create a Qiling instance. @@ -89,9 +89,6 @@ def __init__( ################################## # Definition after ql=Qiling() # ################################## - self._stdin = stdin - self._stdout = stdout - self._stderr = stderr self._verbose = verbose self._libcache = libcache self._patch_bin = [] @@ -229,7 +226,16 @@ def __init__( if (self.archtype not in QL_ARCH_NONEOS): if (self.archtype not in QL_ARCH_HARDWARE): self._os = os_setup(self.archtype, self.ostype, self) - + + if stdin is not None: + self._os.stdin = stdin + + if stdout is not None: + self._os.stdout = stdout + + if stderr is not None: + self._os.stderr = stderr + # Run the loader self.loader.run() From f707a364bd467886fc687c3bc9e3d5c0b40ba0fb Mon Sep 17 00:00:00 2001 From: elicn Date: Sun, 22 Aug 2021 20:12:01 +0300 Subject: [PATCH 3/4] Adjust tests and examples accordingly --- examples/crackme_x86_linux.py | 2 +- tests/test_pe.py | 18 ++++++++++-------- tests/test_windows_stdio.py | 8 ++++---- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/examples/crackme_x86_linux.py b/examples/crackme_x86_linux.py index 0847d0712..84afd26ae 100644 --- a/examples/crackme_x86_linux.py +++ b/examples/crackme_x86_linux.py @@ -74,7 +74,7 @@ def __count_instructions(ql: Qiling, address: int, size: int): hobj = self.ql.hook_code(__count_instructions) # feed stdin with input - self.ql.stdin.write(input + b'\n') + self.ql.os.stdin.write(input + b'\n') # resume emulation till function returns self.ql.run(begin=self.replay_starts, end=self.replay_ends) diff --git a/tests/test_pe.py b/tests/test_pe.py index f0133b255..519474859 100644 --- a/tests/test_pe.py +++ b/tests/test_pe.py @@ -389,8 +389,10 @@ def our_sandbox(path, rootfs): ql.patch(0x004010CD, b'\x90\x90') ql.patch(0x0040110B, b'\x90\x90') ql.patch(0x00401112, b'\x90\x90') - ql.stdin = StringBuffer() - ql.stdin.write(b"Ea5yR3versing\n") + + ql.os.stdin = StringBuffer() + ql.os.stdin.write(b"Ea5yR3versing\n") + ql.hook_address(force_call_dialog_func, 0x00401016) ql.run() del ql @@ -402,13 +404,13 @@ def test_pe_win_x86_cmdln(self): ql = Qiling( ["../examples/rootfs/x86_windows/bin/cmdln32.exe", 'arg1', 'arg2 with spaces'], "../examples/rootfs/x86_windows") - ql.stdout = TestOut() + ql.os.stdout = TestOut() ql.run() expected_string = b'\n' expected_keys = [b'_acmdln', b'_wcmdln', b'__p__acmdln', b'__p__wcmdln', b'GetCommandLineA', b'GetCommandLineW'] for key in expected_keys: - self.assertTrue(key in ql.stdout.output) - self.assertEqual(expected_string, ql.stdout.output[key]) + self.assertTrue(key in ql.os.stdout.output) + self.assertEqual(expected_string, ql.os.stdout.output[key]) del ql @@ -416,13 +418,13 @@ def test_pe_win_x8664_cmdln(self): ql = Qiling( ["../examples/rootfs/x8664_windows/bin/cmdln64.exe", 'arg1', 'arg2 with spaces'], "../examples/rootfs/x8664_windows") - ql.stdout = TestOut() + ql.os.stdout = TestOut() ql.run() expected_string = b'\n' expected_keys = [b'_acmdln', b'_wcmdln', b'GetCommandLineA', b'GetCommandLineW'] for key in expected_keys: - self.assertTrue(key in ql.stdout.output) - self.assertEqual(expected_string, ql.stdout.output[key]) + self.assertTrue(key in ql.os.stdout.output) + self.assertEqual(expected_string, ql.os.stdout.output[key]) del ql class RefreshCache(QlPeCache): diff --git a/tests/test_windows_stdio.py b/tests/test_windows_stdio.py index 1afeba341..c82b34d54 100644 --- a/tests/test_windows_stdio.py +++ b/tests/test_windows_stdio.py @@ -37,13 +37,13 @@ def instruction_count(ql, address, size, user_data): def get_count(flag): ql = Qiling(["../examples/rootfs/x86_windows/bin/crackme.exe"], "../examples/rootfs/x86_windows", verbose=QL_VERBOSE.OFF, libcache = True) - ql.stdin = StringBuffer() - ql.stdout = StringBuffer() - ql.stdin.write(bytes("".join(flag) + "\n", 'utf-8')) + ql.os.stdin = StringBuffer() + ql.os.stdout = StringBuffer() + ql.os.stdin.write(bytes("".join(flag) + "\n", 'utf-8')) count = [0] ql.hook_code(instruction_count, count) ql.run() - print(ql.stdout.read_all().decode('utf-8'), end='') + print(ql.os.stdout.read_all().decode('utf-8'), end='') print(" ============ count: %d ============ " % count[0]) return count[0] From 0471169aae8a6a00b16e8b80190a5e2369955851 Mon Sep 17 00:00:00 2001 From: elicn Date: Sun, 22 Aug 2021 20:12:56 +0300 Subject: [PATCH 4/4] Add typing annotations --- qiling/os/filestruct.py | 66 ++++++++++++++++++++--------------------- qiling/utils.py | 2 +- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/qiling/os/filestruct.py b/qiling/os/filestruct.py index 510689b3e..f8b5284d0 100644 --- a/qiling/os/filestruct.py +++ b/qiling/os/filestruct.py @@ -4,6 +4,7 @@ # import os +from typing import AnyStr from qiling.exception import * from qiling.os.posix.stat import * @@ -12,10 +13,9 @@ import fcntl except ImportError: pass -import socket class ql_file: - def __init__(self, path, fd): + def __init__(self, path: AnyStr, fd: int): self.__path = path self.__fd = fd # information for syscall mmap @@ -24,70 +24,70 @@ def __init__(self, path, fd): self._close_on_exec = 0 @classmethod - def open(self, open_path, open_flags, open_mode, dir_fd=None): + def open(cls, open_path: AnyStr, open_flags: int, open_mode: int, dir_fd: int = None): open_mode &= 0x7fffffff try: fd = os.open(open_path, open_flags, open_mode, dir_fd=dir_fd) except OSError as e: raise QlSyscallError(e.errno, e.args[1] + ' : ' + e.filename) - return self(open_path, fd) - def read(self, read_len): + return cls(open_path, fd) + + def read(self, read_len: int) -> bytes: return os.read(self.__fd, read_len) - - def write(self, write_buf): + + def write(self, write_buf: bytes) -> int: return os.write(self.__fd, write_buf) - - def fileno(self): + + def fileno(self) -> int: return self.__fd - - def lseek(self, lseek_offset, lseek_origin = os.SEEK_SET): + + def lseek(self, lseek_offset: int, lseek_origin: int = os.SEEK_SET) -> int: return os.lseek(self.__fd, lseek_offset, lseek_origin) - - def close(self): - return os.close(self.__fd) - + + def close(self) -> None: + os.close(self.__fd) + def fstat(self): return Fstat(self.__fd) - def fcntl(self, fcntl_cmd, fcntl_arg): + def fcntl(self, fcntl_cmd: int, fcntl_arg): try: return fcntl.fcntl(self.__fd, fcntl_cmd, fcntl_arg) except Exception: pass - + def ioctl(self, ioctl_cmd, ioctl_arg): try: return fcntl.ioctl(self.__fd, ioctl_cmd, ioctl_arg) except Exception: pass - def tell(self): + def tell(self) -> int: return self.lseek(0, os.SEEK_CUR) - + def dup(self): new_fd = os.dup(self.__fd) - new_ql_file = ql_file(self.__path, new_fd) - return new_ql_file - - def readline(self, end = b'\n'): - ret = b'' - while True: - c = self.read(1) - ret += c - if c == end: - break - return ret - + + return ql_file(self.__path, new_fd) + + def readline(self, end: bytes = b'\n') -> bytes: + ret = bytearray() + + while not ret.endswith(end): + ret.extend(self.read(1)) + + return bytes(ret) + @property def name(self): return self.__path @property - def close_on_exec(self): + def close_on_exec(self) -> int: return self._close_on_exec @close_on_exec.setter - def close_on_exec(self, value: int): + def close_on_exec(self, value: int) -> None: self._close_on_exec = value diff --git a/qiling/utils.py b/qiling/utils.py index 7b07a7429..4461f5144 100644 --- a/qiling/utils.py +++ b/qiling/utils.py @@ -467,7 +467,7 @@ def ql_syscall_mapping_function(ostype): return ql_get_module_function(f"qiling.os.{ostype_str.lower()}.map_syscall", "map_syscall") -def os_setup(archtype, ostype, ql): +def os_setup(archtype: QL_ARCH, ostype: QL_OS, ql): if not ql_is_valid_ostype(ostype): raise QlErrorOsType("Invalid OSType")