Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call exec_podman in new thread, handle streaming logs whilst exited #7

Merged
merged 2 commits into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 88 additions & 15 deletions repo2podman/podman.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Use Podman instead of Docker
from functools import partial
import json
from queue import Queue, Empty
import re
from subprocess import CalledProcessError, PIPE, STDOUT, Popen
import tarfile
from tempfile import TemporaryDirectory
from threading import Thread
from traitlets import Unicode

from repo2docker.engine import (
Expand All @@ -14,10 +16,32 @@
)


def execute_cmd(cmd, capture=None, **kwargs):
DEFAULT_READ_TIMEOUT = 1


class ProcessTerminated(CalledProcessError):
"""
Thrown when a process was forcibly terminated
"""

def __init__(self, message=None):
self.message = message

def __str__(self):
s = "ProcessTerminated\n {}\n {}".format(self.e, self.message)
return s


def execute_cmd(cmd, capture=None, *, read_timeout=None, break_callback=None, **kwargs):
"""
Call given command, yielding output line by line if capture is set.

break_callback: A callable that returns a boolean indicating whether to
stop execution.
See https://stackoverflow.com/a/4896288
This is needed to work around https://github.com/manics/repo2docker-podman/issues/6
If a process is terminated due to break_callback then ProcessTerminated is thrown

Modified version of repo2docker.utils.execute_cmd
that allows capturing of stdout, stderr or both.

Expand All @@ -34,6 +58,9 @@ def execute_cmd(cmd, capture=None, **kwargs):
elif capture is not None:
raise ValueError("Invalid capture argument: {}".format(capture))

if read_timeout is None:
read_timeout = DEFAULT_READ_TIMEOUT

proc = Popen(cmd, **kwargs)

if not capture:
Expand All @@ -48,28 +75,57 @@ def execute_cmd(cmd, capture=None, **kwargs):
# This should behave the same as .readline(), but splits on `\r` OR `\n`,
# not just `\n`.
buf = []
q = Queue()

def readToQueue(proc, capture, q):
try:
for c in iter(partial(getattr(proc, capture).read, 1), b""):
q.put(c)
finally:
proc.wait()

def flush():
"""Flush next line of the buffer"""
line = b"".join(buf).decode("utf8", "replace")
buf[:] = []
return line

t = Thread(target=readToQueue, args=(proc, capture, q))
# thread dies with the program
t.daemon = True
t.start()

c_last = ""
try:
for c in iter(partial(getattr(proc, capture).read, 1), b""):
terminate = False
terminated = False
while True:
try:
c = q.get(block=True, timeout=read_timeout)
if c_last == b"\r" and buf and c != b"\n":
yield flush()
buf.append(c)
if c == b"\n":
yield flush()
c_last = c
if buf:
yield flush()
finally:
ret = proc.wait()
if ret != 0:
raise CalledProcessError(ret, cmd)
except Empty:
# Only terminate if timeout occurred so that all output has been read
if proc.poll() is not None:
break
if terminate:
proc.terminate()
terminated = True
break
if break_callback:
terminate = break_callback()
if buf:
yield flush()

t.join()

if terminated:
raise ProcessTerminated(cmd)
if proc.returncode != 0:
raise CalledProcessError(proc.returncode, cmd)


class PodmanCommandError(Exception):
Expand All @@ -84,7 +140,7 @@ def __str__(self):
return s


def exec_podman(args, *, capture):
def exec_podman(args, *, capture, read_timeout=None, break_callback=None):
"""
Execute a podman command
capture:
Expand All @@ -102,7 +158,7 @@ def exec_podman(args, *, capture):
cmd = ["podman"] + args
print("Executing: {}".format(" ".join(cmd)))
try:
p = execute_cmd(cmd, capture=capture)
p = execute_cmd(cmd, capture=capture, break_callback=break_callback)
except CalledProcessError as e:
raise PodmanCommandError(e) from None
# Need to iterate even if not capturing because execute_cmd is a generator
Expand All @@ -116,15 +172,15 @@ def exec_podman(args, *, capture):
raise PodmanCommandError(e, lines) from None


def exec_podman_stream(args):
def exec_podman_stream(args, *, read_timeout=None, break_callback=None):
"""
Execute a podman command and stream the output

Passes on CalledProcessError if exit code is not 0
"""
cmd = ["podman"] + args
print("Executing: {}".format(" ".join(cmd)))
p = execute_cmd(cmd, capture="both")
p = execute_cmd(cmd, capture="both", break_callback=break_callback)
# This will stream the output and also pass any exceptions to the caller
yield from p

Expand All @@ -144,6 +200,14 @@ def reload(self):
self.attrs = d[0]
assert self.attrs["Id"].startswith(self.id)

def _exited(self):
status = "\n".join(
exec_podman(
["inspect", "--format={{.State.Status}}", self.id], capture="both"
)
)
return status.strip() == "exited"

def logs(self, *, stream=False, timestamps=False, since=None):
log_command = ["logs"]
if timestamps:
Expand All @@ -153,9 +217,18 @@ def logs(self, *, stream=False, timestamps=False, since=None):

if stream:

# Podman logs --follow may hang if container is stopped
def iter_logs(cid):
for line in exec_podman_stream(log_command + ["--follow", cid]):
yield line.encode("utf-8")
try:
for line in exec_podman_stream(
log_command + ["--follow", cid],
read_timeout=2,
break_callback=self._exited,
):
yield line.encode("utf-8")
except ProcessTerminated:
# Popen.terminate was called
pass

return iter_logs(self.id)

Expand Down
37 changes: 34 additions & 3 deletions tests/unit/test_podman.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,44 @@

import pytest
import re
from repo2podman.podman import PodmanCommandError, PodmanContainer, PodmanEngine
from repo2podman.podman import (
execute_cmd,
PodmanCommandError,
PodmanContainer,
PodmanEngine,
ProcessTerminated,
)
from time import sleep


BUSYBOX = "docker.io/library/busybox"


class Counter:
def __init__(self):
self.n = 0

def inc(self):
self.n += 1
return self.n


def test_execute_cmd():
r = execute_cmd(["echo", "a"], capture="both", break_callback=None)
assert list(r) == ["a\n"]

c = Counter()
with pytest.raises(ProcessTerminated):
r = execute_cmd(
["sleep", "1m"],
capture="both",
read_timeout=1,
break_callback=lambda: c.inc() == 2,
)
list(r)
assert c.n == 2


def test_run():
client = PodmanEngine(parent=None)
c = client.run(BUSYBOX, command=["id", "-un"])
Expand All @@ -32,9 +63,9 @@ def test_run():
def test_run_autoremove():
client = PodmanEngine(parent=None)
# Need to sleep in container to prevent race condition
c = client.run(BUSYBOX, command=["sh", "-c", "sleep 1; id -un"], remove=True)
c = client.run(BUSYBOX, command=["sh", "-c", "sleep 2; id -un"], remove=True)
# Sleep to ensure container has exited
sleep(2)
sleep(3)
with pytest.raises(PodmanCommandError) as exc:
c.reload()
assert "".join(exc.value.output).strip() == "[]"
Expand Down