Skip to content

Commit

Permalink
daemon: ipc to log pid from the child (#10058)
Browse files Browse the repository at this point in the history
* daemon: ipc to log pid from the child

* fix tests to use pid

* also test for updater/analytics keyword
  • Loading branch information
skshetry authored Oct 30, 2023
1 parent 48f1f52 commit 89b4beb
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 36 deletions.
21 changes: 14 additions & 7 deletions dvc/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,32 +96,39 @@ def _fork_process() -> int:
return pid


def _posix_detached_subprocess(args: Sequence[str], **kwargs) -> Optional[int]:
def _posix_detached_subprocess(args: Sequence[str], **kwargs) -> int:
# double fork and execute a subprocess so that there are no zombies
read_end, write_end = os.pipe()
pid = _fork_process()
if pid > 0: # in parent
return None
os.close(write_end)
pid_str = os.read(read_end, 32).decode("utf8")
os.close(read_end)
return int(pid_str)

proc = subprocess.Popen(
args,
shell=False, # noqa: S603 # nosec B603
close_fds=True,
**kwargs,
)
os.close(read_end)
os.write(write_end, str(proc.pid).encode("utf8"))
os.close(write_end)

exit_code = proc.wait()
os._exit(exit_code)


def _detached_subprocess(args: Sequence[str], **kwargs) -> Optional[int]:
def _detached_subprocess(args: Sequence[str], **kwargs) -> int:
"""Run in a detached subprocess."""
kwargs.setdefault("stdin", subprocess.DEVNULL)
kwargs.setdefault("stdout", subprocess.DEVNULL)
kwargs.setdefault("stderr", subprocess.DEVNULL)

if os.name == "nt":
return _win_detached_subprocess(args, **kwargs)
_posix_detached_subprocess(args, **kwargs)
return None
return _posix_detached_subprocess(args, **kwargs)


def _map_log_level_to_flag() -> Optional[str]:
Expand All @@ -145,7 +152,7 @@ def _spawn(
executable: Optional[Union[str, List[str]]] = None,
env: Optional[Mapping[str, str]] = None,
output_file: Optional[str] = None,
) -> Optional[int]:
) -> int:
file: "ContextManager[Any]" = nullcontext()
kwargs = {}
if output_file:
Expand Down Expand Up @@ -177,4 +184,4 @@ def daemonize(args: List[str], executable: Union[None, str, List[str]] = None) -

logger.debug("Trying to spawn %r", args)
pid = _spawn(args, executable, env, output_file=env.get(DVC_DAEMON_LOGFILE))
logger.debug("Spawned %r%s", args, f" with pid {pid}" if pid else "")
logger.debug("Spawned %r with pid %s", args, pid)
48 changes: 19 additions & 29 deletions tests/func/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import re
import subprocess
import sys
import time
from collections import defaultdict
from contextlib import contextmanager
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from threading import Thread
from typing import Dict, Iterator

import psutil
import pytest

from dvc import version_tuple
Expand Down Expand Up @@ -82,16 +82,6 @@ def server():
yield httpd


def retry_until(pred, timeout):
timeout_ns = timeout * 1e9
start = time.perf_counter_ns()
while time.perf_counter_ns() - start < timeout_ns:
if pred():
return
time.sleep(0.01)
raise RuntimeError(f"timed out after {timeout}s")


def test_analytics(tmp_path, server):
addr = server.server_address
logfile = tmp_path / "logfile"
Expand All @@ -110,18 +100,17 @@ def test_analytics(tmp_path, server):
text=True,
)

pattern = r".*Saving analytics report to (.*)"
for line in output.splitlines():
if match := re.search(pattern, line):
report_file = match.group(1).strip()
break
else:
raise AssertionError("no match for the report file")

# wait until the file disappears
retry_until(lambda: not os.path.exists(report_file), 10)
# wait till the daemon exits
retry_until(lambda: "exiting with 0" in logfile.read_text(encoding="utf8"), 5)
match = re.search(r".*Saving analytics report to (.*)", output, flags=re.M)
assert match, "no match for the report file"
report_file = match.group(1).strip()

match = re.search(r".*Spawned .*analytics.* with pid (.*)", output, flags=re.M)
assert match, "no match for the pid"
pid = int(match.group(1).strip())

psutil.Process(pid).wait(timeout=10)
assert not os.path.exists(report_file)
assert f"Process {pid} exiting with 0" in logfile.read_text(encoding="utf8")
assert server.RequestHandlerClass.hits == {"POST": 1}


Expand All @@ -139,20 +128,21 @@ def test_updater(tmp_dir, dvc, server):
env.pop("DVC_TEST", None)
env.pop("CI", None)

subprocess.check_output(
output = subprocess.check_output(
[*_get_dvc_args(), "version", "-vv"],
env=env,
text=True,
)

updater_file = Path(dvc.tmp_dir) / Updater.UPDATER_FILE
match = re.search(r".*Spawned .*updater.* with pid (.*)", output, flags=re.M)
assert match, "no match for the pid"
pid = int(match.group(1).strip())

# wait until the updater file appears
retry_until(updater_file.is_file, 10)
# wait till the daemon exits
retry_until(lambda: "exiting with 0" in logfile.read_text(encoding="utf8"), 5)
psutil.Process(pid).wait(timeout=10)
assert f"Process {pid} exiting with 0" in logfile.read_text(encoding="utf8")
assert server.RequestHandlerClass.hits == {"GET": 1}
# check that the file is saved correctly
updater_file = Path(dvc.tmp_dir) / Updater.UPDATER_FILE
assert json.loads(updater_file.read_text(encoding="utf8")) == UPDATER_INFO


Expand Down

0 comments on commit 89b4beb

Please sign in to comment.