From 099abe860cf839e477a45f43bda405c0408538fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Gr=C3=B3dek?= Date: Fri, 3 Nov 2023 16:22:05 +0100 Subject: [PATCH] Add python REPL running inside docker container * communication via HTTP --- .pre-commit-config.yaml | 2 +- pyproject.toml | 2 +- src/ds_pycontain/data/python_runner.py | 147 ++++++++++++++++ src/ds_pycontain/python_dockerized_repl.py | 194 +++++++++++++++++++++ tests/test_python_docerized_repl.py | 56 ++++++ 5 files changed, 399 insertions(+), 2 deletions(-) create mode 100644 src/ds_pycontain/data/python_runner.py create mode 100644 src/ds_pycontain/python_dockerized_repl.py create mode 100644 tests/test_python_docerized_repl.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4469ee1..b0ac193 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -44,7 +44,7 @@ repos: args: [] # You can add additional plugins for mypy below # such as types-python-dateutil - additional_dependencies: [] + additional_dependencies: ["types-requests"] exclude: (/test_|setup.py|/tests/|docs/) # Sort imports alphabetically, and automatically separated into sections and by type. diff --git a/pyproject.toml b/pyproject.toml index b210cd5..d75cc10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -119,7 +119,7 @@ max-locals=20 min-similarity-lines=10 [tool.bandit] -exclude_dirs = ["venv",] +exclude_dirs = ["venv", "src/ds_pycontain/data"] # B101 disables errors for asserts in the code # remember to not use asserts for security and control flows skips = ["B101"] diff --git a/src/ds_pycontain/data/python_runner.py b/src/ds_pycontain/data/python_runner.py new file mode 100644 index 0000000..11dabfc --- /dev/null +++ b/src/ds_pycontain/data/python_runner.py @@ -0,0 +1,147 @@ +"""Implementation of the server which will be copied to docker container. +You should not run it for other purposes than testing. +""" +# CAREFUL! This file is copied to docker container and executed there. +# it is based only on python standard library as other dependencies +# are not available in the container. See python_docker_repl.py for more info +# and check _get_dockerfile_content() function. + +import ast +import http.server +import json +import sys +from contextlib import redirect_stdout +from io import StringIO +from typing import Any, Dict, Optional + +REPL_GLOBALS: Dict[str, Any] = {} + + +def _run_ast(code: str, globals_: Optional[Dict] = None, locals_: Optional[Dict] = None) -> str: + """It execs code with intention of capturing the result of the last line, + similar to output of python REPL if you type a command. + + :param code: code to execute. + :param globals_: globals dictionary to use. + :param locals_: locals dictionary to use. + :return: python code evaluation result or error message. + """ + tree = ast.parse(code) + module = ast.Module(tree.body[:-1], type_ignores=[]) + exec(ast.unparse(module), globals_, locals_) # type: ignore # pylint: disable=exec-used + module_end = ast.Module(tree.body[-1:], type_ignores=[]) + module_end_str = ast.unparse(module_end) # type: ignore + io_buffer = StringIO() + try: + with redirect_stdout(io_buffer): + ret = eval(module_end_str, globals_, locals_) # pylint: disable=eval-used + if ret is None: + return io_buffer.getvalue() + return ret + except Exception: # pylint: disable=broad-except + with redirect_stdout(io_buffer): + exec(module_end_str, globals_, locals_) # pylint: disable=exec-used + return io_buffer.getvalue() + + +def run_ast(code: str, globals_: Optional[Dict] = None, locals_: Optional[Dict] = None) -> str: + """It is a wrapper around _run_ast that catches exceptions so it + behaves as run_code. + + :param code: code to execute. + :param globals_: globals dictionary to use. + :param locals_: locals dictionary to use. + :return: python evaluation result or error message. + """ + try: + return _run_ast(code, globals_, locals_) + except Exception as ex: # pylint: disable=broad-except + return repr(ex.with_traceback(None)) + + +def run_code(code: str, globals_: Optional[Dict] = None, locals_: Optional[Dict] = None) -> str: + """Executes code and returns captured stdout. or error message. + + :param code: code to execute. + :param globals_: globals dictionary to use. + :param locals_: locals dictionary to use. + :return: captured stdout or error message. + """ + old_stdout = sys.stdout + sys.stdout = new_stdout = StringIO() + try: + exec(code, globals_, locals_) # pylint: disable=exec-used + return new_stdout.getvalue() + except Exception as ex: # pylint: disable=broad-except + return repr(ex.with_traceback(None)) + finally: + sys.stdout = old_stdout + + +class PythonREPLService(http.server.BaseHTTPRequestHandler): + """Simple python REPL server - http server that accepts json requests and + returns json responses. + NOTE: this object is created for each request so it is stateless. + """ + + def do_GET(self) -> None: # pylint: disable=invalid-name + """Handles GET requests and serves as health check and + hello banner. + """ + self.send_response(200, "OK") + self.end_headers() + self.wfile.write(b"Hello! I am a python REPL server.") + + def do_POST(self) -> None: # pylint: disable=invalid-name + """Handles POST requests for code execution.""" + length = int(self.headers.get("content-length", 0)) + if length == 0: + self.send_response(400, "Bad Request") + self.end_headers() + self.wfile.write(b"Empty input.") + return + data = self.rfile.read(length) + + try: + cmd_json = json.loads(data) + except json.JSONDecodeError as exc: + self.send_response(400, "Bad Request") + self.end_headers() + self.wfile.write(f"Failed to parse input: {exc}".encode()) + # inform base class that connection should be closed + self.close_connection = True # pylint: disable=attribute-defined-outside-init + return + + if "code" in cmd_json: + use_ast = cmd_json.get("use_ast", False) + executor = run_ast if use_ast else run_code + result = str(executor(cmd_json["code"], REPL_GLOBALS)) + self.send_response(200, "OK") + self.end_headers() + self.wfile.write(json.dumps({"result": result}).encode("utf-8")) + + else: + self.send_response(400, "Bad Request") + self.end_headers() + self.wfile.write(b"Invalid input format.") + + +def run_server() -> None: + """Runs http server that accepts json requests and returns json responses.""" + http.server.HTTPServer.allow_reuse_address = True + # NOTE: 8080 is internal and important hardcoded port number to match + # python_docker_repl.py, do not change it. + httpd = http.server.HTTPServer(("", 8080), PythonREPLService) + try: + httpd.serve_forever() + except KeyboardInterrupt: + httpd.server_close() + + +def run() -> None: + """Runs infinite loop of python REPL.""" + run_server() + + +if __name__ == "__main__": + run() diff --git a/src/ds_pycontain/python_dockerized_repl.py b/src/ds_pycontain/python_dockerized_repl.py new file mode 100644 index 0000000..79730ed --- /dev/null +++ b/src/ds_pycontain/python_dockerized_repl.py @@ -0,0 +1,194 @@ +import json +import shutil +import time +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import Any, Dict, Optional + +import requests + +from ds_pycontain.docker_containers import DockerContainer, DockerImage + + +def _get_dockerfile_content(base_image: str, script_path: str) -> str: + """Generates on the fly dockerfile for python runner. + It is used to build docker image for python REPL server. + + :param base_image: base image to use for the docker image. + :param script_path: path to python_docker_repl_runner.py script. + :return: dockerfile content as string. + """ + return f"""FROM {base_image} +RUN pip install --no-cache-dir pydantic==1.10.12 + +RUN adduser -D runner +USER runner + +WORKDIR /app + +COPY {script_path} /app/python_runner.py +# Ensure python output is not buffered to remove logs delay +ENV PYTHONUNBUFFERED=1 +EXPOSE 8080 +ENTRYPOINT ["python3", "/app/python_runner.py"] +""" + + +def _build_or_use_docker_image( + base_image: str = "python:3.11-alpine3.18", +) -> DockerImage: + """Builds docker image from data/python_runner.py script + and docker template. + + :param base_image: base image to use for the docker image. + :return: docker image. + """ + + # we autogenerate deterministic name for the image + name = f"ds_pycontain_pyrepl_{base_image}" + script_name = "python_runner.py" + # workaround for https://github.com/docker/docker-py/issues/2105 + # which fails to use in-memory dockerfile with passing docker build + # context. It requires to pass directory name. + with TemporaryDirectory() as tmpdir: + runner_script = Path(__file__).parent / "data" / script_name + assert runner_script.exists() + shutil.copy(runner_script, tmpdir) + dockerfile_content = _get_dockerfile_content(base_image, script_name) + dockerfile = Path(tmpdir) / "Dockerfile" + with dockerfile.open("w") as f: + f.write(dockerfile_content) + return DockerImage.from_dockerfile(dockerfile.parent, name=name) + + +class PythonContainerREPL: + """This class is a wrapper around the docker container that runs the python + REPL server. It is used to execute python code in the container and return + the results. + + It assumes specific docker image is used which runs langchain python runner + server and it communicates by HTTP requests.""" + + def __init__( + self, + port: int = 7123, + image: Optional[DockerImage] = None, + base_image: str = "python:3.11-alpine3.18", + **kwargs: Dict[str, Any], + ) -> None: + """Starts docker container with python REPL server and wait till it + gets operational. + + If image is not provided it will build based on + the base_image and python_docker_repl_runner.py script. + + All other params: **kwargs are passed to DockerContainer constructor, + however port mapping is hardcoded to map docker's 8080 to provided port. + You can use it to limit memory/cpu etc. of the container. + + :param port: port to use for the python REPL server. + :param image: docker image to use for the container. + :param base_image: base image to use for the docker image. + :param kwargs: additional params to pass to DockerContainer constructor. + """ + # for now use the image we created. + self.port = port + if image is None and not base_image: + raise ValueError("Either image or base_image must be provided.") + self.image = image if image is not None else _build_or_use_docker_image(base_image) + self.container = DockerContainer(self.image, ports={"8080/tcp": port}, **kwargs) + # we need to start non-lexical scope lifetime for container + # usually with statement should be used. + # __del__ will close container. + self.container.unsafe_start() + self.session = requests.Session() + # we need to ensure container is running and REPL server is + # ready to accept requests, otherwise we might get connection + # refused due to race conditions. + self._wait_for_container_running() + self._wait_for_repl_ready() + + def _wait_for_container_running(self, timeout: float = 3.0) -> None: + """Sleep until container is running or timeout is reached. + + :param timeout: timeout in seconds. + :raises TimeoutError: if timeout is reached. + """ + status = self.container.docker_container.status + while status not in ("created", "running"): + time.sleep(0.1) + timeout -= 0.1 + if timeout < 0: + raise TimeoutError(f"Failed to start container - status={status}") + + def _wait_for_repl_ready(self, timeout: float = 3.0) -> None: + """Sleep until REPL server is ready to accept requests or timeout is reached. + + :param timeout: timeout in seconds. + :raises TimeoutError: if timeout is reached.""" + while True: + try: + banner = self.session.get(f"http://localhost:{self.port}") + if banner.text != "Hello! I am a python REPL server.": + raise ValueError("Unrecognized banner, it is not a langchain python REPL server.") + break + except Exception as ex: # pylint: disable=broad-except + time.sleep(0.1) + timeout -= 0.1 + if timeout < 0: + raise TimeoutError("Failed to boot service. Timed out.") from ex + + def __del__(self) -> None: + """Closes container and removes it.""" + self.container.unsafe_exit() + + def _exec(self, code: str, use_ast: bool = True) -> str: + """Executes code and returns captured stdout. or error message. + + :param code: code to execute. + :param use_ast: if True, use ast module to parse code, otherwise use eval. + :return: stdout or error message. + """ + try: + msg = {"code": code, "use_ast": 1 if use_ast else 0} + result = self.session.post(f"http://localhost:{self.port}", json=msg) + except Exception as ex: # pylint: disable=broad-except + return repr(ex.with_traceback(None)) + data = result.text + if not data: + return "" + output = json.loads(data) + return output.get("result", "") + + def eval(self, code: str) -> str: + """Evaluate code and return result as string. + + :param code: code to evaluate. + :return: result as string. + """ + return self._exec(code, use_ast=True) + + def exec(self, code: str) -> str: + """Execute code and return stdout. + + :param code: code to execute. + :return: result as string. + """ + return self._exec(code, use_ast=False) + + def run(self, command: str, timeout: Optional[int] = None) -> str: + """Run command and returns anything printed. + Timeout, if provided, is not currently supported and will be ignored. + + :param command: command to run. + :param timeout: timeout in seconds. + :return: result from REPL as output. + """ + + # potentially add a warning or log message if a timeout is provided, + # as it's not supported in the current implementation. + if timeout is not None: + print("Warning: timeout is not supported in the current implementation.") + + # exec method is used here as it will execute the command and return stdout. + return self.exec(command) diff --git a/tests/test_python_docerized_repl.py b/tests/test_python_docerized_repl.py new file mode 100644 index 0000000..71f2cfa --- /dev/null +++ b/tests/test_python_docerized_repl.py @@ -0,0 +1,56 @@ +# NOTE: ports are set to be different in each test so they don't collide +# as sockets might not be available immediately after container is stopped. +# It means that tests might also fail to run if port is not freed in time! +from ds_pycontain.python_dockerized_repl import PythonContainerREPL + + +def test_python_container_repl_can_be_started() -> None: + repl = PythonContainerREPL(port=7120) + assert repl is not None + del repl + + +def test_python_container_repl_works() -> None: + repl = PythonContainerREPL(port=7121) + out1 = repl.exec("x = [1, 2, 3]") + assert out1 == "" + out2 = repl.eval("len(x)") + assert out2 == "3" + out3 = repl.exec("len(x)") + assert out3 == "" + out4 = repl.exec("print(len(x))") + assert out4 == "3\n" + + err = repl.exec("print(x") + assert "SyntaxError" in err + + err2 = repl.eval("print(x") + assert "SyntaxError" in err2 + + +def test_python_container_exec_code() -> None: + repl = PythonContainerREPL(port=7122) + code = """def fib(n): + if n < 2: + return n + return fib(n-1) + fib(n-2) +print(fib(6)) +""" + out = repl.exec(code) + assert out == "8\n" + out2 = repl.exec("print(fib(5))") + assert out2 == "5\n" + + +def test_python_container_ast_code() -> None: + repl = PythonContainerREPL(port=7123) + code = """def fib(n): + if n < 2: + return n + return fib(n-1) + fib(n-2) +fib(6) +""" + out = repl.eval(code) + assert out == "8" + out2 = repl.eval("fib(5)") + assert out2 == "5"