From f2db666d6090dc963c71caa39e22a03d72f33d1c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Saugat=20Pachhai=20=28=E0=A4=B8=E0=A5=8C=E0=A4=97=E0=A4=BE?= =?UTF-8?q?=E0=A4=A4=29?=
Date: Sat, 24 Feb 2024 17:51:42 +0545
Subject: [PATCH] improve deserializing; handle invalidation

---
 dvc/dependency/dataset.py | 19 +++++---
 dvc/repo/datasets.py      | 95 +++++++++++++++++++++++++++++----------
 2 files changed, 84 insertions(+), 30 deletions(-)

diff --git a/dvc/dependency/dataset.py b/dvc/dependency/dataset.py
index 167700a53e..e9f3128b6e 100644
--- a/dvc/dependency/dataset.py
+++ b/dvc/dependency/dataset.py
@@ -49,10 +49,14 @@ def fill_values(self, values=None):
 
     def workspace_status(self):
         ds = self.repo.datasets[self.name]
-        info: dict[str, Any] = self.hash_info.value if self.hash_info else {}  # type: ignore[assignment]
+        if not ds.lock:
+            return {str(self): "not in sync"}
 
-        # TODO: what to do if dvc.lock and dvc.yaml are different
-        if not ds.lock or info != ds.lock.to_dict():
+        info: dict[str, Any] = self.hash_info.value if self.hash_info else {}  # type: ignore[assignment]
+        lock = self.repo.datasets._lock_from_info(info)
+        if not lock:
+            return {str(self): "new"}
+        if lock != ds.lock:
             return {str(self): "modified"}
         return {}
 
@@ -62,9 +66,12 @@ def status(self):
     def get_hash(self):
         ds = self.repo.datasets[self.name]
         if not ds.lock:
-            raise DvcException(
-                f"Information missing for {self.name!r} dataset in dvc.lock"
-            )
+            if ds._invalidated:
+                raise DvcException(
+                    "Dataset information is not in sync. "
+                    f"Run 'dvc ds update {self.name}' to sync."
+                )
+            raise DvcException("Dataset information missing from dvc.lock file")
         return HashInfo(self.PARAM_DATASET, ds.lock.to_dict())  # type: ignore[arg-type]
 
     def save(self):
diff --git a/dvc/repo/datasets.py b/dvc/repo/datasets.py
index 65f490b3b7..4188d9f5af 100644
--- a/dvc/repo/datasets.py
+++ b/dvc/repo/datasets.py
@@ -61,11 +61,12 @@ def from_dict(cls: type["Self"], d: dict[str, Any]) -> "Self":
 class DatasetSpec(SerDe):
     name: str
     url: str
-    type: str
+    type: Literal["dvc", "dvcx", "url"]
 
 
 @frozen(kw_only=True)
 class DVCDatasetSpec(DatasetSpec):
+    type: Literal["dvc"]
     path: str = field(default="", converter=default_str)
     rev: Optional[str] = None
 
@@ -97,11 +98,18 @@ class URLDatasetLock(DatasetSpec):
     )
 
 
+def to_spec(lock: "Lock") -> "Spec":
+    cls = DVCDatasetSpec if lock.type == "dvc" else DatasetSpec
+    return cls(**{f.name: getattr(lock, f.name) for f in fields(cls)})
+
+
 @frozen(kw_only=True)
 class DVCDataset:
     manifest_path: str
-    spec: "DVCDatasetSpec"
-    lock: "Optional[DVCDatasetLock]" = None
+    spec: DVCDatasetSpec
+    lock: Optional[DVCDatasetLock] = None
+    _invalidated: bool = field(default=False, eq=False, repr=False)
+
     type: ClassVar[Literal["dvc"]] = "dvc"
 
     def update(self, repo, rev: Optional[str] = None, **kwargs) -> "Self":
@@ -131,6 +139,8 @@ class DVCXDataset:
     manifest_path: str
     spec: "DatasetSpec"
     lock: "Optional[DVCXDatasetLock]" = field(default=None)
+    _invalidated: bool = field(default=False, eq=False, repr=False)
+
     type: ClassVar[Literal["dvcx"]] = "dvcx"
 
     @property
@@ -179,6 +189,8 @@ class URLDataset:
     manifest_path: str
     spec: "DatasetSpec"
     lock: "Optional[URLDatasetLock]" = None
