Skip to content

Commit

Permalink
refactor: dvc.daemon (#10045)
Browse files Browse the repository at this point in the history
* minor refactor to dvc.daemon

* fix tests

* run daemon only inside a subprocess
  • Loading branch information
skshetry authored Oct 30, 2023
1 parent 3fee81f commit 48f1f52
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 119 deletions.
179 changes: 71 additions & 108 deletions dvc/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,20 @@
import inspect
import logging
import os
import platform
import subprocess # nosec B404
import sys
from contextlib import nullcontext
from typing import (
TYPE_CHECKING,
Any,
Iterable,
List,
Mapping,
Optional,
Sequence,
Union,
)
from typing import TYPE_CHECKING, Any, List, Mapping, Optional, Sequence, Union

if TYPE_CHECKING:
from typing import ContextManager

from dvc.env import DVC_DAEMON, DVC_DAEMON_LOGFILE
from dvc.utils import fix_env, is_binary
from dvc.utils.collections import ensure_list

if TYPE_CHECKING:
from typing import IO, ContextManager

logger = logging.getLogger(__name__)

_FILE = Union[None, int, "IO[Any]"]


def _suppress_resource_warning(popen: subprocess.Popen) -> None:
"""Sets the returncode to avoid ResourceWarning when popen is garbage collected."""
Expand All @@ -37,43 +25,28 @@ def _suppress_resource_warning(popen: subprocess.Popen) -> None:
popen.returncode = 0


def run_detached(
args: Sequence[str],
env: Optional[Mapping[str, str]] = None,
stdin: _FILE = subprocess.DEVNULL,
stdout: _FILE = subprocess.DEVNULL,
stderr: _FILE = subprocess.DEVNULL,
**kwargs: Any,
) -> int:
# NOTE: this may create zombie processes on unix
startupinfo = None
creationflags = 0
if sys.platform == "win32":
from subprocess import ( # nosec B404
CREATE_NEW_PROCESS_GROUP,
CREATE_NO_WINDOW,
STARTF_USESHOWWINDOW,
STARTUPINFO,
)

# https://stackoverflow.com/a/7006424
# https://bugs.python.org/issue41619
creationflags = CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW

startupinfo = STARTUPINFO()
startupinfo.dwFlags |= STARTF_USESHOWWINDOW
def _win_detached_subprocess(args: Sequence[str], **kwargs) -> int:
assert os.name == "nt"

from subprocess import ( # type: ignore[attr-defined] # nosec B404
CREATE_NEW_PROCESS_GROUP,
CREATE_NO_WINDOW,
STARTF_USESHOWWINDOW,
STARTUPINFO,
)

# https://stackoverflow.com/a/7006424
# https://bugs.python.org/issue41619
creationflags = CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW

startupinfo = STARTUPINFO()
startupinfo.dwFlags |= STARTF_USESHOWWINDOW
popen = subprocess.Popen(
args,
stdin=stdin,
stdout=stdout,
stderr=stderr,
close_fds=True,
shell=False, # noqa: S603 # nosec B603
env=env,
startupinfo=startupinfo,
creationflags=creationflags,
start_new_session=True,
**kwargs,
)
_suppress_resource_warning(popen)
Expand All @@ -89,20 +62,8 @@ def _get_dvc_args() -> List[str]:
return args


def _spawn_posix(
executable: List[str],
args: List[str],
env: Optional[Mapping[str, str]] = None,
output_file: Optional[str] = None,
) -> None:
from dvc.cli import main

# `fork` will copy buffers, so we need to flush them before forking.
# Otherwise, we will get duplicated outputs.
if sys.stdout and not sys.stdout.closed:
sys.stdout.flush()
if sys.stderr and not sys.stderr.closed:
sys.stderr.flush()
def _fork_process() -> int:
assert os.name == "posix"

# NOTE: using os._exit instead of sys.exit, because dvc built
# with PyInstaller has trouble with SystemExit exception and throws
Expand All @@ -111,7 +72,7 @@ def _spawn_posix(
# pylint: disable-next=no-member
pid = os.fork() # type: ignore[attr-defined]
if pid > 0:
return
return pid
except OSError:
logger.exception("failed at first fork")
os._exit(1) # pylint: disable=protected-access
Expand All @@ -129,89 +90,91 @@ def _spawn_posix(

# disconnect from the terminal
fd = os.open(os.devnull, os.O_RDWR)
os.dup2(fd, 0)
for fd2 in range(3):
os.dup2(fd, fd2)
os.close(fd)
return pid

with open(output_file or os.devnull, "ab") as f:
os.dup2(f.fileno(), 1)
os.dup2(f.fileno(), 2)

if platform.system() == "Darwin":
# workaround for MacOS bug
# https://github.com/iterative/dvc/issues/4294
subprocess.Popen(
executable + args, env=env, shell=False # noqa: S603 # nosec B603
).communicate()
else:
os.environ.update(env or {})
main(args)
def _posix_detached_subprocess(args: Sequence[str], **kwargs) -> Optional[int]:
    """Run ``args`` as a subprocess from a forked, detached helper process.

    The calling process returns immediately; the forked helper launches the
    command, waits for it and then terminates itself with the command's
    exit code.

    Args:
        args: command line to execute (passed to ``subprocess.Popen`` with
            ``shell=False``).
        **kwargs: extra keyword arguments forwarded to ``subprocess.Popen``
            (e.g. ``env``, ``stdin``, ``stdout``, ``stderr``).

    Returns:
        Always ``None`` in the calling process. The forked helper never
        returns from this function; it always ends in ``os._exit``.
    """
    # double fork and execute a subprocess so that there are no zombies
    pid = _fork_process()
    if pid > 0:  # in parent
        return None

    # From here on we are running in the forked helper. It must never return
    # or raise out of this function: doing so would let it continue executing
    # the caller's code as a duplicate of the original program.
    exit_code = 1
    try:
        proc = subprocess.Popen(
            args,
            shell=False,  # noqa: S603 # nosec B603
            close_fds=True,
            **kwargs,
        )
        exit_code = proc.wait()
    except Exception:  # noqa: BLE001
        # e.g. the executable is missing or not runnable
        logger.exception("failed to run detached subprocess")
    finally:
        # NOTE: os._exit (not sys.exit) so that no SystemExit propagates and
        # no cleanup handlers inherited from the parent are run here.
        os._exit(exit_code)  # pylint: disable=protected-access

os._exit(0) # pylint: disable=protected-access

def _detached_subprocess(args: Sequence[str], **kwargs) -> Optional[int]:
"""Run in a detached subprocess."""
kwargs.setdefault("stdin", subprocess.DEVNULL)
kwargs.setdefault("stdout", subprocess.DEVNULL)
kwargs.setdefault("stderr", subprocess.DEVNULL)

def daemon(args: Iterable[str]) -> None:
if os.name == "nt":
return _win_detached_subprocess(args, **kwargs)
_posix_detached_subprocess(args, **kwargs)
return None


def _map_log_level_to_flag() -> Optional[str]:
    """Return the CLI verbosity flag matching this module logger's level.

    Returns:
        ``"-v"`` when the effective level is ``logging.DEBUG``, ``"-vv"``
        when it is ``logging.TRACE``, and ``None`` for any other level.
    """
    flag_by_level = {
        logging.DEBUG: "-v",
        logging.TRACE: "-vv",  # type: ignore[attr-defined]
    }
    effective_level = logger.getEffectiveLevel()
    if effective_level in flag_by_level:
        return flag_by_level[effective_level]
    return None


def daemon(args: List[str]) -> None:
"""Launch a `dvc daemon` command in a detached process.
Args:
args (list): list of arguments to append to `dvc daemon` command.
"""
flags = {
logging.CRITICAL: "-q",
logging.DEBUG: "-v",
logging.TRACE: "-vv", # type: ignore[attr-defined]
}
args = list(args)
if flag := flags.get(logger.getEffectiveLevel()):
args.append(flag)
if flag := _map_log_level_to_flag():
args = [*args, flag]
daemonize(["daemon", *args])


def _spawn_subprocess(
executable: List[str],
def _spawn(
args: List[str],
executable: Optional[Union[str, List[str]]] = None,
env: Optional[Mapping[str, str]] = None,
output_file: Optional[str] = None,
) -> Optional[int]:
# adapt run_detached to _spawn's interface
file: "ContextManager[Any]" = nullcontext()
kw = {}
kwargs = {}
if output_file:
file = open(output_file, "ab") # noqa: SIM115
kw = {"stdout": file, "stderr": file}
kwargs = {"stdout": file, "stderr": file}

with file:
return run_detached(executable + args, env, **kw)
if executable is None:
executable = _get_dvc_args()
else:
executable = ensure_list(executable)

with file:
return _detached_subprocess(executable + args, env=env, **kwargs)

def _spawn(
executable: List[str],
args: List[str],
env: Optional[Mapping[str, str]] = None,
output_file: Optional[str] = None,
) -> Optional[int]:
if os.name == "nt":
return _spawn_subprocess(executable, args, env, output_file=output_file)

if os.name == "posix":
_spawn_posix(executable, args, env, output_file=output_file)
def daemonize(args: List[str], executable: Union[None, str, List[str]] = None) -> None:
if os.name not in ("posix", "nt"):
return None

raise NotImplementedError


def daemonize(args: List[str], executable: Union[None, str, List[str]] = None) -> None:
if os.environ.get(DVC_DAEMON):
logger.debug("skipping launching a new daemon.")
return

executable = _get_dvc_args() if executable is None else ensure_list(executable)

env = fix_env()
env[DVC_DAEMON] = "1"
if not is_binary():
file_path = os.path.abspath(inspect.stack()[0][1])
env["PYTHONPATH"] = os.path.dirname(os.path.dirname(file_path))

logger.debug("Trying to spawn %r", args)
pid = _spawn(executable, args, env, output_file=env.get(DVC_DAEMON_LOGFILE))
pid = _spawn(args, executable, env, output_file=env.get(DVC_DAEMON_LOGFILE))
logger.debug("Spawned %r%s", args, f" with pid {pid}" if pid else "")
14 changes: 3 additions & 11 deletions tests/unit/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,11 @@


def test_daemon(mocker):
mock_windows = mocker.patch("dvc.daemon._spawn_subprocess")
mock_posix = mocker.patch("dvc.daemon._spawn_posix")
mock = mocker.patch("dvc.daemon._spawn")
daemon.daemon(["updater"])

if os.name == "nt":
mock_posix.assert_not_called()
mock_windows.assert_called()
args = mock_windows.call_args[0]
else:
mock_windows.assert_not_called()
mock_posix.assert_called()
args = mock_posix.call_args[0]

mock.assert_called()
args = mock.call_args[0]
env = args[2]
assert "PYTHONPATH" in env

Expand Down

0 comments on commit 48f1f52

Please sign in to comment.