add: support virtual operation (iterative#9389)
skshetry authored May 11, 2023
1 parent 06d8e3c commit 2342099
Showing 8 changed files with 458 additions and 73 deletions.
4 changes: 3 additions & 1 deletion dvc/commands/add.py
@@ -64,7 +64,9 @@ def run(self):
jobs=self.args.jobs,
force=self.args.force,
)

except FileNotFoundError:
logger.exception("")
return 1
except DvcException:
logger.exception("")
return 1
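The new except-clause pairs with the FileNotFoundError that dvc/output.py can now raise for untracked sub-paths (see Output.unstage below): the command logs the error and exits non-zero instead of crashing. A minimal sketch of the resulting behavior, assuming dvc.cli.main as the entry point and a hypothetical path:

# Hedged sketch (hypothetical path, not part of this commit): adding a
# path that does not exist and is not in the tracked tree now yields a
# clean non-zero exit instead of an unhandled traceback.
from dvc.cli import main

exit_code = main(["add", "data/does-not-exist.txt"])
assert exit_code == 1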
176 changes: 166 additions & 10 deletions dvc/output.py
@@ -1,10 +1,11 @@
import errno
import logging
import os
import posixpath
from collections import defaultdict
from contextlib import suppress
from operator import itemgetter
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Type, Union
from urllib.parse import urlparse

from funcy import collecting, first, project
@@ -25,11 +26,12 @@
from dvc_data.hashfile import load as oload
from dvc_data.hashfile.build import build
from dvc_data.hashfile.checkout import checkout
from dvc_data.hashfile.db import HashFileDB, add_update_tree
from dvc_data.hashfile.hash_info import HashInfo
from dvc_data.hashfile.istextfile import istextfile
from dvc_data.hashfile.meta import Meta
from dvc_data.hashfile.transfer import transfer as otransfer
from dvc_data.hashfile.tree import Tree
from dvc_data.hashfile.tree import Tree, du
from dvc_objects.errors import ObjectFormatError

from .annotations import ANNOTATION_FIELDS, ANNOTATION_SCHEMA, Annotation
@@ -39,7 +41,6 @@
from .utils.fs import path_isin

if TYPE_CHECKING:
from dvc_data.hashfile.db import HashFileDB
from dvc_data.hashfile.obj import HashFile
from dvc_data.index import DataIndexKey
from dvc_objects.db import ObjectDB
@@ -417,8 +418,6 @@ def __init__( # noqa: PLR0913

def _compute_meta_hash_info_from_files(self) -> None:
if self.files:
from dvc_data.hashfile.db import HashFileDB

tree = Tree.from_list(self.files, hash_name=self.hash_name)
tree.digest(with_meta=True)
self.odb = HashFileDB(tree.fs, tree.path + ".odb")
@@ -1008,7 +1007,7 @@ def unprotect(self):
if self.exists:
self.cache.unprotect(self.fs_path)

def get_dir_cache(self, **kwargs):
def get_dir_cache(self, **kwargs) -> Optional["Tree"]:
if not self.is_dir_checksum:
raise DvcException("cannot get dir cache for file checksum")

@@ -1022,14 +1021,17 @@ def get_dir_cache(self, **kwargs):
self.repo.cloud.pull([obj.hash_info], **kwargs)

if self.obj:
assert isinstance(self.obj, Tree)
return self.obj

try:
self.obj = oload(self.cache, self.hash_info)
obj = oload(self.cache, self.hash_info)
assert isinstance(obj, Tree)
except (FileNotFoundError, ObjectFormatError):
self.obj = None
obj = None

return self.obj
self.obj = obj
return obj

def _collect_used_dir_cache(
self, remote=None, force=False, jobs=None, filter_info=None
@@ -1165,7 +1167,7 @@ def _check_can_merge(self, out):

def merge(self, ancestor, other, allowed=None):
from dvc_data.hashfile.tree import MergeError as TreeMergeError
from dvc_data.hashfile.tree import du, merge
from dvc_data.hashfile.tree import merge

assert other

@@ -1198,6 +1200,160 @@
nfiles=len(merged),
)

def unstage(self, path: str) -> Tuple["Meta", "Tree"]:
from pygtrie import Trie

from dvc_objects.fs.path import Path

assert isinstance(self.fs.path, Path)
rel_key = tuple(self.fs.path.parts(self.fs.path.relpath(path, self.fs_path)))

if not self.hash_info:
tree = Tree()
else:
tree = self.get_dir_cache() or Tree()

trie = tree.as_trie()
assert isinstance(trie, Trie)

try:
del trie[rel_key:] # type: ignore[misc]
except KeyError:
raise FileNotFoundError( # noqa: B904
errno.ENOENT,
os.strerror(errno.ENOENT),
self.fs.path.relpath(path),
)

new = tree.from_trie(trie)
new.digest()
return Meta(nfiles=len(new), isdir=True), new

def apply(
self,
path: str,
obj: Union["Tree", "HashFile"],
meta: "Meta",
) -> Tuple["Meta", "Tree"]:
from pygtrie import Trie

from dvc_objects.fs.path import Path

assert isinstance(self.fs.path, Path)
append_only = True
rel_key = tuple(self.fs.path.parts(self.fs.path.relpath(path, self.fs_path)))

if not self.hash_info:
tree = Tree()
else:
tree = self.get_dir_cache() or Tree()

trie = tree.as_trie()
assert isinstance(trie, Trie)

try:
del trie[rel_key:] # type: ignore[misc]
except KeyError:
pass
else:
append_only = False

items = {}
if isinstance(obj, Tree):
items = {(*rel_key, *key): (m, o) for key, m, o in obj}
else:
items = {rel_key: (meta, obj.hash_info)}
trie.update(items)

new = Tree.from_trie(trie)
new.digest()

size = self.meta.size if self.meta and self.meta.size else None
if append_only and size and meta.size is not None:
# if files were only appended, we can sum to the existing size
size += meta.size
elif self.hash_info and self.hash_info == new.hash_info:
# if hashes are same, sizes must have been the same
size = self.meta.size
else:
size = None

meta = Meta(nfiles=len(new), size=size, isdir=True)
return meta, new

def add( # noqa: C901
self, path: Optional[str] = None, no_commit: bool = False, relink: bool = True
) -> Optional["HashFile"]:
path = path or self.fs_path
if self.hash_info and not self.is_dir_checksum and self.fs_path != path:
raise DvcException(
f"Cannot modify '{self}' which is being tracked as a file"
)

assert self.repo
cache = self.cache if self.use_cache else self.repo.cache.local
assert isinstance(cache, HashFileDB)

new: "HashFile"
try:
assert self.hash_name
staging, meta, obj = build(
cache,
path,
self.fs,
self.hash_name,
ignore=self.dvcignore,
dry_run=not self.use_cache,
)
except FileNotFoundError as exc:
if self.fs_path == path:
raise self.DoesNotExistError(self) from exc
if not self.is_dir_checksum:
raise

meta, new = self.unstage(path)
staging, obj = None, None
else:
assert obj
assert staging
if self.fs_path != path:
meta, new = self.apply(path, obj, meta)
add_update_tree(staging, new)
else:
new = obj

self.obj = new
self.hash_info = self.obj.hash_info
self.meta = meta
self.files = None
self.ignore()

if no_commit or not self.use_cache:
return obj

if isinstance(new, Tree):
add_update_tree(cache, new)

if not obj:
return obj

assert staging
assert obj.hash_info
otransfer(staging, self.cache, {obj.hash_info}, hardlink=relink, shallow=False)

if relink:
self._checkout(
path,
self.fs,
obj,
self.cache,
relink=True,
state=self.repo.state,
prompt=prompt.confirm,
)
self.set_exec()
return obj

@property
def fspath(self):
return self.fs_path
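The new Output.unstage and Output.apply above both operate on the pygtrie.Trie view of the cached directory tree: unstage slice-deletes the subtree under the relative key and re-digests, while apply first slice-deletes any stale subtree (tracking whether the update was append-only, so the new size can be computed as old size plus incoming meta.size) and then grafts the new entries in. A minimal sketch of that trie surgery, using plain strings in place of the real (Meta, HashInfo) values:

# Hedged sketch of the pygtrie operations behind unstage()/apply(); the
# real code stores (Meta, HashInfo) pairs and rebuilds a Tree afterwards.
from pygtrie import Trie

trie = Trie()
trie[("a.txt",)] = "hash-a"
trie[("sub", "b.txt")] = "hash-b"
trie[("sub", "c.txt")] = "hash-c"

# unstage("<dir>/sub"): drop the whole subtree under the relative key;
# a missing key raises KeyError, surfaced above as FileNotFoundError.
del trie[("sub",):]

# apply("<dir>/d.txt", obj): graft the new entry at the relative key.
trie.update({("d.txt",): "hash-d"})

print(sorted(trie.keys()))  # [('a.txt',), ('d.txt',)]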
63 changes: 40 additions & 23 deletions dvc/repo/add.py
@@ -2,14 +2,16 @@
import os
from contextlib import contextmanager
from itertools import tee
from typing import TYPE_CHECKING, Any, Iterator, List, Optional
from typing import TYPE_CHECKING, Any, Iterator, List, NamedTuple, Optional

import colorama

from dvc.exceptions import (
CacheLinkError,
DvcException,
InvalidArgumentError,
OutputDuplicationError,
OutputNotFoundError,
OverlappingOutputPathsError,
RecursiveAddingWhileUsingFilename,
)
@@ -29,6 +31,11 @@
logger = logging.getLogger(__name__)


class StageInfo(NamedTuple):
stage: "Stage"
output_exists: bool


OVERLAPPING_CHILD_FMT = (
"Cannot add '{out}', because it is overlapping with other "
"DVC tracked output: '{parent}'.\n"
@@ -114,18 +121,18 @@ def translate_graph_error(stages: Stages) -> Iterator[None]:
)


def progress_iter(stages: Stages) -> Iterator["Stage"]:
def progress_iter(stages: List[StageInfo]) -> Iterator["StageInfo"]:
total = len(stages)
desc = "Adding..."
with ui.progress(stages, total=total, desc=desc, unit="file", leave=True) as pbar:
if total == 1:
pbar.bar_format = desc
pbar.refresh()

for stage in pbar:
for item in pbar:
if total > 1:
pbar.set_msg(f"{stage.outs[0]}")
yield stage
pbar.set_msg(f"{item.stage.outs[0]}")
yield item
if total == 1: # restore bar format for stats
# pylint: disable=no-member
pbar.bar_format = pbar.BAR_FMT_DEFAULT
@@ -183,31 +190,33 @@ def add(
desc = "Collecting targets"
stages_it = create_stages(repo, add_targets, fname, transfer, **kwargs)
stages = list(ui.progress(stages_it, desc=desc, unit="file"))

stages_list = [stage for stage, _ in stages]
msg = "Collecting stages from the workspace"
with translate_graph_error(stages), ui.status(msg) as status:
with translate_graph_error(stages_list), ui.status(msg) as status:
# remove existing stages that are to-be replaced with these
# new stages for the graph checks.
repo.check_graph(
stages=stages, callback=lambda: status.update("Checking graph")
stages=stages_list, callback=lambda: status.update("Checking graph")
)

odb = None
if to_remote:
odb = repo.cloud.get_remote_odb(kwargs.get("remote"), "add")

with warn_link_failures() as link_failures:
for stage, source in zip(progress_iter(stages), sources):
for (stage, output_exists), source in zip(progress_iter(stages), sources):
out = stage.outs[0]
if to_remote or to_cache:
stage.transfer(source, to_remote=to_remote, odb=odb, **kwargs)
else:
try:
stage.save()
if not no_commit:
stage.commit()
path = out.fs.path.abspath(source) if output_exists else None
stage.add_outs(path, no_commit=no_commit)
except CacheLinkError:
link_failures.append(str(stage.relpath))
stage.dump()
return stages
return stages_list


LARGE_DIR_RECURSIVE_ADD_WARNING = (
Expand Down Expand Up @@ -263,24 +272,32 @@ def create_stages(
external: bool = False,
force: bool = False,
**kwargs: Any,
) -> Iterator["Stage"]:
) -> Iterator[StageInfo]:
for target in targets:
if kwargs.get("out"):
target = resolve_output(target, kwargs["out"], force=force)
path, wdir, out = resolve_paths(
repo, target, always_local=transfer and not kwargs.get("out")
)

stage = repo.stage.create(
single_stage=True,
validate=False,
fname=fname or path,
wdir=wdir,
outs=[out],
external=external,
force=force,
)
try:
(out_obj,) = repo.find_outs_by_path(target, strict=False)
stage = out_obj.stage
if not stage.is_data_source:
raise DvcException(f"cannot update {out!r}: not a data source")
output_exists = True
except OutputNotFoundError:
stage = repo.stage.create(
single_stage=True,
validate=False,
fname=fname or path,
wdir=wdir,
outs=[out],
external=external,
force=force,
)
output_exists = False

out_obj = stage.outs[0]
out_obj.annot.update(**kwargs)
yield stage
yield StageInfo(stage, output_exists)
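With these changes, create_stages reuses the stage of an already-tracked output (and refuses outputs whose stage is not a data source), while add threads output_exists through StageInfo so that stage.add_outs receives the granular path only for updates. A hedged usage sketch of the resulting granular add, assuming a repo where data/ is already tracked by a data.dvc stage:

# Hedged sketch (assumes data/ is already tracked by a data.dvc stage).
from dvc.repo import Repo

repo = Repo()

# Grafts only the new file into data.dvc's tree (Output.apply): the
# existing stage is found via find_outs_by_path(), so no separate
# data/new.txt.dvc stage file is created.
repo.add("data/new.txt")

# After deleting a tracked file from the workspace, adding its path
# removes it from the cached tree instead (Output.unstage).
repo.add("data/deleted.txt")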