From f18053b4d1597978be1bc5841f763d9223d75f43 Mon Sep 17 00:00:00 2001 From: Sergei Borodin Date: Tue, 3 Dec 2024 12:31:02 +0100 Subject: [PATCH] feat: BI-5953 schema version attribute (#731) * feat: BI-5953 schema version attribute * feat: BI-5953 schema_version crawler * feat: BI-5953 schema_version storage schema --- lib/dl_core/dl_core/us_connection_base.py | 5 +++ lib/dl_core/dl_core/us_dataset.py | 5 +++ .../us_manager/storage_schemas/connection.py | 1 + .../us_manager/storage_schemas/dataset.py | 1 + .../api/crawlers/schema_version_migration.py | 39 +++++++++++++++++++ 5 files changed, 51 insertions(+) create mode 100644 lib/dl_maintenance/dl_maintenance/api/crawlers/schema_version_migration.py diff --git a/lib/dl_core/dl_core/us_connection_base.py b/lib/dl_core/dl_core/us_connection_base.py index 8e443da4a..589fcb3ae 100644 --- a/lib/dl_core/dl_core/us_connection_base.py +++ b/lib/dl_core/dl_core/us_connection_base.py @@ -202,11 +202,16 @@ class DataModel(ConnectionDataModelBase): sample_table_name: Optional[str] = attr.ib(default=None) name: Optional[str] = attr.ib(default=None) data_export_forbidden: bool = attr.ib(default=False) + schema_version: str = attr.ib(default="1") @property def data_export_forbidden(self) -> bool: return self.data.data_export_forbidden if hasattr(self.data, "data_export_forbidden") else False + @property + def schema_version(self) -> str: + return self.data.schema_version + @classmethod def get_provided_source_types(cls) -> frozenset[DataSourceType]: if cls.allowed_source_types is not None: diff --git a/lib/dl_core/dl_core/us_dataset.py b/lib/dl_core/dl_core/us_dataset.py index 2c3cf8bb8..7313f1eda 100644 --- a/lib/dl_core/dl_core/us_dataset.py +++ b/lib/dl_core/dl_core/us_dataset.py @@ -65,6 +65,7 @@ class DataModel(BaseAttrsDataModel): name: str = attr.ib() revision_id: Optional[str] = attr.ib(default=None) load_preview_by_default: Optional[bool] = attr.ib(default=True) + schema_version: str = attr.ib(default="1") result_schema: ResultSchema = attr.ib(factory=ResultSchema) source_collections: list[DataSourceCollectionSpecBase] = attr.ib(factory=list) source_avatars: list[multisource.SourceAvatar] = attr.ib(factory=list) @@ -220,3 +221,7 @@ def load_preview_by_default(self) -> Optional[bool]: def rename_field_id_usages(self, old_id: str, new_id: str) -> None: self.error_registry.rename_pack(old_id=old_id, new_id=new_id) + + @property + def schema_version(self) -> str: + return self.data.schema_version diff --git a/lib/dl_core/dl_core/us_manager/storage_schemas/connection.py b/lib/dl_core/dl_core/us_manager/storage_schemas/connection.py index cd9a7d1e1..90b167e51 100644 --- a/lib/dl_core/dl_core/us_manager/storage_schemas/connection.py +++ b/lib/dl_core/dl_core/us_manager/storage_schemas/connection.py @@ -29,6 +29,7 @@ class ConnectionBaseDataStorageSchema(BaseConnectionDataStorageSchema[_CB_DATA_T sample_table_name = ma_fields.String(required=False, allow_none=True, load_default=None, dump_default=None) name = ma_fields.String(required=False, allow_none=True, load_default=None, dump_default=None) data_export_forbidden = ma_fields.Boolean(required=False, allow_none=False, load_default=False, dump_default=False) + schema_version = ma_fields.String(required=False, allow_none=False, load_default="1", dump_default="1") class CacheableConnectionDataSchemaMixin: diff --git a/lib/dl_core/dl_core/us_manager/storage_schemas/dataset.py b/lib/dl_core/dl_core/us_manager/storage_schemas/dataset.py index 456d3a99f..87c46f147 100644 --- a/lib/dl_core/dl_core/us_manager/storage_schemas/dataset.py +++ b/lib/dl_core/dl_core/us_manager/storage_schemas/dataset.py @@ -444,3 +444,4 @@ class DatasetStorageSchema(DefaultStorageSchema): rls = ma_fields.Nested(RLSSchema, allow_none=False) component_errors = ma_fields.Nested(ComponentErrorListSchema) obligatory_filters = ma_fields.List(ma_fields.Nested(ObligatoryFilterSchema)) + schema_version = ma_fields.String(required=False, allow_none=False, load_default="1", dump_default="1") diff --git a/lib/dl_maintenance/dl_maintenance/api/crawlers/schema_version_migration.py b/lib/dl_maintenance/dl_maintenance/api/crawlers/schema_version_migration.py new file mode 100644 index 000000000..4b1bae67b --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/api/crawlers/schema_version_migration.py @@ -0,0 +1,39 @@ +from typing import ( + Any, + AsyncIterable, + Optional, +) + +import attr + +from dl_core.us_entry import ( + USEntry, + USMigrationEntry, +) +from dl_core.us_manager.us_manager_async import AsyncUSManager +from dl_maintenance.core.us_crawler_base import USEntryCrawler + + +ALLOWED_ENTRY_SCOPES = ("connection", "dataset") + + +@attr.s +class SchemaVersionCrawler(USEntryCrawler): + ENTRY_TYPE = USMigrationEntry + + entry_scope: str = attr.ib(kw_only=True, validator=attr.validators.in_(ALLOWED_ENTRY_SCOPES)) + + def get_raw_entry_iterator(self, crawl_all_tenants: bool = True) -> AsyncIterable[dict[str, Any]]: + return self.usm.get_raw_collection( + entry_scope=self.entry_scope, + all_tenants=crawl_all_tenants, + ) + + async def process_entry_get_save_flag( + self, entry: USEntry, logging_extra: dict[str, Any], usm: Optional[AsyncUSManager] = None + ) -> tuple[bool, str]: + if entry.data.get("schema_version") is None: + entry.data["schema_version"] = "1" + return True, "Schema version is set" + + return False, "Entry skipped"