From 6104a1801c7c142056adc0410aff553231c2a958 Mon Sep 17 00:00:00 2001 From: Simon Li Date: Fri, 10 Sep 2021 20:04:31 +0100 Subject: [PATCH 1/2] Call exec_podman in new thread, handle streaming logs whilst exited `podman logs --follow` doesn't always exit when the container exits, causing repo2docker to hang https://github.com/manics/repo2docker-podman/issues/6 This is reproducible with repo2docker images but not a simple image like busybox or nginx. This PR works around it by running `podman logs --follow` in a separate thread, and checking whether the container has exited in the main thread --- repo2podman/podman.py | 103 ++++++++++++++++++++++++++++++++------ tests/unit/test_podman.py | 33 +++++++++++- 2 files changed, 120 insertions(+), 16 deletions(-) diff --git a/repo2podman/podman.py b/repo2podman/podman.py index f775423..5caff61 100644 --- a/repo2podman/podman.py +++ b/repo2podman/podman.py @@ -1,10 +1,12 @@ # Use Podman instead of Docker from functools import partial import json +from queue import Queue, Empty import re from subprocess import CalledProcessError, PIPE, STDOUT, Popen import tarfile from tempfile import TemporaryDirectory +from threading import Thread from traitlets import Unicode from repo2docker.engine import ( @@ -14,10 +16,32 @@ ) -def execute_cmd(cmd, capture=None, **kwargs): +DEFAULT_READ_TIMEOUT = 1 + + +class ProcessTerminated(CalledProcessError): + """ + Thrown when a process was forcibly terminated + """ + + def __init__(self, message=None): + self.message = message + + def __str__(self): + s = "ProcessTerminated\n {}\n {}".format(self.e, self.message) + return s + + +def execute_cmd(cmd, capture=None, *, read_timeout=None, break_callback=None, **kwargs): """ Call given command, yielding output line by line if capture is set. + break_callback: A callable that returns a boolean indicating whether to + stop execution. + See https://stackoverflow.com/a/4896288 + This is needed to work around https://github.com/manics/repo2docker-podman/issues/6 + If a process is terminated due to break_callback then ProcessTerminated is thrown + Modified version of repo2docker.utils.execute_cmd that allows capturing of stdout, stderr or both. @@ -34,6 +58,9 @@ def execute_cmd(cmd, capture=None, **kwargs): elif capture is not None: raise ValueError("Invalid capture argument: {}".format(capture)) + if read_timeout is None: + read_timeout = DEFAULT_READ_TIMEOUT + proc = Popen(cmd, **kwargs) if not capture: @@ -48,6 +75,14 @@ def execute_cmd(cmd, capture=None, **kwargs): # This should behave the same as .readline(), but splits on `\r` OR `\n`, # not just `\n`. buf = [] + q = Queue() + + def readToQueue(proc, capture, q): + try: + for c in iter(partial(getattr(proc, capture).read, 1), b""): + q.put(c) + finally: + proc.wait() def flush(): """Flush next line of the buffer""" @@ -55,21 +90,42 @@ def flush(): buf[:] = [] return line + t = Thread(target=readToQueue, args=(proc, capture, q)) + # thread dies with the program + t.daemon = True + t.start() + c_last = "" - try: - for c in iter(partial(getattr(proc, capture).read, 1), b""): + terminate = False + terminated = False + while True: + try: + c = q.get(block=True, timeout=read_timeout) if c_last == b"\r" and buf and c != b"\n": yield flush() buf.append(c) if c == b"\n": yield flush() c_last = c - if buf: - yield flush() - finally: - ret = proc.wait() - if ret != 0: - raise CalledProcessError(ret, cmd) + except Empty: + # Only terminate if timeout occurred so that all output has been read + if proc.poll() is not None: + break + if terminate: + proc.terminate() + terminated = True + break + if break_callback: + terminate = break_callback() + if buf: + yield flush() + + t.join() + + if terminated: + raise ProcessTerminated(cmd) + if proc.returncode != 0: + raise CalledProcessError(proc.returncode, cmd) class PodmanCommandError(Exception): @@ -84,7 +140,7 @@ def __str__(self): return s -def exec_podman(args, *, capture): +def exec_podman(args, *, capture, read_timeout=None, break_callback=None): """ Execute a podman command capture: @@ -102,7 +158,7 @@ def exec_podman(args, *, capture): cmd = ["podman"] + args print("Executing: {}".format(" ".join(cmd))) try: - p = execute_cmd(cmd, capture=capture) + p = execute_cmd(cmd, capture=capture, break_callback=break_callback) except CalledProcessError as e: raise PodmanCommandError(e) from None # Need to iterate even if not capturing because execute_cmd is a generator @@ -116,7 +172,7 @@ def exec_podman(args, *, capture): raise PodmanCommandError(e, lines) from None -def exec_podman_stream(args): +def exec_podman_stream(args, *, read_timeout=None, break_callback=None): """ Execute a podman command and stream the output @@ -124,7 +180,7 @@ def exec_podman_stream(args): """ cmd = ["podman"] + args print("Executing: {}".format(" ".join(cmd))) - p = execute_cmd(cmd, capture="both") + p = execute_cmd(cmd, capture="both", break_callback=break_callback) # This will stream the output and also pass any exceptions to the caller yield from p @@ -144,6 +200,14 @@ def reload(self): self.attrs = d[0] assert self.attrs["Id"].startswith(self.id) + def _exited(self): + status = "\n".join( + exec_podman( + ["inspect", "--format={{.State.Status}}", self.id], capture="both" + ) + ) + return status.strip() == "exited" + def logs(self, *, stream=False, timestamps=False, since=None): log_command = ["logs"] if timestamps: @@ -153,9 +217,18 @@ def logs(self, *, stream=False, timestamps=False, since=None): if stream: + # Podman logs --follow may hang if container is stopped def iter_logs(cid): - for line in exec_podman_stream(log_command + ["--follow", cid]): - yield line.encode("utf-8") + try: + for line in exec_podman_stream( + log_command + ["--follow", cid], + read_timeout=2, + break_callback=self._exited, + ): + yield line.encode("utf-8") + except ProcessTerminated: + # Popen.terminate was called + pass return iter_logs(self.id) diff --git a/tests/unit/test_podman.py b/tests/unit/test_podman.py index 909e429..e7b6cb0 100644 --- a/tests/unit/test_podman.py +++ b/tests/unit/test_podman.py @@ -2,13 +2,44 @@ import pytest import re -from repo2podman.podman import PodmanCommandError, PodmanContainer, PodmanEngine +from repo2podman.podman import ( + execute_cmd, + PodmanCommandError, + PodmanContainer, + PodmanEngine, + ProcessTerminated, +) from time import sleep BUSYBOX = "docker.io/library/busybox" +class Counter: + def __init__(self): + self.n = 0 + + def inc(self): + self.n += 1 + return self.n + + +def test_execute_cmd(): + r = execute_cmd(["echo", "a"], capture="both", break_callback=None) + assert list(r) == ["a\n"] + + c = Counter() + with pytest.raises(ProcessTerminated): + r = execute_cmd( + ["sleep", "1m"], + capture="both", + read_timeout=1, + break_callback=lambda: c.inc() == 2, + ) + list(r) + assert c.n == 2 + + def test_run(): client = PodmanEngine(parent=None) c = client.run(BUSYBOX, command=["id", "-un"]) From 269e2878e87a4404ccc0f891e8f34436500ffbb7 Mon Sep 17 00:00:00 2001 From: Simon Li Date: Fri, 10 Sep 2021 20:30:52 +0100 Subject: [PATCH 2/2] Increase sleep in test_run_autoremove --- tests/unit/test_podman.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_podman.py b/tests/unit/test_podman.py index e7b6cb0..9403861 100644 --- a/tests/unit/test_podman.py +++ b/tests/unit/test_podman.py @@ -63,9 +63,9 @@ def test_run(): def test_run_autoremove(): client = PodmanEngine(parent=None) # Need to sleep in container to prevent race condition - c = client.run(BUSYBOX, command=["sh", "-c", "sleep 1; id -un"], remove=True) + c = client.run(BUSYBOX, command=["sh", "-c", "sleep 2; id -un"], remove=True) # Sleep to ensure container has exited - sleep(2) + sleep(3) with pytest.raises(PodmanCommandError) as exc: c.reload() assert "".join(exc.value.output).strip() == "[]"