From a051b8b60f5ca7268ff1b1f7e8454f703a5539c6 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Fri, 11 Aug 2023 14:07:50 +0900 Subject: [PATCH] remove support for `dvc machine` --- dvc/cli/parser.py | 2 - dvc/commands/experiments/run.py | 10 - dvc/commands/machine.py | 490 -------------------- dvc/machine/__init__.py | 227 --------- dvc/machine/backend/__init__.py | 0 dvc/machine/backend/base.py | 50 -- dvc/machine/backend/terraform.py | 99 ---- dvc/repo/__init__.py | 12 - dvc/repo/experiments/__init__.py | 4 - dvc/repo/experiments/executor/ssh.py | 292 ------------ pyproject.toml | 3 +- tests/func/experiments/executor/test_ssh.py | 162 ------- tests/func/machine/__init__.py | 0 tests/func/machine/conftest.py | 55 --- tests/func/machine/test_machine_config.py | 163 ------- tests/func/machine/test_machine_status.py | 22 - tests/unit/command/test_experiments.py | 1 - tests/unit/command/test_machine.py | 167 ------- tests/unit/machine/__init__.py | 0 tests/unit/machine/test_machine.py | 36 -- 20 files changed, 1 insertion(+), 1794 deletions(-) delete mode 100644 dvc/commands/machine.py delete mode 100644 dvc/machine/__init__.py delete mode 100644 dvc/machine/backend/__init__.py delete mode 100644 dvc/machine/backend/base.py delete mode 100644 dvc/machine/backend/terraform.py delete mode 100644 dvc/repo/experiments/executor/ssh.py delete mode 100644 tests/func/experiments/executor/test_ssh.py delete mode 100644 tests/func/machine/__init__.py delete mode 100644 tests/func/machine/conftest.py delete mode 100644 tests/func/machine/test_machine_config.py delete mode 100644 tests/func/machine/test_machine_status.py delete mode 100644 tests/unit/command/test_machine.py delete mode 100644 tests/unit/machine/__init__.py delete mode 100644 tests/unit/machine/test_machine.py diff --git a/dvc/cli/parser.py b/dvc/cli/parser.py index d38a13bd51..10e1cce540 100644 --- a/dvc/cli/parser.py +++ b/dvc/cli/parser.py @@ -31,7 +31,6 @@ install, ls, ls_url, - machine, metrics, move, params, @@ -89,7 +88,6 @@ stage, experiments, check_ignore, - machine, data, ] diff --git a/dvc/commands/experiments/run.py b/dvc/commands/experiments/run.py index a6faabac7e..dd8c90cd3e 100644 --- a/dvc/commands/experiments/run.py +++ b/dvc/commands/experiments/run.py @@ -17,7 +17,6 @@ def run(self): jobs=self.args.jobs, params=self.args.set_param, tmp_dir=self.args.tmp_dir, - machine=self.args.machine, copy_paths=self.args.copy_paths, message=self.args.message, **self._common_kwargs, @@ -90,15 +89,6 @@ def _add_run_common(parser): "your workspace." ), ) - parser.add_argument( - "--machine", - default=None, - help=argparse.SUPPRESS, - # help=( - # "Run this experiment on the specified 'dvc machine' instance." - # ) - # metavar="", - ) parser.add_argument( "-C", "--copy-paths", diff --git a/dvc/commands/machine.py b/dvc/commands/machine.py deleted file mode 100644 index 5dbd32d6af..0000000000 --- a/dvc/commands/machine.py +++ /dev/null @@ -1,490 +0,0 @@ -import argparse -from typing import Dict, List - -from dvc.cli.command import CmdBase -from dvc.cli.utils import append_doc_link, fix_subparsers -from dvc.commands.config import CmdConfig -from dvc.compare import TabularData -from dvc.config import ConfigError -from dvc.exceptions import DvcException -from dvc.ui import ui -from dvc.utils import format_link - - -class MachineDisabledError(ConfigError): - def __init__(self): - super().__init__("Machine feature is disabled") - - -class CmdMachineConfig(CmdConfig): - def __init__(self, args): - super().__init__(args) - if not self.config["feature"].get("machine", False): - raise MachineDisabledError - - if getattr(self.args, "name", None): - self.args.name = self.args.name.lower() - - def _check_exists(self, conf): - if self.args.name not in conf["machine"]: - raise ConfigError(f"machine '{self.args.name}' doesn't exist.") - - -class CmdMachineAdd(CmdMachineConfig): - def run(self): - from dvc.machine import validate_name - - validate_name(self.args.name) - - if self.args.default: - ui.write(f"Setting '{self.args.name}' as a default machine.") - - with self.config.edit(self.args.level) as conf: - if self.args.name in conf["machine"] and not self.args.force: - raise ConfigError( - "machine '{}' already exists. Use `-f|--force` to " - "overwrite it.".format(self.args.name) - ) - - conf["machine"][self.args.name] = {"cloud": self.args.cloud} - if self.args.default: - conf["core"]["machine"] = self.args.name - - return 0 - - -class CmdMachineRemove(CmdMachineConfig): - def run(self): - with self.config.edit(self.args.level) as conf: - self._check_exists(conf) - del conf["machine"][self.args.name] - - up_to_level = self.args.level or "repo" - # Remove core.machine refs to this machine in any shadowing configs - for level in reversed(self.config.LEVELS): - with self.config.edit(level) as conf: - if conf["core"].get("machine") == self.args.name: - del conf["core"]["machine"] - - if level == up_to_level: - break - - return 0 - - -class CmdMachineList(CmdMachineConfig): - TABLE_COLUMNS = [ - "name", - "cloud", - "region", - "image", - "spot", - "spot_price", - "instance_hdd_size", - "instance_type", - "ssh_private", - "startup_script", - ] - - PRIVATE_COLUMNS = ["ssh_private", "startup_script"] - - def _hide_private(self, conf): - for machine in conf: - for column in self.PRIVATE_COLUMNS: - if column in conf[machine]: - conf[machine][column] = "***" - - def _show_origin(self): - levels = [self.args.level] if self.args.level else self.config.LEVELS - for level in levels: - conf = self.config.read(level)["machine"] - if self.args.name: - conf = conf.get(self.args.name, {}) - self._hide_private(conf) - prefix = self._config_file_prefix(True, self.config, level) - configs = list(self._format_config(conf, prefix)) - if configs: - ui.write("\n".join(configs)) - - def _show_table(self): - td = TabularData(self.TABLE_COLUMNS, fill_value="-") - conf = self.config.read()["machine"] - if self.args.name: - conf = {self.args.name: conf.get(self.args.name, {})} - self._hide_private(conf) - for machine, machine_config in conf.items(): - machine_config["name"] = machine - td.row_from_dict(machine_config) - td.dropna("cols", "all") - td.render() - - def run(self): - if self.args.show_origin: - self._show_origin() - else: - self._show_table() - return 0 - - -class CmdMachineModify(CmdMachineConfig): - def run(self): - from dvc.config import merge - - with self.config.edit(self.args.level) as conf: - merged = self.config.load_config_to_level(self.args.level) - merge(merged, conf) - self._check_exists(merged) - - if self.args.name not in conf["machine"]: - conf["machine"][self.args.name] = {} - section = conf["machine"][self.args.name] - if self.args.unset: - section.pop(self.args.option, None) - else: - section[self.args.option] = self.args.value - return 0 - - -class CmdMachineRename(CmdBase): - def _check_exists(self, conf): - if self.args.name not in conf["machine"]: - raise ConfigError(f"machine '{self.args.name}' doesn't exist.") - - def _rename_default(self, conf): - if conf["core"].get("machine") == self.args.name: - conf["core"]["machine"] = self.args.new - - def _check_before_rename(self): - from dvc.machine import validate_name - - validate_name(self.args.new) - - all_config = self.config.load_config_to_level(None) - if self.args.new in all_config.get("machine", {}): - raise ConfigError( - f"Rename failed. Machine '{self.args.new}' already exists." - ) - ui.write(f"Rename machine '{self.args.name}' to '{self.args.new}'.") - - def run(self): - self._check_before_rename() - - with self.config.edit(self.args.level) as conf: - self._check_exists(conf) - conf["machine"][self.args.new] = conf["machine"][self.args.name] - try: - assert self.repo.machine - self.repo.machine.rename(self.args.name, self.args.new) - except DvcException as error: - del conf["machine"][self.args.new] - raise ConfigError("terraform rename failed") from error - del conf["machine"][self.args.name] - self._rename_default(conf) - - up_to_level = self.args.level or "repo" - for level in reversed(self.config.LEVELS): - if level == up_to_level: - break - with self.config.edit(level) as level_conf: - self._rename_default(level_conf) - - return 0 - - -class CmdMachineDefault(CmdMachineConfig): - def run(self): - if self.args.name is None and not self.args.unset: - conf = self.config.read(self.args.level) - try: - ui.write(conf["core"]["machine"]) - except KeyError: - ui.write("No default machine set") - return 1 - else: - with self.config.edit(self.args.level) as conf: - if self.args.unset: - conf["core"].pop("machine", None) - else: - merged_conf = self.config.load_config_to_level(self.args.level) - if ( - self.args.name in conf["machine"] - or self.args.name in merged_conf["machine"] - ): - conf["core"]["machine"] = self.args.name - else: - raise ConfigError( - "default machine must be present in machine list." - ) - return 0 - - -class CmdMachineCreate(CmdBase): - def run(self): - if self.repo.machine is None: - raise MachineDisabledError - - self.repo.machine.create(self.args.name) - return 0 - - -class CmdMachineStatus(CmdBase): - INSTANCE_FIELD = ["name", "instance", "status"] - SHOWN_FIELD = [ - "cloud", - "instance_ip", - "instance_type", - "instance_hdd_size", - "instance_gpu", - ] - FILL_VALUE = "-" - - def _add_row( - self, - name: str, - all_status: List[Dict], - td: TabularData, - ): - if not all_status: - row = [ - name, - self.FILL_VALUE, - "offline", - ] # back to `None` after #7167 - td.append(row) - for i, status in enumerate(all_status, start=1): - row = [name, f"num_{i}", "running" if status else "offline"] - for field in self.SHOWN_FIELD: - value = str(status.get(field, "")) - row.append(value) - td.append(row) - - def run(self): - if self.repo.machine is None: - raise MachineDisabledError - - td = TabularData( - self.INSTANCE_FIELD + self.SHOWN_FIELD, fill_value=self.FILL_VALUE - ) - - if self.args.name: - all_status = list(self.repo.machine.status(self.args.name)) - self._add_row(self.args.name, all_status, td) - else: - name_set = set() - for level in self.repo.config.LEVELS: - conf = self.repo.config.read(level)["machine"] - name_set.update(conf.keys()) - name_list = list(name_set) - for name in sorted(name_list): - all_status = list(self.repo.machine.status(name)) - self._add_row(name, all_status, td) - - td.dropna("cols", "all") - td.render() - return 0 - - -class CmdMachineDestroy(CmdBase): - def run(self): - if self.repo.machine is None: - raise MachineDisabledError - - self.repo.machine.destroy(self.args.name) - return 0 - - -class CmdMachineSsh(CmdBase): - def run(self): - if self.repo.machine is None: - raise MachineDisabledError - - self.repo.machine.run_shell(self.args.name) - return 0 - - -def add_parser(subparsers, parent_parser): # noqa: PLR0915 - from dvc.commands.config import parent_config_parser - - machine_HELP = "Set up and manage cloud machines." - machine_parser = subparsers.add_parser( - "machine", - parents=[parent_parser], - description=append_doc_link(machine_HELP, "machine"), - # NOTE: suppress help during development to hide command - # help=machine_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - - machine_subparsers = machine_parser.add_subparsers( - dest="cmd", - help="Use `dvc machine CMD --help` for command-specific help.", - ) - - fix_subparsers(machine_subparsers) - - machine_ADD_HELP = "Add a new data machine." - machine_add_parser = machine_subparsers.add_parser( - "add", - parents=[parent_config_parser, parent_parser], - description=append_doc_link(machine_ADD_HELP, "machine/add"), - help=machine_ADD_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_add_parser.add_argument("name", help="Name of the machine") - machine_add_parser.add_argument( - "cloud", - help="Machine cloud. See full list of supported clouds at {}".format( - format_link( - "https://github.com/iterative/terraform-provider-iterative#machine" - ) - ), - ) - machine_add_parser.add_argument( - "-d", - "--default", - action="store_true", - default=False, - help="Set as default machine.", - ) - machine_add_parser.add_argument( - "-f", - "--force", - action="store_true", - default=False, - help="Force overwriting existing configs", - ) - machine_add_parser.set_defaults(func=CmdMachineAdd) - - machine_DEFAULT_HELP = "Set/unset the default machine." - machine_default_parser = machine_subparsers.add_parser( - "default", - parents=[parent_config_parser, parent_parser], - description=append_doc_link(machine_DEFAULT_HELP, "machine/default"), - help=machine_DEFAULT_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_default_parser.add_argument("name", nargs="?", help="Name of the machine") - machine_default_parser.add_argument( - "-u", - "--unset", - action="store_true", - default=False, - help="Unset default machine.", - ) - machine_default_parser.set_defaults(func=CmdMachineDefault) - - machine_LIST_HELP = "List the configuration of one/all machines." - machine_list_parser = machine_subparsers.add_parser( - "list", - parents=[parent_config_parser, parent_parser], - description=append_doc_link(machine_LIST_HELP, "machine/list"), - help=machine_LIST_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_list_parser.add_argument( - "--show-origin", - default=False, - action="store_true", - help="Show the source file containing each config value.", - ) - machine_list_parser.add_argument( - "name", - nargs="?", - type=str, - help="name of machine to specify", - ) - machine_list_parser.set_defaults(func=CmdMachineList) - machine_MODIFY_HELP = "Modify the configuration of an machine." - machine_modify_parser = machine_subparsers.add_parser( - "modify", - parents=[parent_config_parser, parent_parser], - description=append_doc_link(machine_MODIFY_HELP, "machine/modify"), - help=machine_MODIFY_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_modify_parser.add_argument("name", help="Name of the machine") - machine_modify_parser.add_argument("option", help="Name of the option to modify.") - machine_modify_parser.add_argument( - "value", nargs="?", help="(optional) Value of the option." - ) - machine_modify_parser.add_argument( - "-u", - "--unset", - default=False, - action="store_true", - help="Unset option.", - ) - machine_modify_parser.set_defaults(func=CmdMachineModify) - - machine_RENAME_HELP = "Rename a machine " - machine_rename_parser = machine_subparsers.add_parser( - "rename", - parents=[parent_config_parser, parent_parser], - description=append_doc_link(machine_RENAME_HELP, "remote/rename"), - help=machine_RENAME_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_rename_parser.add_argument("name", help="Machine to be renamed") - machine_rename_parser.add_argument("new", help="New name of the machine") - machine_rename_parser.set_defaults(func=CmdMachineRename) - - machine_REMOVE_HELP = "Remove an machine." - machine_remove_parser = machine_subparsers.add_parser( - "remove", - parents=[parent_config_parser, parent_parser], - description=append_doc_link(machine_REMOVE_HELP, "machine/remove"), - help=machine_REMOVE_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_remove_parser.add_argument("name", help="Name of the machine to remove.") - machine_remove_parser.set_defaults(func=CmdMachineRemove) - - machine_CREATE_HELP = "Create and start a machine instance." - machine_create_parser = machine_subparsers.add_parser( - "create", - parents=[parent_parser], - description=append_doc_link(machine_CREATE_HELP, "machine/create"), - help=machine_CREATE_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_create_parser.add_argument("name", help="Name of the machine to create.") - machine_create_parser.set_defaults(func=CmdMachineCreate) - - machine_STATUS_HELP = "List the status of running instances for one/all machines." - machine_status_parser = machine_subparsers.add_parser( - "status", - parents=[parent_parser], - description=append_doc_link(machine_STATUS_HELP, "machine/status"), - help=machine_STATUS_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_status_parser.add_argument( - "name", nargs="?", help="(optional) Name of the machine." - ) - machine_status_parser.set_defaults(func=CmdMachineStatus) - - machine_DESTROY_HELP = "Destroy an machine instance." - machine_destroy_parser = machine_subparsers.add_parser( - "destroy", - parents=[parent_parser], - description=append_doc_link(machine_DESTROY_HELP, "machine/destroy"), - help=machine_DESTROY_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_destroy_parser.add_argument( - "name", help="Name of the machine instance to destroy." - ) - machine_destroy_parser.set_defaults(func=CmdMachineDestroy) - - machine_SSH_HELP = "Connect to a machine via SSH." - machine_ssh_parser = machine_subparsers.add_parser( - "ssh", - parents=[parent_parser], - description=append_doc_link(machine_SSH_HELP, "machine/ssh"), - help=machine_SSH_HELP, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - machine_ssh_parser.add_argument( - "name", help="Name of the machine instance to connect to." - ) - machine_ssh_parser.set_defaults(func=CmdMachineSsh) diff --git a/dvc/machine/__init__.py b/dvc/machine/__init__.py deleted file mode 100644 index 4e2a1d3055..0000000000 --- a/dvc/machine/__init__.py +++ /dev/null @@ -1,227 +0,0 @@ -import logging -import os -from typing import ( - TYPE_CHECKING, - Any, - Dict, - Iterable, - Iterator, - Mapping, - Optional, - Tuple, - Type, -) - -from dvc.exceptions import DvcException - -if TYPE_CHECKING: - from dvc.repo import Repo - from dvc.types import StrPath - - from .backend.base import BaseMachineBackend - - BackendCls = Type[BaseMachineBackend] - -logger = logging.getLogger(__name__) - - -RESERVED_NAMES = {"local", "localhost"} - -DEFAULT_STARTUP_SCRIPT = """#!/bin/bash -sudo apt-get update -sudo apt-get install --yes python3 python3-pip -python3 -m pip install --upgrade pip - -pushd /etc/apt/sources.list.d -sudo wget https://dvc.org/deb/dvc.list -sudo apt-get update -sudo apt-get install --yes dvc -popd - -sudo echo "OK" > /var/log/dvc-machine-init.log -""" - - -def validate_name(name: str): - from dvc.exceptions import InvalidArgumentError - - name = name.lower() - if name in RESERVED_NAMES: - raise InvalidArgumentError( - f"Machine name '{name}' is reserved for internal DVC use." - ) - - -class MachineBackends(Mapping): - try: - from .backend.terraform import TerraformBackend - except ImportError: - TerraformBackend = None # type: ignore[assignment, misc] - - DEFAULT: Dict[str, Optional["BackendCls"]] = { - "terraform": TerraformBackend, - } - - def __getitem__(self, key: str) -> "BaseMachineBackend": - """Lazily initialize backends and cache it afterwards""" - initialized = self.initialized.get(key) - if not initialized: - backend = self.backends[key] - initialized = backend(os.path.join(self.tmp_dir, key), **self.kwargs) - self.initialized[key] = initialized - return initialized - - def __init__( - self, - selected: Optional[Iterable[str]], - tmp_dir: "StrPath", - **kwargs, - ) -> None: - selected = selected or list(self.DEFAULT) - self.backends: Dict[str, "BackendCls"] = {} - for key in selected: - cls = self.DEFAULT.get(key) - if cls is None: - raise DvcException( - f"'dvc machine' backend '{key}' is missing required " - "dependencies. Install them with:\n" - f"\tpip install dvc[{key}]" - ) - self.backends[key] = cls - - self.initialized: Dict[str, "BaseMachineBackend"] = {} - - self.tmp_dir = tmp_dir - self.kwargs = kwargs - - def __iter__(self): - return iter(self.backends) - - def __len__(self) -> int: - return len(self.backends) - - def close_initialized(self) -> None: - for backend in self.initialized.values(): - backend.close() - - -class MachineManager: - """Class that manages dvc cloud machines. - - Args: - repo (dvc.repo.Repo): repo instance that belongs to the repo that - we are working on. - - Raises: - config.ConfigError: thrown when config has invalid format. - """ - - CLOUD_BACKENDS = { - "aws": "terraform", - "azure": "terraform", - } - - def __init__( - self, repo: "Repo", backends: Optional[Iterable[str]] = None, **kwargs - ): - self.repo = repo - assert self.repo.tmp_dir - tmp_dir = os.path.join(self.repo.tmp_dir, "machine") - self.backends = MachineBackends(backends, tmp_dir=tmp_dir, **kwargs) - - def get_config_and_backend( - self, - name: Optional[str] = None, - ) -> Tuple[dict, "BaseMachineBackend"]: - from dvc.config import NoMachineError - - if not name: - name = self.repo.config["core"].get("machine") - - if name: - config = self._get_config(name=name) - backend = self._get_backend(config["cloud"]) - return config, backend - - if bool(self.repo.config["machine"]): - error_msg = ( - "no machine specified. Setup default machine with\n" - " dvc machine default \n" - ) - else: - error_msg = ( - "no machine specified. Create a default machine with\n" - " dvc machine add -d " - ) - - raise NoMachineError(error_msg) - - def _get_config(self, **kwargs): - config = self.repo.config - name = kwargs.get("name") - if name: - try: - conf = config["machine"][name.lower()] - conf["name"] = name - except KeyError: - from dvc.config import MachineNotFoundError - - raise MachineNotFoundError( # noqa: B904 - f"machine '{name}' doesn't exist" - ) - else: - conf = kwargs - return conf - - def _get_backend(self, cloud: str) -> "BaseMachineBackend": - from dvc.config import NoMachineError - - try: - backend = self.CLOUD_BACKENDS[cloud] - return self.backends[backend] - except KeyError: - raise NoMachineError( # noqa: B904 - f"Machine platform '{cloud}' unsupported" - ) - - def create(self, name: Optional[str]): - """Create and start the specified machine instance.""" - config, backend = self.get_config_and_backend(name) - if "startup_script" in config: - with open(config["startup_script"], encoding="utf-8") as fobj: - startup_script = fobj.read() - else: - startup_script = DEFAULT_STARTUP_SCRIPT - config["startup_script"] = startup_script - config.pop("setup_script", None) - return backend.create(**config) - - def destroy(self, name: Optional[str]): - """Destroy the specified machine instance.""" - config, backend = self.get_config_and_backend(name) - return backend.destroy(**config) - - def get_sshfs(self, name: Optional[str]): - config, backend = self.get_config_and_backend(name) - return backend.get_sshfs(**config) - - def run_shell(self, name: Optional[str]): - config, backend = self.get_config_and_backend(name) - return backend.run_shell(**config) - - def status(self, name: str) -> Iterator[Dict[Any, Any]]: - config, backend = self.get_config_and_backend(name) - return backend.instances(**config) - - def rename(self, name: str, new: str): - """move configuration to a new location if the machine is running.""" - config, backend = self.get_config_and_backend(name) - return backend.rename(new=new, **config) - - def get_executor_kwargs(self, name: Optional[str]): - config, backend = self.get_config_and_backend(name) - return backend.get_executor_kwargs(**config) - - def get_setup_script(self, name: Optional[str]) -> Optional[str]: - config, _backend = self.get_config_and_backend(name) - return config.get("setup_script") diff --git a/dvc/machine/backend/__init__.py b/dvc/machine/backend/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/dvc/machine/backend/base.py b/dvc/machine/backend/base.py deleted file mode 100644 index f72425cc66..0000000000 --- a/dvc/machine/backend/base.py +++ /dev/null @@ -1,50 +0,0 @@ -from abc import ABC, abstractmethod -from contextlib import contextmanager -from typing import TYPE_CHECKING, Iterator, Optional - -if TYPE_CHECKING: - from dvc_ssh import SSHFileSystem - - from dvc.types import StrPath - - -class BaseMachineBackend(ABC): - def __init__(self, tmp_dir: "StrPath", **kwargs): - self.tmp_dir = tmp_dir - - @abstractmethod - def create(self, name: Optional[str] = None, **config): - """Create and start an instance of the specified machine.""" - - @abstractmethod - def destroy(self, name: Optional[str] = None, **config): - """Stop and destroy all instances of the specified machine.""" - - @abstractmethod - def instances(self, name: Optional[str] = None, **config) -> Iterator[dict]: - """Iterate over status of all instances of the specified machine.""" - - def close(self): # noqa: B027 - pass - - @abstractmethod - def run_shell(self, name: Optional[str] = None, **config): - """Spawn an interactive SSH shell for the specified machine.""" - - @abstractmethod - def get_executor_kwargs(self, name: str, **config) -> dict: - """Return SSHExecutor kwargs which can be used for DVC - experiment/pipeline execution on the specified machine. - """ - - @abstractmethod - @contextmanager - def get_sshfs( - self, name: Optional[str] = None, **config - ) -> Iterator["SSHFileSystem"]: - """Return an sshfs instance for the default directory on the - specified machine.""" - - @abstractmethod - def rename(self, name: str, new: str, **config): - """Rename a machine instance.""" diff --git a/dvc/machine/backend/terraform.py b/dvc/machine/backend/terraform.py deleted file mode 100644 index 9d00946b9c..0000000000 --- a/dvc/machine/backend/terraform.py +++ /dev/null @@ -1,99 +0,0 @@ -import os -from contextlib import contextmanager -from functools import partial, partialmethod -from typing import TYPE_CHECKING, Iterator, Optional - -from dvc_ssh import DEFAULT_PORT, SSHFileSystem - -from dvc.exceptions import DvcException - -from .base import BaseMachineBackend - -if TYPE_CHECKING: - from dvc.types import StrPath - - -@contextmanager -def _sshfs(resource: dict): - from tpi import TerraformProviderIterative - - with TerraformProviderIterative.pemfile(resource) as pem: - fs = SSHFileSystem( - host=resource["instance_ip"], - user="ubuntu", - keyfile=pem, - ) - yield fs - - -class TerraformBackend(BaseMachineBackend): - def __init__(self, tmp_dir: "StrPath", **kwargs): - super().__init__(tmp_dir, **kwargs) - os.makedirs(tmp_dir, exist_ok=True) - - @contextmanager - def make_tpi(self, name: str): - from tpi import TPIError - from tpi.terraform import TerraformBackend as TPIBackend - - try: - working_dir = os.path.join(self.tmp_dir, name) - os.makedirs(working_dir, exist_ok=True) - yield TPIBackend(working_dir=working_dir) - except TPIError as exc: - raise DvcException("TPI operation failed") from exc - - def _tpi_func(self, fname, name: Optional[str] = None, **config): - from tpi import TPIError - - assert name - with self.make_tpi(name) as tpi: - func = getattr(tpi, fname) - try: - return func(name=name, **config) - except TPIError as exc: - raise DvcException(f"TPI {fname} failed") from exc - - create = partialmethod(_tpi_func, "create") # type: ignore[assignment] - destroy = partialmethod(_tpi_func, "destroy") # type: ignore[assignment] - instances = partialmethod(_tpi_func, "instances") # type: ignore[assignment] - run_shell = partialmethod(_tpi_func, "run_shell") # type: ignore[assignment] - - def get_executor_kwargs(self, name: str, **config) -> dict: - with self.make_tpi(name) as tpi: - resource = tpi.default_resource(name) - return { - "host": resource["instance_ip"], - "port": DEFAULT_PORT, - "username": "ubuntu", - "fs_factory": partial(_sshfs, dict(resource)), - } - - @contextmanager - def get_sshfs( # pylint: disable=unused-argument - self, name: Optional[str] = None, **config - ) -> Iterator["SSHFileSystem"]: - assert name - with self.make_tpi(name) as tpi: - resource = tpi.default_resource(name) - with _sshfs(resource) as fs: - yield fs - - def rename(self, name: str, new: str, **config): - """rename a dvc machine instance to a new name""" - import shutil - - mtype = "iterative_machine" - - new_dir = os.path.join(self.tmp_dir, new) - old_dir = os.path.join(self.tmp_dir, name) - if os.path.exists(new_dir): - raise DvcException(f"rename failed: path {new_dir} already exists") - - if not os.path.exists(old_dir): - return - - with self.make_tpi(name) as tpi: - tpi.state_mv(f"{mtype}.{name}", f"{mtype}.{new}", **config) - - shutil.move(old_dir, new_dir) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 9aee521179..fbdf982018 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -15,7 +15,6 @@ from dvc.exceptions import NotDvcRepoError, OutputNotFoundError from dvc.ignore import DvcIgnoreFilter -from dvc.utils import env2bool from dvc.utils.fs import path_isin from dvc.utils.objects import cached_property @@ -24,7 +23,6 @@ from dvc.fs.data import DataFileSystem from dvc.fs.dvc import DVCFileSystem from dvc.lock import LockBase - from dvc.machine import MachineManager from dvc.scm import Git, NoSCM from dvc.stage import Stage from dvc.types import DictStrAny @@ -352,16 +350,6 @@ def experiments(self) -> "Experiments": return Experiments(self) - @cached_property - def machine(self) -> Optional["MachineManager"]: - from dvc.machine import MachineManager - - if self.tmp_dir and ( - self.config["feature"].get("machine", False) or env2bool("DVC_TEST") - ): - return MachineManager(self) - return None - @property def fs(self) -> "FileSystem": return self._fs diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index 2e138922e1..c77f04e08a 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -136,10 +136,6 @@ def queue_one( **kwargs, ) -> "QueueEntry": """Queue a single experiment.""" - if kwargs.pop("machine", None) is not None: - # TODO: decide how to handle queued remote execution - raise NotImplementedError - return self.new( queue, **kwargs, diff --git a/dvc/repo/experiments/executor/ssh.py b/dvc/repo/experiments/executor/ssh.py deleted file mode 100644 index 6ec153238b..0000000000 --- a/dvc/repo/experiments/executor/ssh.py +++ /dev/null @@ -1,292 +0,0 @@ -import logging -import os -import posixpath -import sys -from contextlib import contextmanager -from typing import TYPE_CHECKING, Callable, Iterable, List, Optional - -from dvc_ssh import SSHFileSystem -from funcy import first - -from dvc.repo.experiments.refs import ( - EXEC_BASELINE, - EXEC_BRANCH, - EXEC_HEAD, - EXEC_MERGE, - EXEC_NAMESPACE, -) - -from .base import BaseExecutor, ExecutorInfo, ExecutorResult, TaskStatus - -if TYPE_CHECKING: - from queue import Queue - - from dvc.repo import Repo - from dvc.repo.experiments.refs import ExpRefInfo - from dvc.repo.experiments.stash import ExpStashEntry - from dvc.scm import Git - -logger = logging.getLogger(__name__) - - -@contextmanager -def _sshfs(fs_factory, **kwargs): - if fs_factory: - with fs_factory() as fs: - yield fs - return - yield SSHFileSystem(**kwargs) - - -class SSHExecutor(BaseExecutor): - """SSH experiment executor.""" - - WARN_UNTRACKED = True - QUIET = True - SETUP_SCRIPT_FILENAME = "exec-setup.sh" - - def __init__( - self, - *args, - host: Optional[str] = None, - port: Optional[int] = None, - username: Optional[str] = None, - fs_factory: Optional[Callable] = None, - setup_script: Optional[str] = None, - **kwargs, - ): - assert host - - super().__init__(*args, **kwargs) - self.host: str = host - self.port = port - self.username = username - self._fs_factory = fs_factory - self._repo_abspath = "" - self._setup_script = setup_script - - @classmethod - def gen_dirname(cls, name: Optional[str] = None): - from shortuuid import uuid - - return "-".join([name or "dvc-exp", "executor", uuid()]) - - @classmethod - def from_stash_entry( - cls, - repo: "Repo", - entry: "ExpStashEntry", - **kwargs, - ): - machine_name: Optional[str] = kwargs.pop("machine_name", None) - assert repo.machine - executor = cls._from_stash_entry( - repo, - entry, - cls.gen_dirname(entry.name), - location=machine_name, - **repo.machine.get_executor_kwargs(machine_name), - setup_script=repo.machine.get_setup_script(machine_name), - ) - logger.debug("Init SSH executor for host '%s'", executor.host) - return executor - - def sshfs(self): - return _sshfs(self._fs_factory, host=self.host, port=self.port) - - @property - def git_url(self) -> str: - user = f"{self.username}@" if self.username else "" - port = f":{self.port}" if self.port is not None else "" - path = f"{self.root_dir}" if self.root_dir else "" - if path and not posixpath.isabs(path): - path = f"/~/{path}" - return f"ssh://{user}{self.host}{port}{path}" - - @property - def abs_url(self) -> str: - assert self._repo_abspath - user = f"{self.username}@" if self.username else "" - port = f":{self.port}" if self.port is not None else "" - return f"ssh://{user}{self.host}{port}{self._repo_abspath}" - - @staticmethod - def _git_client_args(fs): - return { - "password": fs.fs_args.get("password"), - "key_filename": first(fs.fs_args.get("client_keys", [])), - } - - def init_git( - self, - repo: "Repo", # noqa: ARG002 - scm: "Git", - stash_rev: str, - entry: "ExpStashEntry", - infofile: Optional[str], - branch: Optional[str] = None, - ): - from dvc.repo.experiments.utils import push_refspec - - self.status = TaskStatus.PREPARING - if infofile: - self.info.dump_json(infofile) - - with self.sshfs() as fs: - fs.makedirs(self.root_dir) - self._ssh_cmd(fs, "git init .") - self._ssh_cmd(fs, "git config user.name dvc-exp") - self._ssh_cmd(fs, "git config user.email dvc-exp@noreply.localhost") - - result = self._ssh_cmd(fs, "pwd") - path = result.stdout.strip() - self._repo_abspath = path - - # TODO: support multiple client key retries in git backends - # (see https://github.com/iterative/dvc/issues/6508) - kwargs = self._git_client_args(fs) - - ref_dict = { - EXEC_HEAD: entry.head_rev, - EXEC_MERGE: stash_rev, - EXEC_BASELINE: entry.baseline_rev, - } - with self.set_temp_refs(scm, ref_dict): - exec_namespace = f"{EXEC_NAMESPACE}/" - refspec = [(exec_namespace, exec_namespace)] - push_refspec(scm, self.git_url, refspec, **kwargs) - - if branch: - push_refspec(scm, self.git_url, [(branch, branch)], **kwargs) - self._ssh_cmd(fs, f"git symbolic-ref {EXEC_BRANCH} {branch}") - else: - self._ssh_cmd(fs, f"git symbolic-ref -d {EXEC_BRANCH}", check=False) - - # checkout EXEC_HEAD and apply EXEC_MERGE on top of it without - # committing - head = EXEC_BRANCH if branch else EXEC_HEAD - self._ssh_cmd(fs, f"git checkout {head}") - merge_rev = scm.get_ref(EXEC_MERGE) - self._ssh_cmd(fs, f"git stash apply {merge_rev}") - - if self._setup_script: - self._init_setup_script(fs) - - @classmethod - def _setup_script_path(cls, dvc_dir: str): - return posixpath.join( - dvc_dir, - "tmp", - cls.SETUP_SCRIPT_FILENAME, - ) - - def _init_setup_script(self, fs: "SSHFileSystem"): - assert self._repo_abspath - script_path = self._setup_script_path( - posixpath.join(self._repo_abspath, self.dvc_dir) - ) - assert self._setup_script - fs.put_file(self._setup_script, script_path) - - def _ssh_cmd(self, sshfs, cmd, chdir=None, **kwargs): - working_dir = chdir or self.root_dir - return sshfs.fs.execute(f"cd {working_dir};{cmd}", **kwargs) - - def init_cache(self, repo: "Repo", rev: str, run_cache: bool = True): - from dvc.repo.push import push - - with self.get_odb() as odb: - push( - repo, - revs=[rev], - run_cache=run_cache, - odb=odb, - include_imports=True, - ) - - def collect_cache( - self, repo: "Repo", exp_ref: "ExpRefInfo", run_cache: bool = True - ): - """Collect DVC cache.""" - from dvc.repo.experiments.pull import _pull_cache - - _pull_cache(repo, exp_ref, run_cache=run_cache) - - @contextmanager - def get_odb(self): - from dvc.cachemgr import CacheManager, get_odb - - cache_path = posixpath.join( - self._repo_abspath, - self.dvc_dir, - CacheManager.CACHE_DIR, - ) - - with self.sshfs() as fs: - yield get_odb(fs, cache_path, **fs.config) - - def fetch_exps(self, *args, **kwargs) -> Iterable[str]: - with self.sshfs() as fs: - kwargs.update(self._git_client_args(fs)) - return super().fetch_exps(*args, **kwargs) - - @classmethod - def reproduce( - cls, - info: "ExecutorInfo", - rev: str, # noqa: ARG003 - queue: Optional["Queue"] = None, # noqa: ARG003 - infofile: Optional[str] = None, - log_errors: bool = True, - log_level: Optional[int] = None, - copy_paths: Optional[List[str]] = None, # noqa: ARG003 - message: Optional[str] = None, # noqa: ARG003 - **kwargs, - ) -> "ExecutorResult": - """Reproduce an experiment on a remote machine over SSH. - - Internally uses 'dvc exp exec-run' over SSH. - """ - import json - import time - from tempfile import TemporaryFile - - from asyncssh import ProcessError - - fs_factory: Optional[Callable] = kwargs.pop("fs_factory", None) - if log_errors and log_level is not None: - cls._set_log_level(log_level) - - with _sshfs(fs_factory) as fs: - while not fs.exists("/var/log/dvc-machine-init.log"): - logger.info("Waiting for dvc-machine startup script to complete...") - time.sleep(5) - logger.info("Reproducing experiment on '%s'", fs.fs_args.get("host")) - with TemporaryFile(mode="w+", encoding="utf-8") as fobj: - json.dump(info.asdict(), fobj) - fobj.seek(0) - fs.put_file(fobj, infofile) - cmd = ["source ~/.profile"] - script_path = cls._setup_script_path(info.dvc_dir) - if fs.exists(posixpath.join(info.root_dir, script_path)): - cmd.extend([f"pushd {info.root_dir}", f"source {script_path}", "popd"]) - exec_cmd = f"dvc exp exec-run --infofile {infofile}" - if log_level is not None: - if log_level <= logging.TRACE: # type: ignore[attr-defined] - exec_cmd += " -vv" - elif log_level <= logging.DEBUG: - exec_cmd += " -v" - cmd.append(exec_cmd) - try: - sys.stdout.flush() - sys.stderr.flush() - stdout = os.dup(sys.stdout.fileno()) - stderr = os.dup(sys.stderr.fileno()) - fs.fs.execute("; ".join(cmd), stdout=stdout, stderr=stderr) - with fs.open(infofile) as fobj: - result_info = ExecutorInfo.from_dict(json.load(fobj)) - if result_info.result_hash: - return result_info.result - except ProcessError: - pass - return ExecutorResult(None, None, False) diff --git a/pyproject.toml b/pyproject.toml index 325261eff3..da27692843 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ dependencies = [ [project.optional-dependencies] all = ["dvc[azure,gdrive,gs,hdfs,oss,s3,ssh,webdav,webhdfs]"] azure = ["dvc-azure>=2.21.2"] -dev = ["dvc[azure,gdrive,gs,hdfs,lint,oss,s3,ssh,terraform,tests,webdav,webhdfs]"] +dev = ["dvc[azure,gdrive,gs,hdfs,lint,oss,s3,ssh,tests,webdav,webhdfs]"] gdrive = ["dvc-gdrive==2.20"] gs = ["dvc-gs==2.22.1"] hdfs = ["dvc-hdfs==2.19"] @@ -89,7 +89,6 @@ oss = ["dvc-oss==2.19"] s3 = ["dvc-s3==2.23.0"] ssh = ["dvc-ssh>=2.22.1,<3"] ssh_gssapi = ["dvc-ssh[gssapi]>=2.22.1,<3"] -terraform = ["tpi[ssh]>=2.1"] testing = [ "pytest-test-utils", "pytest-benchmark[histogram]", diff --git a/tests/func/experiments/executor/test_ssh.py b/tests/func/experiments/executor/test_ssh.py deleted file mode 100644 index 7f4ecdd880..0000000000 --- a/tests/func/experiments/executor/test_ssh.py +++ /dev/null @@ -1,162 +0,0 @@ -import posixpath -from contextlib import contextmanager -from functools import partial -from urllib.parse import urlparse - -import pytest -from dvc_ssh import SSHFileSystem -from dvc_ssh.tests.cloud import TEST_SSH_KEY_PATH, TEST_SSH_USER - -from dvc.repo.experiments.executor.base import ExecutorInfo, ExecutorResult -from dvc.repo.experiments.executor.ssh import SSHExecutor -from dvc.repo.experiments.refs import EXEC_HEAD, EXEC_MERGE - -# pylint: disable-next=wildcard-import -from tests.func.machine.conftest import * # noqa: F403 - - -@contextmanager -def _ssh_factory(cloud): - yield SSHFileSystem( - host=cloud.host, - port=cloud.port, - user=TEST_SSH_USER, - keyfile=TEST_SSH_KEY_PATH, - ) - - -def test_init_from_stash(tmp_dir, scm, dvc, machine_instance, mocker): - mock = mocker.patch.object(SSHExecutor, "_from_stash_entry") - mock_entry = mocker.Mock() - mock_entry.name = "" - SSHExecutor.from_stash_entry( - dvc, - mock_entry, - machine_name="foo", - ) - _args, kwargs = mock.call_args - assert kwargs["host"] == machine_instance["instance_ip"] - - -@pytest.mark.needs_internet -@pytest.mark.parametrize("cloud", [pytest.lazy_fixture("git_ssh")]) -def test_init_git(tmp_dir, dvc, scm, cloud, mocker): - tmp_dir.scm_gen({"foo": "foo", "dir": {"bar": "bar"}}, commit="init") - baseline_rev = scm.get_rev() - tmp_dir.gen("foo", "stashed") - scm.gitpython.git.stash() - rev = scm.resolve_rev("stash@{0}") - - mock = mocker.Mock(baseline_rev=baseline_rev, head_rev=baseline_rev) - - root_url = cloud / SSHExecutor.gen_dirname() - - executor = SSHExecutor( - root_dir=root_url.path, - dvc_dir=".dvc", - baseline_rev=baseline_rev, - host=root_url.host, - port=root_url.port, - username=TEST_SSH_USER, - fs_factory=partial(_ssh_factory, cloud), - ) - infofile = str((root_url / "foo.run").path) - executor.init_git(dvc, scm, rev, mock, infofile=infofile) - assert root_url.path == executor._repo_abspath - - fs = cloud._ssh - assert fs.exists(posixpath.join(executor._repo_abspath, "foo")) - assert fs.exists(posixpath.join(executor._repo_abspath, "dir")) - assert fs.exists(posixpath.join(executor._repo_abspath, "dir", "bar")) - - -@pytest.mark.needs_internet -@pytest.mark.parametrize("cloud", [pytest.lazy_fixture("git_ssh")]) -def test_init_cache(tmp_dir, dvc, scm, cloud): - foo = tmp_dir.dvc_gen("foo", "foo", commit="init")[0].outs[0] - rev = scm.get_rev() - scm.set_ref(EXEC_HEAD, rev) - scm.set_ref(EXEC_MERGE, rev) - root_url = cloud / SSHExecutor.gen_dirname() - - executor = SSHExecutor( - root_dir=root_url.path, - dvc_dir=".dvc", - baseline_rev=rev, - host=root_url.host, - port=root_url.port, - username=TEST_SSH_USER, - fs_factory=partial(_ssh_factory, cloud), - ) - executor.init_cache(dvc, rev) - - fs = cloud._ssh - foo_hash = foo.hash_info.value - assert fs.exists( - posixpath.join( - executor._repo_abspath, ".dvc", "cache", foo_hash[:2], foo_hash[2:] - ) - ) - - -@pytest.mark.needs_internet -@pytest.mark.parametrize("cloud", [pytest.lazy_fixture("git_ssh")]) -def test_reproduce(tmp_dir, scm, dvc, cloud, exp_stage, mocker): - from sshfs import SSHFileSystem as _sshfs # noqa: N813 - - rev = scm.get_rev() - root_url = cloud / SSHExecutor.gen_dirname() - mocker.patch.object(SSHFileSystem, "exists", return_value=True) - mock_execute = mocker.patch.object(_sshfs, "execute") - info = ExecutorInfo( - str(root_url), - rev, - "machine-foo", - str(root_url.path), - ".dvc", - ) - infofile = str((root_url / "foo.run").path) - SSHExecutor.reproduce( - info, - rev, - fs_factory=partial(_ssh_factory, cloud), - ) - mock_execute.assert_called_once() - _name, args, _kwargs = mock_execute.mock_calls[0] - assert f"dvc exp exec-run --infofile {infofile}" in args[0] - - -@pytest.mark.needs_internet -@pytest.mark.parametrize("cloud", [pytest.lazy_fixture("git_ssh")]) -def test_run_machine(tmp_dir, scm, dvc, cloud, exp_stage, mocker): - baseline = scm.get_rev() - factory = partial(_ssh_factory, cloud) - mocker.patch.object( - dvc.machine, - "get_executor_kwargs", - return_value={ - "host": cloud.host, - "port": cloud.port, - "username": TEST_SSH_USER, - "fs_factory": factory, - }, - ) - mocker.patch.object(dvc.machine, "get_setup_script", return_value=None) - mock_repro = mocker.patch.object( - SSHExecutor, - "reproduce", - return_value=ExecutorResult("abc123", None, False), - ) - - tmp_dir.gen("params.yaml", "foo: 2") - dvc.experiments.run(exp_stage.addressing, machine="foo") - mock_repro.assert_called_once() - _name, _args, kwargs = mock_repro.mock_calls[0] - info = kwargs["info"] - url = urlparse(info.git_url) - assert url.scheme == "ssh" - assert url.hostname == cloud.host - assert url.port == cloud.port - assert info.baseline_rev == baseline - assert kwargs["infofile"] is not None - assert kwargs["fs_factory"] is not None diff --git a/tests/func/machine/__init__.py b/tests/func/machine/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/func/machine/conftest.py b/tests/func/machine/conftest.py deleted file mode 100644 index 011936c7a4..0000000000 --- a/tests/func/machine/conftest.py +++ /dev/null @@ -1,55 +0,0 @@ -import textwrap - -import pytest - -BASIC_CONFIG = textwrap.dedent( - """\ - [feature] - machine = true - ['machine "foo"'] - cloud = aws - """ -) - -TEST_INSTANCE = { - "aws_security_group": None, - "cloud": "aws", - "id": "iterative-2jyhw8j9ieov6", - "image": "ubuntu-bionic-18.04-amd64-server-20210818", - "instance_gpu": None, - "instance_hdd_size": 35, - "instance_ip": "123.123.123.123", - "instance_launch_time": "2021-08-25T07:13:03Z", - "instance_type": "m", - "name": "test-resource", - "region": "us-west", - "spot": False, - "spot_price": -1, - "ssh_name": None, - "ssh_private": "-----BEGIN RSA PRIVATE KEY-----\\n", - "startup_script": "IyEvYmluL2Jhc2g=", - "timeouts": None, -} - - -@pytest.fixture -def machine_config(tmp_dir): - (tmp_dir / ".dvc" / "config").write_text(BASIC_CONFIG) - return BASIC_CONFIG - - -@pytest.fixture -def machine_instance(tmp_dir, dvc, mocker): - with dvc.config.edit() as conf: - conf["machine"]["foo"] = {"cloud": "aws"} - - def mock_instances(name=None, **kwargs): - if name == "foo": - return iter([TEST_INSTANCE]) - return iter([]) - - mocker.patch( - "tpi.terraform.TerraformBackend.instances", - mocker.MagicMock(side_effect=mock_instances), - ) - return TEST_INSTANCE diff --git a/tests/func/machine/test_machine_config.py b/tests/func/machine/test_machine_config.py deleted file mode 100644 index 3202b50176..0000000000 --- a/tests/func/machine/test_machine_config.py +++ /dev/null @@ -1,163 +0,0 @@ -import os -import textwrap - -import pytest - -from dvc.cli import main -from dvc.ui import ui -from tests.utils import console_width - -from .conftest import BASIC_CONFIG - - -@pytest.mark.parametrize( - "slot,value", - [ - ("region", "us-west"), - ("image", "iterative-cml"), - ("spot", "True"), - ("spot_price", "1.2345"), - ("spot_price", "12345"), - ("instance_hdd_size", "10"), - ("instance_type", "l"), - ("instance_gpu", "tesla"), - ("ssh_private", "secret"), - ], -) -def test_machine_modify_susccess(tmp_dir, dvc, machine_config, slot, value): - assert main(["machine", "modify", "foo", slot, value]) == 0 - assert ( - tmp_dir / ".dvc" / "config" - ).read_text() == machine_config + f" {slot} = {value}\n" - assert main(["machine", "modify", "--unset", "foo", slot]) == 0 - assert (tmp_dir / ".dvc" / "config").read_text() == machine_config - - -def test_machine_modify_startup_script(tmp_dir, dvc, machine_config): - slot, value = "startup_script", "start.sh" - assert main(["machine", "modify", "foo", slot, value]) == 0 - assert ( - tmp_dir / ".dvc" / "config" - ).read_text() == machine_config + f" {slot} = ../{value}\n" - assert main(["machine", "modify", "--unset", "foo", slot]) == 0 - assert (tmp_dir / ".dvc" / "config").read_text() == machine_config - - -@pytest.mark.parametrize( - "slot,value,msg", - [ - ( - "region", - "other-west", - "expected one of us-west, us-east, eu-west, eu-north", - ), - ("spot_price", "NUM", "expected float"), - ("instance_hdd_size", "BIG", "expected int"), - ], -) -def test_machine_modify_fail(tmp_dir, dvc, machine_config, caplog, slot, value, msg): - assert main(["machine", "modify", "foo", slot, value]) == 251 - assert (tmp_dir / ".dvc" / "config").read_text() == machine_config - assert msg in caplog.text - - -FULL_CONFIG_TEXT = textwrap.dedent( - """\ - [feature] - machine = true - ['machine \"bar\"'] - cloud = azure - ['machine \"foo\"'] - cloud = aws - region = us-west - image = iterative-cml - spot = True - spot_price = 1.2345 - instance_hdd_size = 10 - instance_type = l - instance_gpu = tesla - ssh_private = secret - startup_script = {} - """.format( - os.path.join("..", "start.sh") - ) -) - - -def test_machine_list(tmp_dir, dvc, capsys): - from dvc.commands.machine import CmdMachineList - - (tmp_dir / ".dvc" / "config").write_text(FULL_CONFIG_TEXT) - - with console_width(ui.rich_console, 255): - assert main(["machine", "list"]) == 0 - out, _ = capsys.readouterr() - for key in CmdMachineList.TABLE_COLUMNS: - assert f"{key}" in out - assert "bar azure - -" in out - assert "foo aws us-west iterative-cml True 1.2345" in out - assert "10 l *** ***" in out - assert "tesla" in out - - with console_width(ui.rich_console, 255): - assert main(["machine", "list", "bar"]) == 0 - out, _ = capsys.readouterr() - assert "foo" not in out - assert "name cloud" in out - assert "bar azure" in out - - -def test_machine_rename_success(tmp_dir, scm, dvc, machine_config, capsys, mocker): - import tpi - - config_file = tmp_dir / ".dvc" / "config" - - mocker.patch.object( - tpi.terraform.TerraformBackend, - "state_mv", - autospec=True, - return_value=True, - ) - - os.makedirs(tmp_dir / ".dvc" / "tmp" / "machine" / "terraform" / "foo") - - assert main(["machine", "rename", "foo", "bar"]) == 0 - cap = capsys.readouterr() - assert "Rename machine 'foo' to 'bar'." in cap.out - assert config_file.read_text() == machine_config.replace("foo", "bar") - assert not (tmp_dir / ".dvc" / "tmp" / "machine" / "terraform" / "foo").exists() - assert (tmp_dir / ".dvc" / "tmp" / "machine" / "terraform" / "bar").exists() - - -def test_machine_rename_none_exist(tmp_dir, scm, dvc, caplog): - config_alice = BASIC_CONFIG.replace("foo", "alice") - config_file = tmp_dir / ".dvc" / "config" - config_file.write_text(config_alice) - assert main(["machine", "rename", "foo", "bar"]) == 251 - assert config_file.read_text() == config_alice - assert "machine 'foo' doesn't exist." in caplog.text - - -def test_machine_rename_exist(tmp_dir, scm, dvc, caplog): - config_bar = BASIC_CONFIG + "['machine \"bar\"']\n cloud = aws" - config_file = tmp_dir / ".dvc" / "config" - config_file.write_text(config_bar) - assert main(["machine", "rename", "foo", "bar"]) == 251 - assert config_file.read_text() == config_bar - assert "Machine 'bar' already exists." in caplog.text - - -def test_machine_rename_error(tmp_dir, scm, dvc, machine_config, caplog, mocker): - import tpi - - config_file = tmp_dir / ".dvc" / "config" - os.makedirs(tmp_dir / ".dvc" / "tmp" / "machine" / "terraform" / "foo") - - def cmd_error(self, source, destination, **kwargs): - raise tpi.TPIError("test error") - - mocker.patch.object(tpi.terraform.TerraformBackend, "state_mv", cmd_error) - - assert main(["machine", "rename", "foo", "bar"]) == 251 - assert config_file.read_text() == machine_config - assert "rename failed" in caplog.text diff --git a/tests/func/machine/test_machine_status.py b/tests/func/machine/test_machine_status.py deleted file mode 100644 index d4980ff0a2..0000000000 --- a/tests/func/machine/test_machine_status.py +++ /dev/null @@ -1,22 +0,0 @@ -from dvc.cli import main -from dvc.ui import ui -from tests.utils import console_width - - -def test_status(tmp_dir, scm, dvc, machine_config, machine_instance, capsys): - assert main(["machine", "add", "bar", "aws"]) == 0 - with console_width(ui.rich_console, 255): - assert main(["machine", "status"]) == 0 - cap = capsys.readouterr() - assert ( - "name instance status cloud instance_ip " - "instance_type instance_hdd_size instance_gpu" in cap.out - ) - assert ( - "bar - offline - - " - "- - -" in cap.out - ) - assert ( - "foo num_1 running aws 123.123.123.123 " - "m 35 None" in cap.out - ) diff --git a/tests/unit/command/test_experiments.py b/tests/unit/command/test_experiments.py index fc587251d8..582b9b45e2 100644 --- a/tests/unit/command/test_experiments.py +++ b/tests/unit/command/test_experiments.py @@ -125,7 +125,6 @@ def test_experiments_run(dvc, scm, mocker): "run_all": False, "jobs": 1, "tmp_dir": False, - "machine": None, "copy_paths": [], "message": None, } diff --git a/tests/unit/command/test_machine.py b/tests/unit/command/test_machine.py deleted file mode 100644 index e37d505ff1..0000000000 --- a/tests/unit/command/test_machine.py +++ /dev/null @@ -1,167 +0,0 @@ -import os - -import configobj -import pytest - -from dvc.cli import parse_args -from dvc.commands.machine import ( - CmdMachineAdd, - CmdMachineCreate, - CmdMachineDestroy, - CmdMachineList, - CmdMachineModify, - CmdMachineRemove, - CmdMachineRename, - CmdMachineSsh, - CmdMachineStatus, -) - -DATA = { - ".dvc": { - "config": ( - "[feature]\n" - " machine = true\n" - "['machine \"foo\"']\n" - " cloud = aws\n" - "['machine \"myaws\"']\n" - " cloud = aws" - ) - } -} - - -def test_add(tmp_dir): - tmp_dir.gen({".dvc": {"config": "[feature]\n machine = true"}}) - cli_args = parse_args(["machine", "add", "foo", "aws"]) - assert cli_args.func == CmdMachineAdd - cmd = cli_args.func(cli_args) - assert cmd.run() == 0 - config = configobj.ConfigObj(str(tmp_dir / ".dvc" / "config")) - assert config['machine "foo"']["cloud"] == "aws" - - -def test_remove(tmp_dir): - tmp_dir.gen(DATA) - cli_args = parse_args(["machine", "remove", "foo"]) - assert cli_args.func == CmdMachineRemove - cmd = cli_args.func(cli_args) - assert cmd.run() == 0 - config = configobj.ConfigObj(str(tmp_dir / ".dvc" / "config")) - assert list(config.keys()) == ["feature", 'machine "myaws"'] - - -def test_create(tmp_dir, dvc, mocker): - cli_args = parse_args(["machine", "create", "foo"]) - assert cli_args.func == CmdMachineCreate - - cmd = cli_args.func(cli_args) - m = mocker.patch.object(cmd.repo.machine, "create", autospec=True, return_value=0) - - assert cmd.run() == 0 - m.assert_called_once_with("foo") - - -def test_status(tmp_dir, scm, dvc, mocker): - tmp_dir.gen(DATA) - cli_args = parse_args(["machine", "status", "foo"]) - assert cli_args.func == CmdMachineStatus - - cmd = cli_args.func(cli_args) - m = mocker.patch.object(cmd.repo.machine, "status", autospec=True, return_value=[]) - assert cmd.run() == 0 - m.assert_called_once_with("foo") - - cli_args = parse_args(["machine", "status"]) - cmd = cli_args.func(cli_args) - m = mocker.patch.object(cmd.repo.machine, "status", autospec=True, return_value=[]) - assert cmd.run() == 0 - assert m.call_count == 2 - m.assert_has_calls([mocker.call("foo"), mocker.call("myaws")]) - - -def test_destroy(tmp_dir, dvc, mocker): - cli_args = parse_args(["machine", "destroy", "foo"]) - assert cli_args.func == CmdMachineDestroy - - cmd = cli_args.func(cli_args) - m = mocker.patch.object(cmd.repo.machine, "destroy", autospec=True, return_value=0) - - assert cmd.run() == 0 - m.assert_called_once_with("foo") - - -def test_ssh(tmp_dir, dvc, mocker): - cli_args = parse_args(["machine", "ssh", "foo"]) - assert cli_args.func == CmdMachineSsh - - cmd = cli_args.func(cli_args) - m = mocker.patch.object( - cmd.repo.machine, "run_shell", autospec=True, return_value=0 - ) - - assert cmd.run() == 0 - m.assert_called_once_with("foo") - - -@pytest.mark.parametrize("show_origin", [["--show-origin"], []]) -def test_list(tmp_dir, mocker, show_origin): - from dvc.compare import TabularData - from dvc.ui import ui - - tmp_dir.gen(DATA) - cli_args = parse_args(["machine", "list", *show_origin, "foo"]) - assert cli_args.func == CmdMachineList - cmd = cli_args.func(cli_args) - if show_origin: - m = mocker.patch.object(ui, "write", autospec=True) - else: - m = mocker.patch.object(TabularData, "render", autospec=True) - assert cmd.run() == 0 - if show_origin: - m.assert_called_once_with(f".dvc{os.sep}config cloud=aws") - else: - m.assert_called_once() - - -def test_modified(tmp_dir): - tmp_dir.gen(DATA) - cli_args = parse_args(["machine", "modify", "foo", "cloud", "azure"]) - assert cli_args.func == CmdMachineModify - cmd = cli_args.func(cli_args) - assert cmd.run() == 0 - config = configobj.ConfigObj(str(tmp_dir / ".dvc" / "config")) - assert config['machine "foo"']["cloud"] == "azure" - - -def test_rename(tmp_dir, scm, dvc): - tmp_dir.gen(DATA) - cli_args = parse_args(["machine", "rename", "foo", "bar"]) - assert cli_args.func == CmdMachineRename - cmd = cli_args.func(cli_args) - assert cmd.run() == 0 - config = configobj.ConfigObj(str(tmp_dir / ".dvc" / "config")) - assert config['machine "bar"']["cloud"] == "aws" - - -@pytest.mark.parametrize( - "cmd,use_config", - [ - ("add", True), - ("default", True), - ("remove", True), - ("list", True), - ("modify", True), - ("create", False), - ("destroy", False), - ("rename", True), - ("status", False), - ("ssh", False), - ], -) -def test_help_message(tmp_dir, scm, dvc, cmd, use_config, capsys): - try: - parse_args(["machine", cmd, "--help"]) - except SystemExit: - pass - cap = capsys.readouterr() - assert ("--global" in cap.out) is use_config diff --git a/tests/unit/machine/__init__.py b/tests/unit/machine/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/unit/machine/test_machine.py b/tests/unit/machine/test_machine.py deleted file mode 100644 index 69e0754b58..0000000000 --- a/tests/unit/machine/test_machine.py +++ /dev/null @@ -1,36 +0,0 @@ -import pytest - - -def test_validate_name(): - from dvc.exceptions import InvalidArgumentError - from dvc.machine import RESERVED_NAMES, validate_name - - for name in RESERVED_NAMES: - with pytest.raises(InvalidArgumentError): - validate_name(name) - - -@pytest.mark.parametrize("cloud", ["aws", "azure"]) -def test_get_config_and_backend(tmp_dir, dvc, cloud): - from dvc.machine.backend.terraform import TerraformBackend - - name = "foo" - with dvc.config.edit() as conf: - conf["machine"][name] = {"cloud": cloud} - config, backend = dvc.machine.get_config_and_backend(name) - assert config == {"cloud": cloud, "name": name} - assert isinstance(backend, TerraformBackend) - - -def test_get_config_and_backend_nonexistent(tmp_dir, dvc): - from dvc.config import MachineNotFoundError - - with pytest.raises(MachineNotFoundError): - dvc.machine.get_config_and_backend("foo") - - -def test_get_config_and_backend_default(tmp_dir, dvc): - from dvc.config import NoMachineError - - with pytest.raises(NoMachineError): - dvc.machine.get_config_and_backend()