diff --git a/dvc/repo/push.py b/dvc/repo/push.py
index e43b5c97706..1323be142b6 100644
--- a/dvc/repo/push.py
+++ b/dvc/repo/push.py
@@ -1,16 +1,10 @@
-from contextlib import suppress
-from typing import TYPE_CHECKING, Optional, Sequence
+from typing import TYPE_CHECKING, Optional
 
-from dvc.config import NoRemoteError
-from dvc.exceptions import InvalidArgumentError, UploadError
-from dvc.utils import glob_targets
+from dvc.exceptions import UploadError
 
 from . import locked
 
 if TYPE_CHECKING:
-    from dvc.data_cloud import Remote
-    from dvc.repo import Repo
-    from dvc.types import TargetType
     from dvc_objects.db import ObjectDB
 
 
@@ -29,91 +23,75 @@ def push(  # noqa: C901, PLR0913
     revs=None,
     glob=False,
     odb: Optional["ObjectDB"] = None,
-    include_imports=False,
+    # include_imports=False,
 ):
-    worktree_remote: Optional["Remote"] = None
-    with suppress(NoRemoteError):
-        _remote = self.cloud.get_remote(name=remote)
-        if _remote and (_remote.worktree or _remote.fs.version_aware):
-            worktree_remote = _remote
+    from fsspec.utils import tokenize
+
+    from dvc.fs.callbacks import Callback
+    from dvc.utils import glob_targets
+    from dvc_data.index.fetch import collect
+    from dvc_data.index.push import push as ipush
+
+    from .fetch import _collect_indexes
+
+    failed_count = 0
+    transferred_count = 0
 
-    pushed = 0
     used_run_cache = self.stage_cache.push(remote, odb=odb) if run_cache else []
-    pushed += len(used_run_cache)
+    transferred_count += len(used_run_cache)
 
     if isinstance(targets, str):
         targets = [targets]
 
-    expanded_targets = glob_targets(targets, glob=glob)
-
-    if worktree_remote is not None:
-        pushed += _push_worktree(
-            self,
-            worktree_remote,
-            revs=revs,
-            all_branches=all_branches,
-            all_tags=all_tags,
-            all_commits=all_commits,
-            targets=expanded_targets,
-            jobs=jobs,
-            with_deps=with_deps,
-            recursive=recursive,
-        )
-    else:
-        used = self.used_objs(
-            expanded_targets,
-            all_branches=all_branches,
-            all_tags=all_tags,
-            all_commits=all_commits,
-            with_deps=with_deps,
-            force=True,
-            remote=remote,
-            jobs=jobs,
-            recursive=recursive,
-            used_run_cache=used_run_cache,
-            revs=revs,
-            push=True,
-        )
+    indexes = _collect_indexes(
+        self,
+        targets=glob_targets(targets, glob=glob),
+        remote=remote,
+        all_branches=all_branches,
+        with_deps=with_deps,
+        all_tags=all_tags,
+        recursive=recursive,
+        all_commits=all_commits,
+        revs=revs,
+    )
 
-        if odb:
-            all_ids = set()
-            for dest_odb, obj_ids in used.items():
-                if not include_imports and dest_odb and dest_odb.read_only:
-                    continue
-                all_ids.update(obj_ids)
-            result = self.cloud.push(all_ids, jobs, remote=remote, odb=odb)
-            if result.failed:
-                raise UploadError(len(result.failed))
-            pushed += len(result.transferred)
-        else:
-            for dest_odb, obj_ids in used.items():
-                if dest_odb and dest_odb.read_only:
-                    continue
-                result = self.cloud.push(
-                    obj_ids, jobs, remote=remote, odb=odb or dest_odb
-                )
-                if result.failed:
-                    raise UploadError(len(result.failed))
-                pushed += len(result.transferred)
-    return pushed
-
-
-def _push_worktree(
-    repo: "Repo",
-    remote: "Remote",
-    revs: Optional[Sequence[str]] = None,
-    all_branches: bool = False,
-    all_tags: bool = False,
-    all_commits: bool = False,
-    targets: Optional["TargetType"] = None,
-    jobs: Optional[int] = None,
-    **kwargs,
-) -> int:
-    from dvc.repo.worktree import push_worktree
-
-    if revs or all_branches or all_tags or all_commits:
-        raise InvalidArgumentError(
-            "Multiple rev push is unsupported for cloud versioned remotes"
+    cache_key = ("fetch", tokenize(sorted(indexes.keys())))
+
+    with Callback.as_tqdm_callback(
+        desc="Collecting",
+        unit="entry",
+    ) as cb:
+        data = collect(
+            indexes.values(),
+            "remote",
+            cache_index=self.data_index,
+            cache_key=cache_key,
+            callback=cb,
         )
+    # data, unversioned_count = _log_unversioned(data)
+    # failed_count += unversioned_count
+
+    with Callback.as_tqdm_callback(
+        desc="Pushing",
+        unit="file",
+    ) as cb:
+        try:
+            push_transferred, push_failed = ipush(
+                data,
+                jobs=jobs,
+                callback=cb,
+            )  # pylint: disable=assignment-from-no-return
+        finally:
+            for fs_index in data:
+                fs_index.close()
+
+    # if fetch_transferred:
+    #     # NOTE: dropping cached index to force reloading from newly saved cache
+    #     self.drop_data_index()
+
+    transferred_count += push_transferred
+    failed_count += push_failed
 
+    if failed_count:
+        raise UploadError(failed_count)
-    return push_worktree(repo, remote, targets=targets, jobs=jobs, **kwargs)
+    return transferred_count
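For reviewers, the rewritten push() above reduces to three steps: build per-revision data indexes, collect the entries to transfer, and push them. The sketch below restates that flow outside the diff, using only names that appear in the patch (_collect_indexes, collect, ipush, Callback.as_tqdm_callback). It is a minimal sketch, not the shipped code: the function name push_flow_sketch is hypothetical, run-cache, glob, and error accounting are omitted, and the internal helpers are assumed to accept this subset of keyword arguments with sensible defaults.

from fsspec.utils import tokenize

from dvc.fs.callbacks import Callback
from dvc.repo.fetch import _collect_indexes
from dvc_data.index.fetch import collect
from dvc_data.index.push import push as ipush


def push_flow_sketch(repo, targets=None, remote=None, jobs=None):
    # Step 1: build one data index per revision/remote pair to be pushed.
    indexes = _collect_indexes(repo, targets=targets, remote=remote)

    # Step 2: collect the index entries to transfer; the collected result is
    # cached in repo.data_index under a key derived from the index ids.
    cache_key = ("fetch", tokenize(sorted(indexes.keys())))
    with Callback.as_tqdm_callback(desc="Collecting", unit="entry") as cb:
        data = collect(
            indexes.values(),
            "remote",
            cache_index=repo.data_index,
            cache_key=cache_key,
            callback=cb,
        )

    # Step 3: transfer the collected entries, closing the temporary
    # filesystem indexes even if the transfer raises.
    with Callback.as_tqdm_callback(desc="Pushing", unit="file") as cb:
        try:
            transferred, failed = ipush(data, jobs=jobs, callback=cb)
        finally:
            for fs_index in data:
                fs_index.close()
    return transferred, failed

Note that this delegates all remote-type dispatch (including worktree and version-aware remotes) to the index layer in dvc-data, which is what lets the patch delete the push_worktree branch below.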
diff --git a/dvc/repo/worktree.py b/dvc/repo/worktree.py
index 9cf39253b68..b5e68e4874d 100644
--- a/dvc/repo/worktree.py
+++ b/dvc/repo/worktree.py
@@ -1,10 +1,9 @@
 import logging
 from functools import partial
-from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Set, Tuple, Union
+from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Tuple, Union
 
 from funcy import first
 
-from dvc.exceptions import DvcException
 from dvc.fs.callbacks import Callback
 from dvc.stage.exceptions import StageUpdateError
 
@@ -104,90 +103,6 @@ def _get_remote(
     return repo.cloud.get_remote(name, command)
 
 
-def push_worktree(
-    repo: "Repo",
-    remote: "Remote",
-    targets: Optional["TargetType"] = None,
-    jobs: Optional[int] = None,
-    **kwargs: Any,
-) -> int:
-    from dvc.repo.index import build_data_index
-    from dvc_data.index.checkout import VersioningNotSupported, apply, compare
-
-    pushed = 0
-    stages: Set["Stage"] = set()
-
-    for remote_name, view in worktree_view_by_remotes(
-        repo.index, push=True, targets=targets, **kwargs
-    ):
-        remote_obj = _get_remote(repo, remote_name, remote, "push")
-        new_index = view.data["repo"]
-        if remote_obj.worktree:
-            logger.debug("indexing latest worktree for '%s'", remote_obj.path)
-            old_index = build_data_index(view, remote_obj.path, remote_obj.fs)
-            logger.debug("Pushing worktree changes to '%s'", remote_obj.path)
-        else:
-            old_index = None
-            logger.debug("Pushing version-aware files to '%s'", remote_obj.path)
-
-        if remote_obj.worktree:
-            diff_kwargs: Dict[str, Any] = {
-                "meta_only": True,
-                "meta_cmp_key": partial(_meta_checksum, remote_obj.fs),
-            }
-        else:
-            diff_kwargs = {}
-
-        with Callback.as_tqdm_callback(
-            unit="entry",
-            desc=f"Comparing indexes for remote {remote_obj.name!r}",
-        ) as cb:
-            diff = compare(
-                old_index,
-                new_index,
-                callback=cb,
-                delete=remote_obj.worktree,
-                **diff_kwargs,
-            )
-
-        total = len(new_index)
-        with Callback.as_tqdm_callback(
-            unit="file",
-            desc=f"Pushing to remote {remote_obj.name!r}",
-            disable=total == 0,
-        ) as cb:
-            cb.set_size(total)
-            try:
-                apply(
-                    diff,
-                    remote_obj.path,
-                    remote_obj.fs,
-                    callback=cb,
-                    latest_only=remote_obj.worktree,
-                    jobs=jobs,
-                )
-                pushed += sum(len(changes) for changes in diff.changes.values())
-            except VersioningNotSupported:
-                logger.exception("")
-                raise DvcException(
-                    f"remote {remote_obj.name!r} does not support versioning"
-                ) from None
-
-        if remote_obj.index is not None:
-            for key, entry in new_index.iteritems():
-                remote_obj.index[key] = entry
-            remote_obj.index.commit()
-
-        for out in view.outs:
-            workspace, _key = out.index_key
-            _merge_push_meta(out, repo.index.data[workspace], remote_obj.name)
-            stages.add(out.stage)
-
-    for stage in stages:
-        stage.dump(with_files=True, update_pipeline=False)
-    return pushed
-
-
 def _merge_push_meta(
     out: "Output",
     index: Union["DataIndex", "DataIndexView"],
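A caller-facing note on the patch as a whole: Repo.push() still returns the number of transferred entries (run-cache included), but failures are now accumulated across the entire transfer and raised once as UploadError(failed_count), rather than per object database as in the removed code. A minimal caller-side sketch, assuming a DVC repo in the current directory with a default remote configured (the job count here is an arbitrary placeholder):

from dvc.exceptions import UploadError
from dvc.repo import Repo

repo = Repo(".")
try:
    # Returns the number of files/entries transferred.
    pushed = repo.push(jobs=4)
    print(f"pushed {pushed} entries")
except UploadError as exc:
    # The exception message carries the aggregate failed-transfer count.
    print(f"push failed: {exc}")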