diff --git a/dvc/daemon.py b/dvc/daemon.py index cc41cb0eb0..88b55c913c 100644 --- a/dvc/daemon.py +++ b/dvc/daemon.py @@ -3,32 +3,20 @@ import inspect import logging import os -import platform import subprocess # nosec B404 import sys from contextlib import nullcontext -from typing import ( - TYPE_CHECKING, - Any, - Iterable, - List, - Mapping, - Optional, - Sequence, - Union, -) +from typing import TYPE_CHECKING, Any, List, Mapping, Optional, Sequence, Union + +if TYPE_CHECKING: + from typing import ContextManager from dvc.env import DVC_DAEMON, DVC_DAEMON_LOGFILE from dvc.utils import fix_env, is_binary from dvc.utils.collections import ensure_list -if TYPE_CHECKING: - from typing import IO, ContextManager - logger = logging.getLogger(__name__) -_FILE = Union[None, int, "IO[Any]"] - def _suppress_resource_warning(popen: subprocess.Popen) -> None: """Sets the returncode to avoid ResourceWarning when popen is garbage collected.""" @@ -37,43 +25,28 @@ def _suppress_resource_warning(popen: subprocess.Popen) -> None: popen.returncode = 0 -def run_detached( - args: Sequence[str], - env: Optional[Mapping[str, str]] = None, - stdin: _FILE = subprocess.DEVNULL, - stdout: _FILE = subprocess.DEVNULL, - stderr: _FILE = subprocess.DEVNULL, - **kwargs: Any, -) -> int: - # NOTE: this may create zombie processes on unix - startupinfo = None - creationflags = 0 - if sys.platform == "win32": - from subprocess import ( # nosec B404 - CREATE_NEW_PROCESS_GROUP, - CREATE_NO_WINDOW, - STARTF_USESHOWWINDOW, - STARTUPINFO, - ) - - # https://stackoverflow.com/a/7006424 - # https://bugs.python.org/issue41619 - creationflags = CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW - - startupinfo = STARTUPINFO() - startupinfo.dwFlags |= STARTF_USESHOWWINDOW +def _win_detached_subprocess(args: Sequence[str], **kwargs) -> int: + assert os.name == "nt" + from subprocess import ( # type: ignore[attr-defined] # nosec B404 + CREATE_NEW_PROCESS_GROUP, + CREATE_NO_WINDOW, + STARTF_USESHOWWINDOW, + STARTUPINFO, + ) + + # https://stackoverflow.com/a/7006424 + # https://bugs.python.org/issue41619 + creationflags = CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW + + startupinfo = STARTUPINFO() + startupinfo.dwFlags |= STARTF_USESHOWWINDOW popen = subprocess.Popen( args, - stdin=stdin, - stdout=stdout, - stderr=stderr, close_fds=True, shell=False, # noqa: S603 # nosec B603 - env=env, startupinfo=startupinfo, creationflags=creationflags, - start_new_session=True, **kwargs, ) _suppress_resource_warning(popen) @@ -89,20 +62,8 @@ def _get_dvc_args() -> List[str]: return args -def _spawn_posix( - executable: List[str], - args: List[str], - env: Optional[Mapping[str, str]] = None, - output_file: Optional[str] = None, -) -> None: - from dvc.cli import main - - # `fork` will copy buffers, so we need to flush them before forking. - # Otherwise, we will get duplicated outputs. - if sys.stdout and not sys.stdout.closed: - sys.stdout.flush() - if sys.stderr and not sys.stderr.closed: - sys.stderr.flush() +def _fork_process() -> int: + assert os.name == "posix" # NOTE: using os._exit instead of sys.exit, because dvc built # with PyInstaller has trouble with SystemExit exception and throws @@ -111,7 +72,7 @@ def _spawn_posix( # pylint: disable-next=no-member pid = os.fork() # type: ignore[attr-defined] if pid > 0: - return + return pid except OSError: logger.exception("failed at first fork") os._exit(1) # pylint: disable=protected-access @@ -129,83 +90,85 @@ def _spawn_posix( # disconnect from the terminal fd = os.open(os.devnull, os.O_RDWR) - os.dup2(fd, 0) + for fd2 in range(3): + os.dup2(fd, fd2) os.close(fd) + return pid - with open(output_file or os.devnull, "ab") as f: - os.dup2(f.fileno(), 1) - os.dup2(f.fileno(), 2) - if platform.system() == "Darwin": - # workaround for MacOS bug - # https://github.com/iterative/dvc/issues/4294 - subprocess.Popen( - executable + args, env=env, shell=False # noqa: S603 # nosec B603 - ).communicate() - else: - os.environ.update(env or {}) - main(args) +def _posix_detached_subprocess(args: Sequence[str], **kwargs) -> Optional[int]: + # double fork and execute a subprocess so that there are no zombies + pid = _fork_process() + if pid > 0: # in parent + return None + + proc = subprocess.Popen( + args, + shell=False, # noqa: S603 # nosec B603 + close_fds=True, + **kwargs, + ) + exit_code = proc.wait() + os._exit(exit_code) - os._exit(0) # pylint: disable=protected-access +def _detached_subprocess(args: Sequence[str], **kwargs) -> Optional[int]: + """Run in a detached subprocess.""" + kwargs.setdefault("stdin", subprocess.DEVNULL) + kwargs.setdefault("stdout", subprocess.DEVNULL) + kwargs.setdefault("stderr", subprocess.DEVNULL) -def daemon(args: Iterable[str]) -> None: + if os.name == "nt": + return _win_detached_subprocess(args, **kwargs) + _posix_detached_subprocess(args, **kwargs) + return None + + +def _map_log_level_to_flag() -> Optional[str]: + flags = {logging.DEBUG: "-v", logging.TRACE: "-vv"} # type: ignore[attr-defined] + return flags.get(logger.getEffectiveLevel()) + + +def daemon(args: List[str]) -> None: """Launch a `dvc daemon` command in a detached process. Args: args (list): list of arguments to append to `dvc daemon` command. """ - flags = { - logging.CRITICAL: "-q", - logging.DEBUG: "-v", - logging.TRACE: "-vv", # type: ignore[attr-defined] - } - args = list(args) - if flag := flags.get(logger.getEffectiveLevel()): - args.append(flag) + if flag := _map_log_level_to_flag(): + args = [*args, flag] daemonize(["daemon", *args]) -def _spawn_subprocess( - executable: List[str], +def _spawn( args: List[str], + executable: Optional[Union[str, List[str]]] = None, env: Optional[Mapping[str, str]] = None, output_file: Optional[str] = None, ) -> Optional[int]: - # adapt run_detached to _spawn's interface file: "ContextManager[Any]" = nullcontext() - kw = {} + kwargs = {} if output_file: file = open(output_file, "ab") # noqa: SIM115 - kw = {"stdout": file, "stderr": file} + kwargs = {"stdout": file, "stderr": file} - with file: - return run_detached(executable + args, env, **kw) + if executable is None: + executable = _get_dvc_args() + else: + executable = ensure_list(executable) + with file: + return _detached_subprocess(executable + args, env=env, **kwargs) -def _spawn( - executable: List[str], - args: List[str], - env: Optional[Mapping[str, str]] = None, - output_file: Optional[str] = None, -) -> Optional[int]: - if os.name == "nt": - return _spawn_subprocess(executable, args, env, output_file=output_file) - if os.name == "posix": - _spawn_posix(executable, args, env, output_file=output_file) +def daemonize(args: List[str], executable: Union[None, str, List[str]] = None) -> None: + if os.name not in ("posix", "nt"): return None - raise NotImplementedError - - -def daemonize(args: List[str], executable: Union[None, str, List[str]] = None) -> None: if os.environ.get(DVC_DAEMON): logger.debug("skipping launching a new daemon.") return - executable = _get_dvc_args() if executable is None else ensure_list(executable) - env = fix_env() env[DVC_DAEMON] = "1" if not is_binary(): @@ -213,5 +176,5 @@ def daemonize(args: List[str], executable: Union[None, str, List[str]] = None) - env["PYTHONPATH"] = os.path.dirname(os.path.dirname(file_path)) logger.debug("Trying to spawn %r", args) - pid = _spawn(executable, args, env, output_file=env.get(DVC_DAEMON_LOGFILE)) + pid = _spawn(args, executable, env, output_file=env.get(DVC_DAEMON_LOGFILE)) logger.debug("Spawned %r%s", args, f" with pid {pid}" if pid else "") diff --git a/tests/unit/test_daemon.py b/tests/unit/test_daemon.py index 20595eec0b..91571d9b6f 100644 --- a/tests/unit/test_daemon.py +++ b/tests/unit/test_daemon.py @@ -5,19 +5,11 @@ def test_daemon(mocker): - mock_windows = mocker.patch("dvc.daemon._spawn_subprocess") - mock_posix = mocker.patch("dvc.daemon._spawn_posix") + mock = mocker.patch("dvc.daemon._spawn") daemon.daemon(["updater"]) - if os.name == "nt": - mock_posix.assert_not_called() - mock_windows.assert_called() - args = mock_windows.call_args[0] - else: - mock_windows.assert_not_called() - mock_posix.assert_called() - args = mock_posix.call_args[0] - + mock.assert_called() + args = mock.call_args[0] env = args[2] assert "PYTHONPATH" in env