Skip to content

Commit

Permalink
Add dockerized Python REPL runner utility
Browse files Browse the repository at this point in the history
  • Loading branch information
piotr-grodek-dsai committed Sep 20, 2023
1 parent ea81e17 commit f4b32c4
Show file tree
Hide file tree
Showing 3 changed files with 384 additions and 0 deletions.
144 changes: 144 additions & 0 deletions libs/langchain/langchain/utilities/python_docker_repl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import time
from pathlib import Path
from typing import Optional

import requests

from langchain.utilities.docker_containers import DockerContainer, DockerImage


def _get_dockerfile_content(base_image: str, script_path: str) -> str:
return f"""FROM {base_image}
RUN pip install --no-cache-dir pydantic==1.10.12
RUN adduser -D runner
USER runner
WORKDIR /app
COPY {script_path} /app/python_runner.py
# Ensure python output is not buffered to remove logs delay
ENV PYTHONUNBUFFERED=1
EXPOSE 8080
ENTRYPOINT ["python3", "/app/python_runner.py"]
"""


def _build_or_use_docker_image(
base_image: str = "python:3.11-alpine3.18",
) -> DockerImage:
"""Builds docker image from python_docker_repl_runner.py script
and docker template. Returns image object."""

# we autogenerate deterministic name for the image
name = f"langchain_pyrepl_{base_image}"
import shutil
from tempfile import TemporaryDirectory

script_name = "python_docker_repl_runner.py"
# workaround for https://github.com/docker/docker-py/issues/2105
# which fails to use in-memory dockerfile with passing docker build
# context. It requires to pass directory name.
with TemporaryDirectory() as tmpdir:
runner_script = Path(__file__).parent / script_name
assert runner_script.exists()
shutil.copy(runner_script, tmpdir)
dockerfile_content = _get_dockerfile_content(base_image, script_name)
dockerfile = Path(tmpdir) / "Dockerfile"
with dockerfile.open("w") as f:
f.write(dockerfile_content)
return DockerImage.from_dockerfile(dockerfile.parent, name=name)


class PythonContainerREPL:
"""This class is a wrapper around the docker container that runs the python
REPL server. It is used to execute python code in the container and return
the results.
It assumes specific docker image is used which runs langchain python runner
server and it communicates by HTTP requests."""

def __init__(
self,
port: int = 7123,
image: Optional[DockerImage] = None,
base_image: str = "python:3.11-alpine3.18",
**kwargs,
) -> None:
"""Starts docker container with python REPL server and wait till it
gets operational.
If image is not provided it will build based on
the base_image and python_docker_repl_runner.py script.
All other params: **kwargs are passed to DockerContainer constructor,
however port mapping is hardcoded to map docker's 8080 to provided port.
You can use it to limit memory/cpu etc. of the container.
"""
# for now use the image we created.
self.port = port
if image is None and not base_image:
raise ValueError("Either image or base_image must be provided.")
self.image = (
image if image is not None else _build_or_use_docker_image(base_image)
)
self.container = DockerContainer(self.image, ports={"8080/tcp": port}, **kwargs)
# we need to start non-lexical scope lifetime for container
# usually with statement should be used.
# __del__ will close container.
self.container.unsafe_start()
self.session = requests.Session()
# we need to ensure container is running and REPL server is
# ready to accept requests, otherwise we might get connection
# refused due to race conditions.
self._wait_for_container_running()
self._wait_for_repl_ready()

def _wait_for_container_running(self, timeout: float = 3.0) -> None:
status = self.container.docker_container.status
while status not in ("created", "running"):
time.sleep(0.1)
timeout -= 0.1
if timeout < 0:
raise TimeoutError(f"Failed to start container - status={status}")

def _wait_for_repl_ready(self, timeout: float = 3.0) -> None:
while True:
try:
ex = self.session.get(f"http://localhost:{self.port}")
if ex.text != "Hello! I am a python REPL server.":
raise Exception(
"Unrecognized banner, it is not a langchain python REPL server."
)
break
except Exception as ex:
time.sleep(0.1)
timeout -= 0.1
if timeout < 0:
raise TimeoutError("Failed to boot service.")

def __del__(self) -> None:
self.container.unsafe_exit()

def _exec(self, code: str, use_ast: bool = True) -> str:
"""Executes code and returns captured stdout. or error message."""
import json

try:
msg = {"code": code, "use_ast": 1 if use_ast else 0}
result = self.session.post(f"http://localhost:{self.port}", json=msg)
except Exception as ex:
return repr(ex.with_traceback(None))
data = result.text
if not data:
return ""
output = json.loads(data)
return output.get("result", "")

def eval(self, code: str) -> str:
"""Evaluate code and return result as string."""
return self._exec(code, use_ast=True)

def exec(self, code: str) -> str:
"""Execute code and return stdout."""
return self._exec(code, use_ast=False)
184 changes: 184 additions & 0 deletions libs/langchain/langchain/utilities/python_docker_repl_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
"""Implementation of the server which will be copied to docker container.
You should not run it for other purposes than testing.
"""
# CAREFUL! This file is copied to docker container and executed there.
# it is based only on python standard library and pydantic as other dependencies
# are not available in the container. See python_docker_repl.py for more info
# and check _get_dockerfile_content() function.
import ast
import http.server
import sys
from contextlib import redirect_stdout
from enum import Enum
from io import StringIO
from typing import Dict, Optional, Union

from pydantic import BaseModel, ValidationError


class CommandName(Enum):
RESET = "reset"
QUIT = "quit"


class Cmd(BaseModel):
cmd: CommandName

class Config:
extra = "forbid"


