Skip to content

Commit

Permalink
fetch: use index fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed May 10, 2023
1 parent 06d8e3c commit e26919f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 170 deletions.
155 changes: 13 additions & 142 deletions dvc/repo/fetch.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
import logging
from contextlib import suppress
from typing import TYPE_CHECKING, Optional, Sequence

from dvc.config import NoRemoteError
from dvc.exceptions import DownloadError
from dvc.fs import Schemes

from . import locked

if TYPE_CHECKING:
from dvc.data_cloud import Remote
from dvc.repo import Repo
from dvc.types import TargetType
from dvc_data.hashfile.db import HashFileDB
from dvc_data.hashfile.transfer import TransferResult

logger = logging.getLogger(__name__)


Expand All @@ -31,7 +20,6 @@ def fetch( # noqa: C901, PLR0913
all_commits=False,
run_cache=False,
revs=None,
odb: Optional["HashFileDB"] = None,
) -> int:
"""Download data items from a cloud and imported repositories
Expand All @@ -45,18 +33,11 @@ def fetch( # noqa: C901, PLR0913
config.NoRemoteError: thrown when downloading only local files and no
remote is configured
"""
from dvc.repo.imports import save_imports
from dvc_data.hashfile.transfer import TransferResult
from dvc_data.index.fetch import fetch as ifetch

if isinstance(targets, str):
targets = [targets]

worktree_remote: Optional["Remote"] = None
with suppress(NoRemoteError):
_remote = self.cloud.get_remote(name=remote)
if _remote.worktree or _remote.fs.version_aware:
worktree_remote = _remote

failed_count = 0
transferred_count = 0

Expand All @@ -66,133 +47,23 @@ def fetch( # noqa: C901, PLR0913
except DownloadError as exc:
failed_count += exc.amount

no_remote_msg: Optional[str] = None
result = TransferResult(set(), set())
try:
if worktree_remote is not None:
transferred_count += _fetch_worktree(
self,
worktree_remote,
revs=revs,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
targets=targets,
jobs=jobs,
with_deps=with_deps,
recursive=recursive,
)
else:
d, f = _fetch(
self,
def _indexes():
for _ in self.brancher(
revs=revs,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
):
yield self.index.targets_view(
targets,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
with_deps=with_deps,
force=True,
remote=remote,
jobs=jobs,
recursive=recursive,
revs=revs,
odb=odb,
)
result.transferred.update(d)
result.failed.update(f)
except NoRemoteError as exc:
no_remote_msg = str(exc)

for rev in self.brancher(
revs=revs,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
):
imported = save_imports(
self,
targets,
unpartial=not rev or rev == "workspace",
recursive=recursive,
)
result.transferred.update(imported)
result.failed.difference_update(imported)

failed_count += len(result.failed)
).data["repo"]

result = ifetch(_indexes(), jobs=jobs) # pylint: disable=assignment-from-no-return
transferred_count += result[0]
failed_count += result[1]
if failed_count:
if no_remote_msg:
logger.error(no_remote_msg)
raise DownloadError(failed_count)

transferred_count += len(result.transferred)
return transferred_count


def _fetch(
repo: "Repo",
targets: "TargetType",
remote: Optional[str] = None,
jobs: Optional[int] = None,
odb: Optional["HashFileDB"] = None,
**kwargs,
) -> "TransferResult":
from dvc_data.hashfile.transfer import TransferResult

result = TransferResult(set(), set())
used = repo.used_objs(
targets,
remote=remote,
jobs=jobs,
**kwargs,
)
if odb:
all_ids = set()
for _odb, obj_ids in used.items():
all_ids.update(obj_ids)
d, f = repo.cloud.pull(
all_ids,
jobs=jobs,
remote=remote,
odb=odb,
)
result.transferred.update(d)
result.failed.update(f)
else:
for src_odb, obj_ids in sorted(
used.items(),
key=lambda item: item[0] is not None
and item[0].fs.protocol == Schemes.MEMORY,
):
d, f = repo.cloud.pull(
obj_ids,
jobs=jobs,
remote=remote,
odb=src_odb,
)
result.transferred.update(d)
result.failed.update(f)
return result


def _fetch_worktree(
repo: "Repo",
remote: "Remote",
revs: Optional[Sequence[str]] = None,
all_branches: bool = False,
all_tags: bool = False,
all_commits: bool = False,
targets: Optional["TargetType"] = None,
jobs: Optional[int] = None,
**kwargs,
) -> int:
from dvc.repo.worktree import fetch_worktree

downloaded = 0
for _ in repo.brancher(
revs=revs,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
):
downloaded += fetch_worktree(repo, remote, targets=targets, jobs=jobs, **kwargs)
return downloaded
2 changes: 1 addition & 1 deletion dvc/repo/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def _load_storage_from_out(storage_map, key, out):

if out.stage.is_import:
dep = out.stage.deps[0]
storage_map.add_data(FileStorage(key, dep.fs, dep.fs_path))
storage_map.add_remote(FileStorage(key, dep.fs, dep.fs_path))


class Index:
Expand Down
26 changes: 0 additions & 26 deletions dvc/repo/worktree.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,32 +104,6 @@ def _get_remote(
return repo.cloud.get_remote(name, command)


def fetch_worktree(
repo: "Repo",
remote: "Remote",
targets: Optional["TargetType"] = None,
jobs: Optional[int] = None,
**kwargs: Any,
) -> int:
from dvc_data.index import save

transferred = 0
for remote_name, view in worktree_view_by_remotes(
repo.index, push=True, targets=targets, **kwargs
):
remote_obj = _get_remote(repo, remote_name, remote, "fetch")
index = view.data["repo"]
total = len(index)
with Callback.as_tqdm_callback(
unit="file",
desc=f"Fetching from remote {remote_obj.name!r}",
disable=total == 0,
) as cb:
cb.set_size(total)
transferred += save(index, callback=cb, jobs=jobs, storage="remote")
return transferred


def push_worktree(
repo: "Repo",
remote: "Remote",
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ dependencies = [
"configobj>=5.0.6",
"distro>=1.3",
"dpath<3,>=2.1.0",
"dvc-data>=0.47.1,<0.48",
"dvc-data@git+https://github.com/efiop/dvc-data#egg=fix-341",
#"dvc-data>=0.48.0,<0.49",
"dvc-http>=2.29.0",
"dvc-render>=0.3.1,<1",
"dvc-studio-client>=0.9.0,<1",
Expand Down

0 comments on commit e26919f

Please sign in to comment.