+    _invalidated: bool = field(default=False, eq=False, repr=False)
+
     type: ClassVar[Literal["url"]] = "url"
 
     def update(self, repo, **kwargs):
@@ -269,44 +281,79 @@ def _reset(self) -> None:
         self.__dict__.pop("_lock", None)
         self.__dict__.pop("_datasets", None)
 
+    @staticmethod
+    def _spec_from_info(spec: dict[str, Any]) -> Spec:
+        typ = spec.get("type")
+        if not typ:
+            raise ValueError("type should be present in spec")
+        if typ == "dvc":
+            return DVCDatasetSpec.from_dict(spec)
+        if typ in {"dvcx", "url"}:
+            return DatasetSpec.from_dict(spec)
+        raise ValueError(f"unknown dataset type: {spec.get('type', '')}")
+
+    @staticmethod
+    def _lock_from_info(lock: Optional[dict[str, Any]]) -> Optional[Lock]:
+        kl = {"dvc": DVCDatasetLock, "dvcx": DVCXDatasetLock, "url": URLDatasetLock}
+        if lock and (cls := kl.get(lock.get("type", ""))):  # type: ignore[assignment]
+            return cls.from_dict(lock)  # type: ignore[attr-defined]
+        return None
+
+    @classmethod
     def _build_dataset(
-        self,
+        cls,
         manifest_path: str,
-        spec: dict[str, Any],
-        lock: Optional[dict[str, Any]] = None,
+        spec_data: dict[str, Any],
+        lock_data: Optional[dict[str, Any]] = None,
     ) -> Dataset:
-        if lock is not None:
-            assert lock
-            assert spec["name"] == lock["name"]
-            assert spec["type"] == lock["type"]
+        _invalidated = False
+        spec = cls._spec_from_info(spec_data)
+        lock = cls._lock_from_info(lock_data)
+        # if dvc.lock and dvc.yaml files are not in sync, we invalidate the lock.
+        if lock is not None and to_spec(lock) != spec:
+            logger.debug(
+                "invalidated lock data for %s in %s",
+                spec.name,
+                manifest_path,
+            )
+            _invalidated = True  # signal is used during `dvc repro`/`dvc status`.
+            lock = None
 
-        if spec["type"] == "dvc":
+        assert isinstance(spec, DatasetSpec)
+        if spec.type == "dvc":
+            assert lock is None or isinstance(lock, DVCDatasetLock)
+            assert isinstance(spec, DVCDatasetSpec)
             return DVCDataset(
                 manifest_path=manifest_path,
-                spec=DVCDatasetSpec.from_dict(spec),
-                lock=None if lock is None else DVCDatasetLock.from_dict(lock),
+                spec=spec,
+                lock=lock,
+                invalidated=_invalidated,
             )
-        if spec["type"] == "dvcx":
-            return DVCXDataset(
+        if spec.type == "url":
+            assert lock is None or isinstance(lock, URLDatasetLock)
+            return URLDataset(
                 manifest_path=manifest_path,
-                spec=DatasetSpec.from_dict(spec),
-                lock=None if lock is None else DVCXDatasetLock.from_dict(lock),
+                spec=spec,
+                lock=lock,
+                invalidated=_invalidated,
            )
-        if spec["type"] == "url":
-            return URLDataset(
+        if spec.type == "dvcx":
+            assert lock is None or isinstance(lock, DVCXDatasetLock)
+            return DVCXDataset(
                 manifest_path=manifest_path,
-                spec=DatasetSpec.from_dict(spec),
-                lock=None if lock is None else URLDatasetLock.from_dict(lock),
+                spec=spec,
+                lock=lock,
+                invalidated=_invalidated,
            )
-        raise ValueError(f"unknown dataset type: {spec['type']}")
+        raise ValueError(f"unknown dataset type: {spec.type!r}")
 
     def add(
         self, url: str, name: str, manifest_path: str = "dvc.yaml", **kwargs: Any
     ) -> Dataset:
         kwargs.update({"url": url, "name": name})
-
-        dataset = self._build_dataset(manifest_path, spec=kwargs)
+        dataset = self._build_dataset(manifest_path, kwargs)
         dataset = dataset.update(self.repo)
+
         self.dump(dataset)
         self[name] = dataset
         return dataset