Skip to content

Commit

Permalink
Add python REPL running inside docker container
Browse files Browse the repository at this point in the history
* communication via HTTP
  • Loading branch information
piotr-grodek-dsai committed Nov 3, 2023
1 parent 79ab31c commit 099abe8
Show file tree
Hide file tree
Showing 5 changed files with 399 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ repos:
args: []
# You can add additional plugins for mypy below
# such as types-python-dateutil
additional_dependencies: []
additional_dependencies: ["types-requests"]
exclude: (/test_|setup.py|/tests/|docs/)

# Sort imports alphabetically, and automatically separated into sections and by type.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ max-locals=20
min-similarity-lines=10

[tool.bandit]
exclude_dirs = ["venv",]
exclude_dirs = ["venv", "src/ds_pycontain/data"]
# B101 disables errors for asserts in the code
# remember to not use asserts for security and control flows
skips = ["B101"]
147 changes: 147 additions & 0 deletions src/ds_pycontain/data/python_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Implementation of the server which will be copied to docker container.
You should not run it for other purposes than testing.
"""
# CAREFUL! This file is copied into the docker container and executed there.
# It relies only on the Python standard library because other dependencies
# are not available in the container. See python_dockerized_repl.py for more
# info, in particular the _get_dockerfile_content() function.

import ast
import http.server
import json
import sys
from contextlib import redirect_stdout
from io import StringIO
from typing import Any, Dict, Optional

REPL_GLOBALS: Dict[str, Any] = {}


def _run_ast(code: str, globals_: Optional[Dict] = None, locals_: Optional[Dict] = None) -> Any:
    """Execute code and capture the result of its last statement,
    similar to what the python REPL shows when you type a command.

    All statements but the last one are executed with exec(). The last
    statement is evaluated as an expression when it compiles in "eval" mode,
    otherwise it is executed as a statement too. Everything printed to
    stdout while the code runs is captured.

    :param code: code to execute.
    :param globals_: globals dictionary to use.
    :param locals_: locals dictionary to use.
    :return: value of the last expression if it is not None (returned as is,
        callers are expected to convert it to str), otherwise captured stdout.
    :raises Exception: any error raised by the parsed or executed code
        propagates to the caller, see run_ast() for the catching wrapper.
    """
    tree = ast.parse(code)
    if not tree.body:
        # nothing to run: empty source or comments only
        return ""

    io_buffer = StringIO()
    with redirect_stdout(io_buffer):
        module = ast.Module(tree.body[:-1], type_ignores=[])
        exec(ast.unparse(module), globals_, locals_)  # type: ignore # pylint: disable=exec-used

        module_end = ast.Module(tree.body[-1:], type_ignores=[])
        module_end_str = ast.unparse(module_end)  # type: ignore
        try:
            # Only a *compile time* failure means "this is not an expression".
            # Runtime errors must not trigger the exec() fallback, otherwise
            # the last statement (and its side effects) would run twice.
            expression = compile(module_end_str, "<string>", "eval")
        except SyntaxError:
            exec(module_end_str, globals_, locals_)  # pylint: disable=exec-used
            return io_buffer.getvalue()

        ret = eval(expression, globals_, locals_)  # pylint: disable=eval-used

    if ret is None:
        return io_buffer.getvalue()
    return ret


def run_ast(code: str, globals_: Optional[Dict] = None, locals_: Optional[Dict] = None) -> str:
    """Safe variant of _run_ast: errors are reported, not raised,
    so it behaves like run_code.

    :param code: code to execute.
    :param globals_: globals dictionary to use.
    :param locals_: locals dictionary to use.
    :return: python evaluation result or repr of the raised exception.
    """
    try:
        outcome = _run_ast(code, globals_, locals_)
    except Exception as ex:  # pylint: disable=broad-except
        # drop the traceback so only the exception itself is rendered
        outcome = repr(ex.with_traceback(None))
    return outcome


def run_code(code: str, globals_: Optional[Dict] = None, locals_: Optional[Dict] = None) -> str:
    """Run code with exec() and return whatever it printed to stdout.

    If the code raises, the repr of the exception is returned instead of
    the captured output.

    :param code: code to execute.
    :param globals_: globals dictionary to use.
    :param locals_: locals dictionary to use.
    :return: captured stdout or error message.
    """
    captured = StringIO()
    try:
        # redirect_stdout restores the previous sys.stdout on exit,
        # also when the executed code raises.
        with redirect_stdout(captured):
            exec(code, globals_, locals_)  # pylint: disable=exec-used
    except Exception as ex:  # pylint: disable=broad-except
        return repr(ex.with_traceback(None))
    return captured.getvalue()


class PythonREPLService(http.server.BaseHTTPRequestHandler):
    """Simple python REPL server - http server that accepts json requests and
    returns json responses.

    POST body is expected to be a json object: {"code": "...", "use_ast": bool}.
    NOTE: this object is created for each request so it keeps no state itself;
    executed code shares the module level REPL_GLOBALS dictionary.
    """

    def _reply(self, status: int, reason: str, body: bytes) -> None:
        """Sends status line, (empty) headers and the response body.

        :param status: http status code.
        :param reason: http reason phrase.
        :param body: raw response body.
        """
        self.send_response(status, reason)
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:  # pylint: disable=invalid-name
        """Handles GET requests and serves as health check and
        hello banner.
        """
        self._reply(200, "OK", b"Hello! I am a python REPL server.")

    def do_POST(self) -> None:  # pylint: disable=invalid-name
        """Handles POST requests for code execution.

        Responds with 400 for a missing/invalid body and with 200 and
        a json object {"result": "<output>"} otherwise.
        """
        try:
            length = int(self.headers.get("content-length", 0))
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            # non numeric or negative length: reading the body would fail or block
            self._reply(400, "Bad Request", b"Invalid Content-Length header.")
            return
        if length == 0:
            self._reply(400, "Bad Request", b"Empty input.")
            return
        data = self.rfile.read(length)

        try:
            cmd_json = json.loads(data)
        except ValueError as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError
            # (body which is not valid utf-8/16/32).
            self._reply(400, "Bad Request", f"Failed to parse input: {exc}".encode())
            # inform base class that connection should be closed
            self.close_connection = True  # pylint: disable=attribute-defined-outside-init
            return

        # Valid json does not have to be an object: e.g. the string "code"
        # would pass the `in` check below but has no .get() method.
        if not isinstance(cmd_json, dict) or "code" not in cmd_json:
            self._reply(400, "Bad Request", b"Invalid input format.")
            return

        use_ast = cmd_json.get("use_ast", False)
        executor = run_ast if use_ast else run_code
        result = str(executor(cmd_json["code"], REPL_GLOBALS))
        self._reply(200, "OK", json.dumps({"result": result}).encode("utf-8"))


def run_server() -> None:
    """Runs http server that accepts json requests and returns json responses.

    Serves requests on port 8080 until interrupted (KeyboardInterrupt).
    The listening socket is always closed on the way out, also when
    serve_forever() fails with an unexpected error (which is re-raised).
    """
    http.server.HTTPServer.allow_reuse_address = True
    # NOTE: 8080 is internal and important hardcoded port number to match
    # python_dockerized_repl.py, do not change it.
    httpd = http.server.HTTPServer(("", 8080), PythonREPLService)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # expected way of stopping the server, not an error
        pass
    finally:
        httpd.server_close()


def run() -> None:
    """Script entry point: delegates to run_server(), which serves
    python REPL requests until it is stopped.
    """
    run_server()


# Start the server only when the file is executed as a script.
if __name__ == "__main__":
    run()
194 changes: 194 additions & 0 deletions src/ds_pycontain/python_dockerized_repl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import json
import shutil
import time
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Dict, Optional

import requests

from ds_pycontain.docker_containers import DockerContainer, DockerImage


def _get_dockerfile_content(base_image: str, script_path: str) -> str:
    """Renders the Dockerfile used to build the python REPL server image.

    The generated image copies the runner script to /app/python_runner.py,
    runs it as the non-root `runner` user and exposes port 8080.

    :param base_image: base image to use for the docker image.
    :param script_path: path (inside the build context) to the runner script.
    :return: dockerfile content as string.
    """
    instructions = (
        f"FROM {base_image}",
        "RUN pip install --no-cache-dir pydantic==1.10.12",
        "RUN adduser -D runner",
        "USER runner",
        "WORKDIR /app",
        f"COPY {script_path} /app/python_runner.py",
        "# Ensure python output is not buffered to remove logs delay",
        "ENV PYTHONUNBUFFERED=1",
        "EXPOSE 8080",
        'ENTRYPOINT ["python3", "/app/python_runner.py"]',
    )
    # every instruction, including the last one, is newline terminated
    return "\n".join(instructions) + "\n"


def _build_or_use_docker_image(
    base_image: str = "python:3.11-alpine3.18",
) -> DockerImage:
    """Builds the REPL server docker image out of data/python_runner.py
    and the generated Dockerfile.

    :param base_image: base image to use for the docker image.
    :return: docker image.
    """
    script_name = "python_runner.py"
    # image name is derived from the base image so it is deterministic
    image_name = f"ds_pycontain_pyrepl_{base_image}"

    # workaround for https://github.com/docker/docker-py/issues/2105
    # which fails to use in-memory dockerfile with passing docker build
    # context. It requires to pass directory name.
    with TemporaryDirectory() as tmpdir:
        build_dir = Path(tmpdir)
        runner_script = Path(__file__).parent / "data" / script_name
        assert runner_script.exists()
        shutil.copy(runner_script, tmpdir)
        (build_dir / "Dockerfile").write_text(_get_dockerfile_content(base_image, script_name))
        # build while the temporary build context still exists
        return DockerImage.from_dockerfile(build_dir, name=image_name)


class PythonContainerREPL:
    """This class is a wrapper around the docker container that runs the python
    REPL server. It is used to execute python code in the container and return
    the results.
    It assumes the container runs the data/python_runner.py server (listening
    on container port 8080) and it communicates with it by HTTP requests."""

    def __init__(
        self,
        port: int = 7123,
        image: Optional["DockerImage"] = None,
        base_image: str = "python:3.11-alpine3.18",
        **kwargs: Any,
    ) -> None:
        """Starts docker container with python REPL server and wait till it
        gets operational.
        If image is not provided it will build based on
        the base_image and data/python_runner.py script.
        All other params: **kwargs are passed to DockerContainer constructor,
        however port mapping is hardcoded to map docker's 8080 to provided port.
        You can use it to limit memory/cpu etc. of the container.
        :param port: port to use for the python REPL server.
        :param image: docker image to use for the container.
        :param base_image: base image to use for the docker image.
        :param kwargs: additional params to pass to DockerContainer constructor.
        :raises ValueError: if neither image nor base_image is provided.
        :raises TimeoutError: if container or REPL server does not start in time.
        """
        self.port = port
        if image is None and not base_image:
            raise ValueError("Either image or base_image must be provided.")
        self.image = image if image is not None else _build_or_use_docker_image(base_image)
        self.container = DockerContainer(self.image, ports={"8080/tcp": port}, **kwargs)
        # we need to start non-lexical scope lifetime for container
        # usually with statement should be used.
        # __del__ will close container.
        self.container.unsafe_start()
        self.session = requests.Session()
        # we need to ensure container is running and REPL server is
        # ready to accept requests, otherwise we might get connection
        # refused due to race conditions.
        self._wait_for_container_running()
        self._wait_for_repl_ready()

    def _container_status(self) -> str:
        """Returns current status of the underlying docker container.
        :return: container status string.
        """
        docker_container = self.container.docker_container
        # NOTE(review): docker-py Container objects cache their attributes and
        # need reload() to observe a status change - presumably this is such an
        # object; reload() is called only when it is available.
        reload = getattr(docker_container, "reload", None)
        if callable(reload):
            reload()
        return docker_container.status

    def _wait_for_container_running(self, timeout: float = 3.0) -> None:
        """Sleep until container is running or timeout is reached.
        :param timeout: timeout in seconds.
        :raises TimeoutError: if timeout is reached.
        """
        deadline = time.monotonic() + timeout
        while True:
            # status has to be re-read on every iteration, otherwise
            # the loop could only ever end with a timeout.
            status = self._container_status()
            if status in ("created", "running"):
                return
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Failed to start container - status={status}")
            time.sleep(0.1)

    def _wait_for_repl_ready(self, timeout: float = 3.0) -> None:
        """Sleep until REPL server is ready to accept requests or timeout is reached.
        :param timeout: timeout in seconds.
        :raises TimeoutError: if timeout is reached."""
        url = f"http://localhost:{self.port}"
        deadline = time.monotonic() + timeout
        while True:
            try:
                # bound every request so a hanging connection cannot block forever
                remaining = max(deadline - time.monotonic(), 0.1)
                banner = self.session.get(url, timeout=remaining)
                if banner.text != "Hello! I am a python REPL server.":
                    raise ValueError("Unrecognized banner, it is not a langchain python REPL server.")
                return
            except Exception as ex:  # pylint: disable=broad-except
                # connection errors and wrong banners are retried until deadline
                if time.monotonic() >= deadline:
                    raise TimeoutError("Failed to boot service. Timed out.") from ex
                time.sleep(0.1)

    def __del__(self) -> None:
        """Closes HTTP session, then closes container and removes it."""
        # __init__ may have failed before these attributes were assigned,
        # so they can not be assumed to exist here.
        session = getattr(self, "session", None)
        if session is not None:
            session.close()
        container = getattr(self, "container", None)
        if container is not None:
            container.unsafe_exit()

    def _exec(self, code: str, use_ast: bool = True) -> str:
        """Executes code and returns captured stdout. or error message.
        :param code: code to execute.
        :param use_ast: if True, use ast module to parse code, otherwise use exec.
        :return: stdout or error message.
        """
        try:
            msg = {"code": code, "use_ast": 1 if use_ast else 0}
            result = self.session.post(f"http://localhost:{self.port}", json=msg)
        except Exception as ex:  # pylint: disable=broad-except
            return repr(ex.with_traceback(None))
        data = result.text
        if not data:
            return ""
        try:
            output = json.loads(data)
        except ValueError:
            # error responses of the server are plain text, not json
            return data
        if not isinstance(output, dict):
            return data
        return output.get("result", "")

    def eval(self, code: str) -> str:
        """Evaluate code and return result as string.
        :param code: code to evaluate.
        :return: result as string.
        """
        return self._exec(code, use_ast=True)

    def exec(self, code: str) -> str:
        """Execute code and return stdout.
        :param code: code to execute.
        :return: result as string.
        """
        return self._exec(code, use_ast=False)

    def run(self, command: str, timeout: Optional[int] = None) -> str:
        """Run command and returns anything printed.
        Timeout, if provided, is not currently supported and will be ignored.
        :param command: command to run.
        :param timeout: timeout in seconds.
        :return: result from REPL as output.
        """
        # timeout is accepted for interface compatibility only.
        if timeout is not None:
            print("Warning: timeout is not supported in the current implementation.")

        # exec method is used here as it will execute the command and return stdout.
        return self.exec(command)
Loading

0 comments on commit 099abe8

Please sign in to comment.