diff --git a/libs/langchain/langchain/utilities/python_docker_repl.py b/libs/langchain/langchain/utilities/python_docker_repl.py new file mode 100644 index 0000000000000..99f4e3b3bae98 --- /dev/null +++ b/libs/langchain/langchain/utilities/python_docker_repl.py @@ -0,0 +1,144 @@ +import time +from pathlib import Path +from typing import Optional + +import requests + +from langchain.utilities.docker_containers import DockerContainer, DockerImage + + +def _get_dockerfile_content(base_image: str, script_path: str) -> str: + return f"""FROM {base_image} +RUN pip install --no-cache-dir pydantic==1.10.12 + +RUN adduser -D runner +USER runner + +WORKDIR /app + +COPY {script_path} /app/python_runner.py +# Ensure python output is not buffered to remove logs delay +ENV PYTHONUNBUFFERED=1 +EXPOSE 8080 +ENTRYPOINT ["python3", "/app/python_runner.py"] +""" + + +def _build_or_use_docker_image( + base_image: str = "python:3.11-alpine3.18", +) -> DockerImage: + """Builds docker image from python_docker_repl_runner.py script + and docker template. Returns image object.""" + + # we autogenerate deterministic name for the image + name = f"langchain_pyrepl_{base_image}" + import shutil + from tempfile import TemporaryDirectory + + script_name = "python_docker_repl_runner.py" + # workaround for https://github.com/docker/docker-py/issues/2105 + # which fails to use in-memory dockerfile with passing docker build + # context. It requires to pass directory name. + with TemporaryDirectory() as tmpdir: + runner_script = Path(__file__).parent / script_name + assert runner_script.exists() + shutil.copy(runner_script, tmpdir) + dockerfile_content = _get_dockerfile_content(base_image, script_name) + dockerfile = Path(tmpdir) / "Dockerfile" + with dockerfile.open("w") as f: + f.write(dockerfile_content) + return DockerImage.from_dockerfile(dockerfile.parent, name=name) + + +class PythonContainerREPL: + """This class is a wrapper around the docker container that runs the python + REPL server. It is used to execute python code in the container and return + the results. + + It assumes specific docker image is used which runs langchain python runner + server and it communicates by HTTP requests.""" + + def __init__( + self, + port: int = 7123, + image: Optional[DockerImage] = None, + base_image: str = "python:3.11-alpine3.18", + **kwargs, + ) -> None: + """Starts docker container with python REPL server and wait till it + gets operational. + + If image is not provided it will build based on + the base_image and python_docker_repl_runner.py script. + + All other params: **kwargs are passed to DockerContainer constructor, + however port mapping is hardcoded to map docker's 8080 to provided port. + You can use it to limit memory/cpu etc. of the container. + """ + # for now use the image we created. + self.port = port + if image is None and not base_image: + raise ValueError("Either image or base_image must be provided.") + self.image = ( + image if image is not None else _build_or_use_docker_image(base_image) + ) + self.container = DockerContainer(self.image, ports={"8080/tcp": port}, **kwargs) + # we need to start non-lexical scope lifetime for container + # usually with statement should be used. + # __del__ will close container. + self.container.unsafe_start() + self.session = requests.Session() + # we need to ensure container is running and REPL server is + # ready to accept requests, otherwise we might get connection + # refused due to race conditions. + self._wait_for_container_running() + self._wait_for_repl_ready() + + def _wait_for_container_running(self, timeout: float = 3.0) -> None: + status = self.container.docker_container.status + while status not in ("created", "running"): + time.sleep(0.1) + timeout -= 0.1 + if timeout < 0: + raise TimeoutError(f"Failed to start container - status={status}") + + def _wait_for_repl_ready(self, timeout: float = 3.0) -> None: + while True: + try: + ex = self.session.get(f"http://localhost:{self.port}") + if ex.text != "Hello! I am a python REPL server.": + raise Exception( + "Unrecognized banner, it is not a langchain python REPL server." + ) + break + except Exception as ex: + time.sleep(0.1) + timeout -= 0.1 + if timeout < 0: + raise TimeoutError("Failed to boot service.") + + def __del__(self) -> None: + self.container.unsafe_exit() + + def _exec(self, code: str, use_ast: bool = True) -> str: + """Executes code and returns captured stdout. or error message.""" + import json + + try: + msg = {"code": code, "use_ast": 1 if use_ast else 0} + result = self.session.post(f"http://localhost:{self.port}", json=msg) + except Exception as ex: + return repr(ex.with_traceback(None)) + data = result.text + if not data: + return "" + output = json.loads(data) + return output.get("result", "") + + def eval(self, code: str) -> str: + """Evaluate code and return result as string.""" + return self._exec(code, use_ast=True) + + def exec(self, code: str) -> str: + """Execute code and return stdout.""" + return self._exec(code, use_ast=False) diff --git a/libs/langchain/langchain/utilities/python_docker_repl_runner.py b/libs/langchain/langchain/utilities/python_docker_repl_runner.py new file mode 100644 index 0000000000000..bb5ee5891b3c1 --- /dev/null +++ b/libs/langchain/langchain/utilities/python_docker_repl_runner.py @@ -0,0 +1,184 @@ +"""Implementation of the server which will be copied to docker container. +You should not run it for other purposes than testing. +""" +# CAREFUL! This file is copied to docker container and executed there. +# it is based only on python standard library and pydantic as other dependencies +# are not available in the container. See python_docker_repl.py for more info +# and check _get_dockerfile_content() function. +import ast +import http.server +import sys +from contextlib import redirect_stdout +from enum import Enum +from io import StringIO +from typing import Dict, Optional, Union + +from pydantic import BaseModel, ValidationError + + +class CommandName(Enum): + RESET = "reset" + QUIT = "quit" + + +class Cmd(BaseModel): + cmd: CommandName + + class Config: + extra = "forbid" + + +class Code(BaseModel): + code: str + use_ast: bool = False + + class Config: + extra = "forbid" + + +class InputMessage(BaseModel): + __root__: Union[Code, Cmd] + + +class OutputMessage(BaseModel): + result: str + + class Config: + extra = "forbid" + + +REPL_GLOBALS = {} +REPL_LOCALS = {} + + +def _run_ast( + code: str, globals: Optional[Dict] = None, locals: Optional[Dict] = None +) -> str: + """It execs code with intention of capturing the result of the last line, + similar to output of python REPL if you type a command. + """ + tree = ast.parse(code) + module = ast.Module(tree.body[:-1], type_ignores=[]) + exec(ast.unparse(module), globals, locals) # type: ignore + module_end = ast.Module(tree.body[-1:], type_ignores=[]) + module_end_str = ast.unparse(module_end) # type: ignore + io_buffer = StringIO() + try: + with redirect_stdout(io_buffer): + ret = eval(module_end_str, globals, locals) + if ret is None: + return io_buffer.getvalue() + else: + return ret + except Exception: + with redirect_stdout(io_buffer): + exec(module_end_str, globals, locals) + return io_buffer.getvalue() + + +def run_ast( + code: str, globals: Optional[Dict] = None, locals: Optional[Dict] = None +) -> str: + """It is a wrapper around _run_ast that catches exceptions so it + behaves as run_code. + """ + try: + return _run_ast(code, globals, locals) + except Exception as ex: + return repr(ex.with_traceback(None)) + + +def run_code( + code: str, globals: Optional[Dict] = None, locals: Optional[Dict] = None +) -> str: + """Executes code and returns captured stdout. or error message.""" + old_stdout = sys.stdout + sys.stdout = new_stdout = StringIO() + try: + exec(code, globals, locals) + return new_stdout.getvalue() + except Exception as ex: + return repr(ex.with_traceback(None)) + finally: + sys.stdout = old_stdout + + +class PythonREPLService(http.server.BaseHTTPRequestHandler): + """Simple python REPL server - http server that accepts json requests and + returns json responses. + NOTE: this object is created for each request so it is stateless. + """ + + def do_GET(self): + self.send_response(200, "OK") + self.end_headers() + self.wfile.write(b"Hello! I am a python REPL server.") + + def do_POST(self): + length = int(self.headers.get("content-length")) + data = self.rfile.read(length) + + try: + cmd = InputMessage.parse_raw(data) + except ValidationError as exc: + self.send_response(400, "Bad Request") + self.end_headers() + self.wfile.write(exc.json().encode("utf-8")) + self.close_connection = True + return + + cmd = cmd.__root__ + global REPL_LOCALS + global REPL_GLOBALS + if isinstance(cmd, Cmd): + if cmd.cmd == CommandName.QUIT: + self.send_response(200, "OK") + self.end_headers() + self.wfile.write(b"") + # to kill, we send CTRL_C_EVENT to our own process. + import os + import signal + + os.kill(os.getpid(), signal.CTRL_C_EVENT) + elif cmd.cmd == CommandName.RESET: + REPL_GLOBALS = {} + REPL_LOCALS = {} + self.send_response(200, "OK") + self.end_headers() + self.wfile.write(b"") + elif isinstance(cmd, Code): + executor = run_ast if cmd.use_ast else run_code + # NOTE: we only pass globals, otherwise for example code like this: + # def f(): + # return 42 + # print(f()) + # would not work. + result = str(executor(cmd.code, REPL_GLOBALS)) + self.send_response(200, "OK") + self.end_headers() + self.wfile.write(OutputMessage(result=result).json().encode("utf-8")) + else: + self.send_response(400, "Bad Request") + self.end_headers() + self.wfile.write(b"Failed to parse input.") + + +def run_server() -> None: + """Runs http server that accepts json requests and returns json responses.""" + http.server.HTTPServer.allow_reuse_address = True + # NOTE: 8080 is internal and important hardcoded port number to match + # python_docker_repl.py, do not change it. + httpd = http.server.HTTPServer(("", 8080), PythonREPLService) + try: + httpd.serve_forever() + except KeyboardInterrupt: + httpd.server_close() + + +def run() -> None: + """Runs infinite loop of python REPL.""" + run_server() + + +if __name__ == "__main__": + run() diff --git a/libs/langchain/tests/integration_tests/utilities/test_python_docker_repl.py b/libs/langchain/tests/integration_tests/utilities/test_python_docker_repl.py new file mode 100644 index 0000000000000..4cf81108f576e --- /dev/null +++ b/libs/langchain/tests/integration_tests/utilities/test_python_docker_repl.py @@ -0,0 +1,56 @@ +# NOTE: ports are set to be different in each test so they don't collide +# as sockets might not be available immediately after container is stopped. +# It means that tests might also fail to run if port is not freed in time! +from langchain.utilities.python_docker_repl import PythonContainerREPL + + +def test_python_container_repl_can_be_started() -> None: + repl = PythonContainerREPL(port=7120) + assert repl is not None + del repl + + +def test_python_container_repl_works(): + repl = PythonContainerREPL(port=7121) + out1 = repl.exec("x = [1, 2, 3]") + assert out1 == "" + out2 = repl.eval("len(x)") + assert out2 == "3" + out3 = repl.exec("len(x)") + assert out3 == "" + out4 = repl.exec("print(len(x))") + assert out4 == "3\n" + + err = repl.exec("print(x") + assert "SyntaxError" in err + + err2 = repl.eval("print(x") + assert "SyntaxError" in err2 + + +def test_python_container_exec_code() -> None: + repl = PythonContainerREPL(port=7122) + code = """def fib(n): + if n < 2: + return n + return fib(n-1) + fib(n-2) +print(fib(6)) +""" + out = repl.exec(code) + assert out == "8\n" + out2 = repl.exec("print(fib(5))") + assert out2 == "5\n" + + +def test_python_container_ast_code() -> None: + repl = PythonContainerREPL(port=7123) + code = """def fib(n): + if n < 2: + return n + return fib(n-1) + fib(n-2) +fib(6) +""" + out = repl.eval(code) + assert out == "8" + out2 = repl.eval("fib(5)") + assert out2 == "5"