class Code(BaseModel):
code: str
use_ast: bool = False

class Config:
extra = "forbid"


class InputMessage(BaseModel):
__root__: Union[Code, Cmd]


class OutputMessage(BaseModel):
result: str

class Config:
extra = "forbid"


REPL_GLOBALS = {}
REPL_LOCALS = {}


def _run_ast(
code: str, globals: Optional[Dict] = None, locals: Optional[Dict] = None
) -> str:
"""It execs code with intention of capturing the result of the last line,
similar to output of python REPL if you type a command.
"""
tree = ast.parse(code)
module = ast.Module(tree.body[:-1], type_ignores=[])
exec(ast.unparse(module), globals, locals) # type: ignore
module_end = ast.Module(tree.body[-1:], type_ignores=[])
module_end_str = ast.unparse(module_end) # type: ignore
io_buffer = StringIO()
try:
with redirect_stdout(io_buffer):
ret = eval(module_end_str, globals, locals)
if ret is None:
return io_buffer.getvalue()
else:
return ret
except Exception:
with redirect_stdout(io_buffer):
exec(module_end_str, globals, locals)
return io_buffer.getvalue()


def run_ast(
code: str, globals: Optional[Dict] = None, locals: Optional[Dict] = None
) -> str:
"""It is a wrapper around _run_ast that catches exceptions so it
behaves as run_code.
"""
try:
return _run_ast(code, globals, locals)
except Exception as ex:
return repr(ex.with_traceback(None))


def run_code(
code: str, globals: Optional[Dict] = None, locals: Optional[Dict] = None
) -> str:
"""Executes code and returns captured stdout. or error message."""
old_stdout = sys.stdout
sys.stdout = new_stdout = StringIO()
try:
exec(code, globals, locals)
return new_stdout.getvalue()
except Exception as ex:
return repr(ex.with_traceback(None))
finally:
sys.stdout = old_stdout


class PythonREPLService(http.server.BaseHTTPRequestHandler):
"""Simple python REPL server - http server that accepts json requests and
returns json responses.
NOTE: this object is created for each request so it is stateless.
"""

def do_GET(self):
self.send_response(200, "OK")
self.end_headers()
self.wfile.write(b"Hello! I am a python REPL server.")

def do_POST(self):
length = int(self.headers.get("content-length"))
data = self.rfile.read(length)

try:
cmd = InputMessage.parse_raw(data)
except ValidationError as exc:
self.send_response(400, "Bad Request")
self.end_headers()
self.wfile.write(exc.json().encode("utf-8"))
self.close_connection = True
return

cmd = cmd.__root__
global REPL_LOCALS
global REPL_GLOBALS
if isinstance(cmd, Cmd):
if cmd.cmd == CommandName.QUIT:
self.send_response(200, "OK")
self.end_headers()
self.wfile.write(b"")
# to kill, we send CTRL_C_EVENT to our own process.
import os
import signal

os.kill(os.getpid(), signal.CTRL_C_EVENT)
elif cmd.cmd == CommandName.RESET:
REPL_GLOBALS = {}
REPL_LOCALS = {}
self.send_response(200, "OK")
self.end_headers()
self.wfile.write(b"")
elif isinstance(cmd, Code):
executor = run_ast if cmd.use_ast else run_code
# NOTE: we only pass globals, otherwise for example code like this:
# def f():
# return 42
# print(f())
# would not work.
result = str(executor(cmd.code, REPL_GLOBALS))
self.send_response(200, "OK")
self.end_headers()
self.wfile.write(OutputMessage(result=result).json().encode("utf-8"))
else:
self.send_response(400, "Bad Request")
self.end_headers()
self.wfile.write(b"Failed to parse input.")


def run_server() -> None:
"""Runs http server that accepts json requests and returns json responses."""
http.server.HTTPServer.allow_reuse_address = True
# NOTE: 8080 is internal and important hardcoded port number to match
# python_docker_repl.py, do not change it.
httpd = http.server.HTTPServer(("", 8080), PythonREPLService)
try:
httpd.serve_forever()
except KeyboardInterrupt:
httpd.server_close()


def run() -> None:
"""Runs infinite loop of python REPL."""
run_server()


if __name__ == "__main__":
run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# NOTE: ports are set to be different in each test so they don't collide
# as sockets might not be available immediately after container is stopped.
# It means that tests might also fail to run if port is not freed in time!
from langchain.utilities.python_docker_repl import PythonContainerREPL


def test_python_container_repl_can_be_started() -> None:
repl = PythonContainerREPL(port=7120)
assert repl is not None
del repl


def test_python_container_repl_works():
repl = PythonContainerREPL(port=7121)
out1 = repl.exec("x = [1, 2, 3]")
assert out1 == ""
out2 = repl.eval("len(x)")
assert out2 == "3"
out3 = repl.exec("len(x)")
assert out3 == ""
out4 = repl.exec("print(len(x))")
assert out4 == "3\n"

err = repl.exec("print(x")
assert "SyntaxError" in err

err2 = repl.eval("print(x")
assert "SyntaxError" in err2


def test_python_container_exec_code() -> None:
repl = PythonContainerREPL(port=7122)
code = """def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2)
print(fib(6))
"""
out = repl.exec(code)
assert out == "8\n"
out2 = repl.exec("print(fib(5))")
assert out2 == "5\n"


def test_python_container_ast_code() -> None:
repl = PythonContainerREPL(port=7123)
code = """def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2)
fib(6)
"""
out = repl.eval(code)
assert out == "8"
out2 = repl.eval("fib(5)")
assert out2 == "5"

0 comments on commit f4b32c4

Please sign in to comment.