Skip to content

Commit

Permalink
push: use index push
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed Aug 6, 2023
1 parent 43b371c commit b657943
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 172 deletions.
150 changes: 64 additions & 86 deletions dvc/repo/push.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
from contextlib import suppress
from typing import TYPE_CHECKING, Optional, Sequence
from typing import TYPE_CHECKING, Optional

from dvc.config import NoRemoteError
from dvc.exceptions import InvalidArgumentError, UploadError
from dvc.utils import glob_targets
from dvc.exceptions import UploadError

from . import locked

if TYPE_CHECKING:
from dvc.data_cloud import Remote
from dvc.repo import Repo
from dvc.types import TargetType
from dvc_objects.db import ObjectDB


Expand All @@ -29,91 +23,75 @@ def push( # noqa: C901, PLR0913
revs=None,
glob=False,
odb: Optional["ObjectDB"] = None,
include_imports=False,
# include_imports=False,
):
worktree_remote: Optional["Remote"] = None
with suppress(NoRemoteError):
_remote = self.cloud.get_remote(name=remote)
if _remote and (_remote.worktree or _remote.fs.version_aware):
worktree_remote = _remote
from fsspec.utils import tokenize

from dvc.fs.callbacks import Callback
from dvc.utils import glob_targets
from dvc_data.index.fetch import collect
from dvc_data.index.push import push as ipush

from .fetch import _collect_indexes

failed_count = 0
transferred_count = 0

pushed = 0
used_run_cache = self.stage_cache.push(remote, odb=odb) if run_cache else []
pushed += len(used_run_cache)
transferred_count += len(used_run_cache)

if isinstance(targets, str):
targets = [targets]

expanded_targets = glob_targets(targets, glob=glob)

if worktree_remote is not None:
pushed += _push_worktree(
self,
worktree_remote,
revs=revs,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
targets=expanded_targets,
jobs=jobs,
with_deps=with_deps,
recursive=recursive,
)
else:
used = self.used_objs(
expanded_targets,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
with_deps=with_deps,
force=True,
remote=remote,
jobs=jobs,
recursive=recursive,
used_run_cache=used_run_cache,
revs=revs,
push=True,
)
indexes = _collect_indexes(
self,
targets=glob_targets(targets, glob=glob),
remote=remote,
all_branches=all_branches,
with_deps=with_deps,
all_tags=all_tags,
recursive=recursive,
all_commits=all_commits,
revs=revs,
)

if odb:
all_ids = set()
for dest_odb, obj_ids in used.items():
if not include_imports and dest_odb and dest_odb.read_only:
continue
all_ids.update(obj_ids)
result = self.cloud.push(all_ids, jobs, remote=remote, odb=odb)
if result.failed:
raise UploadError(len(result.failed))
pushed += len(result.transferred)
else:
for dest_odb, obj_ids in used.items():
if dest_odb and dest_odb.read_only:
continue
result = self.cloud.push(
obj_ids, jobs, remote=remote, odb=odb or dest_odb
)
if result.failed:
raise UploadError(len(result.failed))
pushed += len(result.transferred)
return pushed


def _push_worktree(
repo: "Repo",
remote: "Remote",
revs: Optional[Sequence[str]] = None,
all_branches: bool = False,
all_tags: bool = False,
all_commits: bool = False,
targets: Optional["TargetType"] = None,
jobs: Optional[int] = None,
**kwargs,
) -> int:
from dvc.repo.worktree import push_worktree

if revs or all_branches or all_tags or all_commits:
raise InvalidArgumentError(
"Multiple rev push is unsupported for cloud versioned remotes"
cache_key = ("fetch", tokenize(sorted(indexes.keys())))

with Callback.as_tqdm_callback(
desc="Collecting",
unit="entry",
) as cb:
data = collect(
indexes.values(),
"remote",
cache_index=self.data_index,
cache_key=cache_key,
callback=cb,
)
# data, unversioned_count = _log_unversioned(data)
# failed_count += unversioned_count

with Callback.as_tqdm_callback(
desc="Pushing",
unit="file",
) as cb:
try:
push_transferred, push_failed = ipush(
data,
jobs=jobs,
callback=cb,
) # pylint: disable=assignment-from-no-return
finally:
for fs_index in data:
fs_index.close()

# if fetch_transferred:
# # NOTE: dropping cached index to force reloading from newly saved cache
# self.drop_data_index()

transferred_count += push_transferred
failed_count += push_failed
if failed_count:
raise UploadError(failed_count)

return push_worktree(repo, remote, targets=targets, jobs=jobs, **kwargs)
return transferred_count
87 changes: 1 addition & 86 deletions dvc/repo/worktree.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import logging
from functools import partial
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Set, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Tuple, Union

from funcy import first

from dvc.exceptions import DvcException
from dvc.fs.callbacks import Callback
from dvc.stage.exceptions import StageUpdateError

Expand Down Expand Up @@ -104,90 +103,6 @@ def _get_remote(
return repo.cloud.get_remote(name, command)


def push_worktree(
repo: "Repo",
remote: "Remote",
targets: Optional["TargetType"] = None,
jobs: Optional[int] = None,
**kwargs: Any,
) -> int:
from dvc.repo.index import build_data_index
from dvc_data.index.checkout import VersioningNotSupported, apply, compare

pushed = 0
stages: Set["Stage"] = set()

for remote_name, view in worktree_view_by_remotes(
repo.index, push=True, targets=targets, **kwargs
):
remote_obj = _get_remote(repo, remote_name, remote, "push")
new_index = view.data["repo"]
if remote_obj.worktree:
logger.debug("indexing latest worktree for '%s'", remote_obj.path)
old_index = build_data_index(view, remote_obj.path, remote_obj.fs)
logger.debug("Pushing worktree changes to '%s'", remote_obj.path)
else:
old_index = None
logger.debug("Pushing version-aware files to '%s'", remote_obj.path)

if remote_obj.worktree:
diff_kwargs: Dict[str, Any] = {
"meta_only": True,
"meta_cmp_key": partial(_meta_checksum, remote_obj.fs),
}
else:
diff_kwargs = {}

with Callback.as_tqdm_callback(
unit="entry",
desc=f"Comparing indexes for remote {remote_obj.name!r}",
) as cb:
diff = compare(
old_index,
new_index,
callback=cb,
delete=remote_obj.worktree,
**diff_kwargs,
)

total = len(new_index)
with Callback.as_tqdm_callback(
unit="file",
desc=f"Pushing to remote {remote_obj.name!r}",
disable=total == 0,
) as cb:
cb.set_size(total)
try:
apply(
diff,
remote_obj.path,
remote_obj.fs,
callback=cb,
latest_only=remote_obj.worktree,
jobs=jobs,
)
pushed += sum(len(changes) for changes in diff.changes.values())
except VersioningNotSupported:
logger.exception("")
raise DvcException(
f"remote {remote_obj.name!r} does not support versioning"
) from None

if remote_obj.index is not None:
for key, entry in new_index.iteritems():
remote_obj.index[key] = entry
remote_obj.index.commit()

for out in view.outs:
workspace, _key = out.index_key
_merge_push_meta(out, repo.index.data[workspace], remote_obj.name)
stages.add(out.stage)

for stage in stages:
stage.dump(with_files=True, update_pipeline=False)
return pushed


def _merge_push_meta(
out: "Output",
index: Union["DataIndex", "DataIndexView"],
Expand Down

0 comments on commit b657943

Please sign in to comment.