From 4ac8a5a38ab57f894b470f2e49b17706caad69fa Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Wed, 1 Mar 2023 11:33:00 +0200 Subject: [PATCH] checkout: use index checkout Needed for https://github.com/iterative/dvc/pull/9424 --- dvc/repo/checkout.py | 122 +++++++++++++++++++++--------------- dvc/repo/imports.py | 2 +- dvc/repo/index.py | 36 +++++++++-- dvc/stage/__init__.py | 5 -- pyproject.toml | 2 +- tests/func/test_checkout.py | 84 +++++++++++++++---------- 6 files changed, 158 insertions(+), 93 deletions(-) diff --git a/dvc/repo/checkout.py b/dvc/repo/checkout.py index 51c3dc5f7a5..b865698abca 100644 --- a/dvc/repo/checkout.py +++ b/dvc/repo/checkout.py @@ -1,16 +1,13 @@ import logging import os -from typing import TYPE_CHECKING, Dict, List, Set +from itertools import chain +from typing import Dict, List from dvc.exceptions import CheckoutError, CheckoutErrorSuggestGit, NoOutputOrStageError from dvc.utils import relpath from . import locked -if TYPE_CHECKING: - from . import Repo - from .stage import StageInfo - logger = logging.getLogger(__name__) @@ -30,37 +27,8 @@ def _remove_unused_links(repo): return ret -def get_all_files_numbers(pairs): - return sum(stage.get_all_files_number(filter_info) for stage, filter_info in pairs) - - -def _collect_pairs( - self: "Repo", targets, with_deps: bool, recursive: bool -) -> Set["StageInfo"]: - from dvc.stage.exceptions import StageFileBadNameError, StageFileDoesNotExistError - - pairs: Set["StageInfo"] = set() - for target in targets: - try: - pairs.update( - self.stage.collect_granular( - target, with_deps=with_deps, recursive=recursive - ) - ) - except ( - StageFileDoesNotExistError, - StageFileBadNameError, - NoOutputOrStageError, - ) as exc: - if not target: - raise - raise CheckoutErrorSuggestGit(target) from exc - - return pairs - - @locked -def checkout( +def checkout( # noqa: C901 self, targets=None, with_deps=False, @@ -70,13 +38,18 @@ def checkout( allow_missing=False, **kwargs, ): + from dvc import prompt from dvc.fs.callbacks import Callback + from dvc.repo.index import build_data_index + from dvc.stage.exceptions import StageFileBadNameError, StageFileDoesNotExistError + from dvc_data.index.checkout import ADD, DELETE, MODIFY + from dvc_data.index.checkout import CheckoutError as IndexCheckoutError + from dvc_data.index.checkout import checkout as icheckout stats: Dict[str, List[str]] = { "added": [], "deleted": [], "modified": [], - "failed": [], } if not targets: targets = [None] @@ -85,28 +58,79 @@ def checkout( if isinstance(targets, str): targets = [targets] - pairs = _collect_pairs(self, targets, with_deps, recursive) - total = get_all_files_numbers(pairs) + def onerror(target, exc): + if target and isinstance( + exc, + ( + StageFileDoesNotExistError, + StageFileBadNameError, + NoOutputOrStageError, + ), + ): + raise CheckoutErrorSuggestGit(target) from exc + raise # pylint: disable=misplaced-bare-raise + + view = self.index.targets_view( + targets, + recursive=recursive, + with_deps=with_deps, + onerror=onerror, + ) + + old = build_data_index(view, self.root_dir, self.fs, compute_hash=True) + new = view.data["repo"] + + total = len(new) with Callback.as_tqdm_callback( unit="file", desc="Checkout", disable=total == 0, ) as cb: - cb.set_size(total) - for stage, filter_info in pairs: - result = stage.checkout( - force=force, - progress_callback=cb, + try: + changes = icheckout( + new, + self.root_dir, + self.fs, + old=old, + callback=cb, + delete=True, + prompt=prompt.confirm, + update_meta=False, relink=relink, - filter_info=filter_info, + force=force, allow_missing=allow_missing, **kwargs, ) - for key, items in result.items(): - stats[key].extend(_fspath_dir(path) for path in items) + except IndexCheckoutError as exc: + raise CheckoutError([], {}) from exc + + def _adapt_path(entry): + ret = _fspath_dir(self.fs.path.join(self.root_dir, *entry.key)) + ret = relpath(ret) + if entry.meta and entry.meta.isdir: + return self.fs.path.join(ret, "") + return ret + + top_keys = {key for key, _ in new.iteritems(shallow=True)} - if stats.get("failed"): - raise CheckoutError(stats["failed"], stats) + stats["added"].extend(_adapt_path(change.new) for change in changes[ADD]) + stats["deleted"].extend(_adapt_path(change.old) for change in changes[DELETE]) + stats["modified"].extend(_adapt_path(change.new) for change in changes[MODIFY]) + + changed_keys = { + change.key for change in chain(changes[ADD], changes[DELETE], changes[MODIFY]) + } + for key in top_keys: + for changed_key in changed_keys: + if len(changed_key) >= len(key) and changed_key[: len(key)] == key: + self.state.save_link(self.fs.path.join(self.root_dir, *key), self.fs) + + failed = [ + entry + for _, entry in new.iteritems() + if not entry.hash_info and not (entry.meta and entry.meta.isdir) + ] + if not allow_missing and failed: + raise CheckoutError([_adapt_path(entry) for entry in failed], stats) - del stats["failed"] return stats diff --git a/dvc/repo/imports.py b/dvc/repo/imports.py index d6f27a90518..87154cca091 100644 --- a/dvc/repo/imports.py +++ b/dvc/repo/imports.py @@ -110,7 +110,7 @@ def save_imports( desc="Downloading imports from source", unit="files", ) as cb: - checkout(data_view, tmpdir, cache.fs, callback=cb, storage="data") + checkout(data_view, tmpdir, cache.fs, callback=cb, storage="remote") md5(data_view) save(data_view, odb=cache, hardlink=True) diff --git a/dvc/repo/index.py b/dvc/repo/index.py index 1a3227c991c..7a9141d804e 100644 --- a/dvc/repo/index.py +++ b/dvc/repo/index.py @@ -113,14 +113,18 @@ def is_out_or_ignored(root, directory): def _load_data_from_outs(index, prefix, outs): - from dvc_data.index import DataIndexEntry + from dvc_data.index import DataIndexEntry, Meta + parents = set() for out in outs: if not out.use_cache: continue ws, key = out.index_key + for key_len in range(1, len(key)): + parents.add((ws, key[:key_len])) + entry = DataIndexEntry( key=key, meta=out.meta, @@ -137,6 +141,8 @@ def _load_data_from_outs(index, prefix, outs): # index.view() work correctly. index[(*prefix, ws, *key)] = entry + for ws, key in parents: + index[(*prefix, ws, *key)] = DataIndexEntry(key=key, meta=Meta(isdir=True), loaded=True) def _load_storage_from_out(storage_map, key, out): from dvc.config import NoRemoteError @@ -163,7 +169,7 @@ def _load_storage_from_out(storage_map, key, out): if out.stage.is_import: dep = out.stage.deps[0] - storage_map.add_data(FileStorage(key, dep.fs, dep.fs_path)) + storage_map.add_remote(FileStorage(key, dep.fs, dep.fs_path)) class Index: @@ -534,6 +540,8 @@ def _data_prefixes(self) -> Dict[str, "_DataPrefixes"]: lambda: _DataPrefixes(set(), set()) ) for out, filter_info in self._filtered_outs: + if not out.use_cache: + continue workspace, key = out.index_key if filter_info and out.fs.path.isin(filter_info, out.fs_path): key = key + out.fs.path.relparts(filter_info, out.fs_path) @@ -550,6 +558,9 @@ def data_keys(self) -> Dict[str, Set["DataIndexKey"]]: ret: Dict[str, Set["DataIndexKey"]] = defaultdict(set) for out, filter_info in self._filtered_outs: + if not out.use_cache: + continue + workspace, key = out.index_key if filter_info and out.fs.path.isin(filter_info, out.fs_path): key = key + out.fs.path.relparts(filter_info, out.fs_path) @@ -579,14 +590,14 @@ def key_filter(workspace: str, key: "DataIndexKey"): return data -def build_data_index( +def build_data_index( # noqa: C901 index: Union["Index", "IndexView"], path: str, fs: "FileSystem", workspace: str = "repo", compute_hash: Optional[bool] = False, ) -> "DataIndex": - from dvc_data.index import DataIndex, DataIndexEntry + from dvc_data.index import DataIndex, DataIndexEntry, Meta from dvc_data.index.build import build_entries, build_entry from dvc_data.index.save import build_tree @@ -595,9 +606,16 @@ def build_data_index( ignore = index.repo.dvcignore data = DataIndex() + parents = set() for key in index.data_keys.get(workspace, set()): out_path = fs.path.join(path, *key) + for key_len in range(1, len(key)): + parents.add(key[:key_len]) + + if not fs.exists(out_path): + continue + try: out_entry = build_entry( out_path, @@ -606,7 +624,8 @@ def build_data_index( state=index.repo.state, ) except FileNotFoundError: - out_entry = DataIndexEntry() + continue +# out_entry = DataIndexEntry() out_entry.key = key data.add(out_entry) @@ -636,4 +655,11 @@ def build_data_index( out_entry.loaded = True data.add(out_entry) + for key in parents: + parent_path = fs.path.join(path, *key) + if not fs.exists(parent_path): + continue + direntry = DataIndexEntry(key=key, meta=Meta(isdir=True), loaded=True) + data.add(direntry) + return data diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 1f003678455..b7e8d6e1083 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -728,11 +728,6 @@ def outs_cached(self) -> bool: for out in self.outs ) - def get_all_files_number(self, filter_info=None) -> int: - return sum( - out.get_files_number(filter_info) for out in self.filter_outs(filter_info) - ) - def get_used_objs( self, *args, **kwargs ) -> Dict[Optional["ObjectDB"], Set["HashInfo"]]: diff --git a/pyproject.toml b/pyproject.toml index ec8ca2be519..ec26ea727f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "configobj>=5.0.6", "distro>=1.3", "dpath<3,>=2.1.0", - "dvc-data>=0.49.1,<0.50", + "dvc-data>=0.50.0,<0.51", "dvc-http>=2.29.0", "dvc-render>=0.3.1,<1", "dvc-studio-client>=0.9.0,<1", diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index d308b633fc3..ce4e6f0c259 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -8,12 +8,7 @@ from dvc.cli import main from dvc.dvcfile import PROJECT_FILE, load_file -from dvc.exceptions import ( - CheckoutError, - CheckoutErrorSuggestGit, - ConfirmRemoveError, - NoOutputOrStageError, -) +from dvc.exceptions import CheckoutError, CheckoutErrorSuggestGit, NoOutputOrStageError from dvc.fs import LocalFileSystem, system from dvc.stage import Stage from dvc.stage.exceptions import StageFileDoesNotExistError @@ -294,19 +289,6 @@ def test_checkout_directory(tmp_dir, dvc): assert os.path.exists("data") -def test_checkout_hook(mocker, tmp_dir, dvc): - """Test that dvc checkout handles EOFError gracefully, which is what - it will experience when running in a git hook. - """ - tmp_dir.dvc_gen({"data": {"foo": "foo"}}) - mocker.patch("sys.stdout.isatty", return_value=True) - mocker.patch("dvc.prompt.input", side_effect=EOFError) - - (tmp_dir / "data").gen("test", "test") - with pytest.raises(ConfirmRemoveError): - dvc.checkout() - - def test_checkout_suggest_git(tmp_dir, dvc, scm): with pytest.raises(CheckoutErrorSuggestGit) as e: dvc.checkout(targets="gitbranch") @@ -432,7 +414,11 @@ def test_partial_checkout(tmp_dir, dvc, target): tmp_dir.dvc_gen({"dir": {"subdir": {"file": "file"}, "other": "other"}}) shutil.rmtree("dir") stats = dvc.checkout([target]) - assert stats["added"] == ["dir" + os.sep] + assert set(stats["added"]) == { + os.path.join("dir", ""), + os.path.join("dir", "subdir", ""), + os.path.join("dir", "subdir", "file"), + } assert list(walk_files("dir")) == [os.path.join("dir", "subdir", "file")] @@ -463,7 +449,14 @@ def test_stats_on_checkout(tmp_dir, dvc, scm): scm.checkout("-") stats = dvc.checkout() - assert set(stats["added"]) == {"bar", "dir" + os.sep, "foo"} + assert set(stats["added"]) == { + "bar", + "dir" + os.sep, + os.path.join("dir", "subdir", ""), + os.path.join("dir", "subdir", "file"), + os.path.join("dir", "other"), + "foo", + } tmp_dir.gen({"lorem": "lorem", "bar": "new bar", "dir2": {"file": "file"}}) (tmp_dir / "foo").unlink() @@ -479,7 +472,11 @@ def test_stats_on_checkout(tmp_dir, dvc, scm): scm.checkout("-") stats = dvc.checkout() assert set(stats["modified"]) == {"bar"} - assert set(stats["added"]) == {"dir2" + os.sep, "lorem"} + assert set(stats["added"]) == { + "dir2" + os.sep, + os.path.join("dir2", "file"), + "lorem", + } assert set(stats["deleted"]) == {"foo"} @@ -518,11 +515,17 @@ def test_stats_on_added_file_from_tracked_dir(tmp_dir, dvc, scm): tmp_dir.gen("dir/subdir/newfile", "newfile") tmp_dir.dvc_add("dir", commit="add newfile") scm.checkout("HEAD~") - assert dvc.checkout() == {**empty_checkout, "modified": ["dir" + os.sep]} + assert dvc.checkout() == { + **empty_checkout, + "deleted": [os.path.join("dir", "subdir", "newfile")], + } assert dvc.checkout() == empty_checkout scm.checkout("-") - assert dvc.checkout() == {**empty_checkout, "modified": ["dir" + os.sep]} + assert dvc.checkout() == { + **empty_checkout, + "added": [os.path.join("dir", "subdir", "newfile")], + } assert dvc.checkout() == empty_checkout @@ -535,11 +538,17 @@ def test_stats_on_updated_file_from_tracked_dir(tmp_dir, dvc, scm): tmp_dir.gen("dir/subdir/file", "what file?") tmp_dir.dvc_add("dir", commit="update file") scm.checkout("HEAD~") - assert dvc.checkout() == {**empty_checkout, "modified": ["dir" + os.sep]} + assert dvc.checkout() == { + **empty_checkout, + "modified": [os.path.join("dir", "subdir", "file")], + } assert dvc.checkout() == empty_checkout scm.checkout("-") - assert dvc.checkout() == {**empty_checkout, "modified": ["dir" + os.sep]} + assert dvc.checkout() == { + **empty_checkout, + "modified": [os.path.join("dir", "subdir", "file")], + } assert dvc.checkout() == empty_checkout @@ -552,11 +561,17 @@ def test_stats_on_removed_file_from_tracked_dir(tmp_dir, dvc, scm): (tmp_dir / "dir" / "subdir" / "file").unlink() tmp_dir.dvc_add("dir", commit="removed file from subdir") scm.checkout("HEAD~") - assert dvc.checkout() == {**empty_checkout, "modified": ["dir" + os.sep]} + assert dvc.checkout() == { + **empty_checkout, + "added": [os.path.join("dir", "subdir", "file")], + } assert dvc.checkout() == empty_checkout scm.checkout("-") - assert dvc.checkout() == {**empty_checkout, "modified": ["dir" + os.sep]} + assert dvc.checkout() == { + **empty_checkout, + "deleted": [os.path.join("dir", "subdir", ""), os.path.join("dir", "subdir", "file")], + } assert dvc.checkout() == empty_checkout @@ -805,7 +820,7 @@ def test_checkout_partial_unchanged(tmp_dir, dvc): # Relevant change, one modified bar.unlink() stats = dvc.checkout(str(bar)) - assert len(stats["modified"]) == 1 + assert stats == {**empty_checkout, "added": [os.path.join("data", "bar")]} # No changes inside data/sub stats = dvc.checkout(str(sub_dir)) @@ -814,10 +829,15 @@ def test_checkout_partial_unchanged(tmp_dir, dvc): # Relevant change, one modified sub_dir_file.unlink() stats = dvc.checkout(str(sub_dir)) - assert len(stats["modified"]) == 1 + assert stats == { + **empty_checkout, + "added": [os.path.join("data", "sub_dir", "baz")], + } - stats = dvc.checkout(str(data_dir / "empty_sub_dir")) - assert not any(stats.values()) + # FIXME OLOLO this file is not present in the index, what the hell are we even doing? + # currently it will report as data and empty_sub_dir deleted, which is half correct + # stats = dvc.checkout(str(data_dir / "empty_sub_dir")) + # assert not any(stats.values()) dvc.checkout(str(data_dir))