From 9aebf2a69a53528b19325ce10bca038975143fa3 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 12 Nov 2024 11:46:59 +0100 Subject: [PATCH 01/27] Move TableOwnership to separate module --- .../labs/ucx/contexts/application.py | 3 +- .../labs/ucx/hive_metastore/ownership.py | 76 +--------------- .../ucx/hive_metastore/table_ownership.py | 89 +++++++++++++++++++ src/databricks/labs/ucx/progress/tables.py | 2 +- .../unit/hive_metastore/test_table_migrate.py | 3 +- tests/unit/hive_metastore/test_tables.py | 2 +- 6 files changed, 96 insertions(+), 79 deletions(-) create mode 100644 src/databricks/labs/ucx/hive_metastore/table_ownership.py diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 889fba1eec..0e878583d5 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -44,7 +44,8 @@ ) from databricks.labs.ucx.hive_metastore.mapping import TableMapping from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex -from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership +from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership +from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.hive_metastore.table_migrate import ( TableMigrationStatusRefresher, TablesMigrator, diff --git a/src/databricks/labs/ucx/hive_metastore/ownership.py b/src/databricks/labs/ucx/hive_metastore/ownership.py index b11f5f6e81..6ebaef637d 100644 --- a/src/databricks/labs/ucx/hive_metastore/ownership.py +++ b/src/databricks/labs/ucx/hive_metastore/ownership.py @@ -1,90 +1,16 @@ import logging -from functools import cached_property from databricks.labs.ucx.framework.owners import ( Ownership, - AdministratorLocator, - LegacyQueryOwnership, - WorkspacePathOwnership, ) from databricks.labs.ucx.hive_metastore import TablesCrawler -from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus +from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.hive_metastore.tables import Table -from databricks.labs.ucx.source_code.base import UsedTable -from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler logger = logging.getLogger(__name__) -class TableOwnership(Ownership[Table]): - """Determine ownership of tables in the inventory based on the following rules: - - If a table is owned by a principal in the grants table, then that principal is the owner. - - If a table is written to by a query, then the owner of that query is the owner of the table. - - If a table is written to by a notebook or file, then the owner of the path is the owner of the table. - """ - - def __init__( - self, - administrator_locator: AdministratorLocator, - grants_crawler: GrantsCrawler, - used_tables_in_paths: UsedTablesCrawler, - used_tables_in_queries: UsedTablesCrawler, - legacy_query_ownership: LegacyQueryOwnership, - workspace_path_ownership: WorkspacePathOwnership, - ) -> None: - super().__init__(administrator_locator) - self._grants_crawler = grants_crawler - self._used_tables_in_paths = used_tables_in_paths - self._used_tables_in_queries = used_tables_in_queries - self._legacy_query_ownership = legacy_query_ownership - self._workspace_path_ownership = workspace_path_ownership - - def _maybe_direct_owner(self, record: Table) -> str | None: - owner = self._maybe_from_grants(record) - if owner: - return owner - return self._maybe_from_sources(record) - - def _maybe_from_sources(self, record: Table) -> str | None: - used_table = self._used_tables_snapshot.get((record.catalog, record.database, record.name)) - if not used_table: - return None - # If something writes to a table, then it's an owner of it - if not used_table.is_write: - return None - if used_table.source_type == 'QUERY' and used_table.query_id: - return self._legacy_query_ownership.owner_of(used_table.query_id) - if used_table.source_type in {'NOTEBOOK', 'FILE'}: - return self._workspace_path_ownership.owner_of_path(used_table.source_id) - logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}") - return None - - @cached_property - def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]: - index = {} - for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()): - for used_table in collection: - key = used_table.catalog_name, used_table.schema_name, used_table.table_name - index[key] = used_table - return index - - def _maybe_from_grants(self, record: Table) -> str | None: - for grant in self._grants_snapshot: - if not grant.action_type == 'OWN': - continue - object_type, full_name = grant.this_type_and_key() - if object_type == 'TABLE' and full_name == record.key: - return grant.principal - if object_type in {'DATABASE', 'SCHEMA'} and full_name == f"{record.catalog}.{record.database}": - return grant.principal - return None - - @cached_property - def _grants_snapshot(self): - return self._grants_crawler.snapshot() - - class TableMigrationOwnership(Ownership[TableMigrationStatus]): """Determine ownership of table migration records in the inventory. diff --git a/src/databricks/labs/ucx/hive_metastore/table_ownership.py b/src/databricks/labs/ucx/hive_metastore/table_ownership.py new file mode 100644 index 0000000000..daf6034fa9 --- /dev/null +++ b/src/databricks/labs/ucx/hive_metastore/table_ownership.py @@ -0,0 +1,89 @@ +""" +Separate table ownership module to resolve circular dependency for `:mod:hive_metastore.grants` using `:class:Table` +and `:class:TableOwnership` using the `GrantsCrawler`. +""" + +import logging +from functools import cached_property + +from databricks.labs.ucx.framework.owners import ( + Ownership, + AdministratorLocator, + LegacyQueryOwnership, + WorkspacePathOwnership, +) +from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler +from databricks.labs.ucx.hive_metastore.tables import Table +from databricks.labs.ucx.source_code.base import UsedTable +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler + + +logger = logging.getLogger(__name__) + + +class TableOwnership(Ownership[Table]): + """Determine ownership of tables in the inventory based on the following rules: + - If a table is owned by a principal in the grants table, then that principal is the owner. + - If a table is written to by a query, then the owner of that query is the owner of the table. + - If a table is written to by a notebook or file, then the owner of the path is the owner of the table. + """ + + def __init__( + self, + administrator_locator: AdministratorLocator, + grants_crawler: GrantsCrawler, + used_tables_in_paths: UsedTablesCrawler, + used_tables_in_queries: UsedTablesCrawler, + legacy_query_ownership: LegacyQueryOwnership, + workspace_path_ownership: WorkspacePathOwnership, + ) -> None: + super().__init__(administrator_locator) + self._grants_crawler = grants_crawler + self._used_tables_in_paths = used_tables_in_paths + self._used_tables_in_queries = used_tables_in_queries + self._legacy_query_ownership = legacy_query_ownership + self._workspace_path_ownership = workspace_path_ownership + + def _maybe_direct_owner(self, record: Table) -> str | None: + owner = self._maybe_from_grants(record) + if owner: + return owner + return self._maybe_from_sources(record) + + def _maybe_from_sources(self, record: Table) -> str | None: + used_table = self._used_tables_snapshot.get((record.catalog, record.database, record.name)) + if not used_table: + return None + # If something writes to a table, then it's an owner of it + if not used_table.is_write: + return None + if used_table.source_type == 'QUERY' and used_table.query_id: + return self._legacy_query_ownership.owner_of(used_table.query_id) + if used_table.source_type in {'NOTEBOOK', 'FILE'}: + return self._workspace_path_ownership.owner_of_path(used_table.source_id) + logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}") + return None + + @cached_property + def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]: + index = {} + for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()): + for used_table in collection: + key = used_table.catalog_name, used_table.schema_name, used_table.table_name + index[key] = used_table + return index + + def _maybe_from_grants(self, record: Table) -> str | None: + for grant in self._grants_snapshot: + if not grant.action_type == 'OWN': + continue + object_type, full_name = grant.this_type_and_key() + if object_type == 'TABLE' and full_name == record.key: + return grant.principal + if object_type in {'DATABASE', 'SCHEMA'} and full_name == f"{record.catalog}.{record.database}": + return grant.principal + return None + + @cached_property + def _grants_snapshot(self): + return self._grants_crawler.snapshot() diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 6dc76132e2..22378030bf 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -4,7 +4,7 @@ from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex -from databricks.labs.ucx.hive_metastore.ownership import TableOwnership +from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index 5b7e7a66a4..e3a32a092b 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -31,7 +31,8 @@ TableMigrationStatus, TableView, ) -from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership +from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership +from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.hive_metastore.tables import ( Table, TablesCrawler, diff --git a/tests/unit/hive_metastore/test_tables.py b/tests/unit/hive_metastore/test_tables.py index 057389d49b..9b69f14c39 100644 --- a/tests/unit/hive_metastore/test_tables.py +++ b/tests/unit/hive_metastore/test_tables.py @@ -20,7 +20,7 @@ TablesCrawler, What, ) -from databricks.labs.ucx.hive_metastore.ownership import TableOwnership +from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler From 1add260f7a5a0259b1d294ae0a46ae285ac3e320 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 12 Nov 2024 11:52:46 +0100 Subject: [PATCH 02/27] Move table migration ownership to table_migrate module --- .../labs/ucx/contexts/application.py | 2 +- .../labs/ucx/hive_metastore/ownership.py | 37 ------------------- .../labs/ucx/hive_metastore/table_migrate.py | 35 +++++++++++++++++- .../hive_metastore/test_table_migrate.py | 2 +- .../unit/hive_metastore/test_table_migrate.py | 2 +- 5 files changed, 36 insertions(+), 42 deletions(-) delete mode 100644 src/databricks/labs/ucx/hive_metastore/ownership.py diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 0e878583d5..b97f887277 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -44,11 +44,11 @@ ) from databricks.labs.ucx.hive_metastore.mapping import TableMapping from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex -from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.hive_metastore.table_migrate import ( TableMigrationStatusRefresher, TablesMigrator, + TableMigrationOwnership, ) from databricks.labs.ucx.hive_metastore.table_move import TableMove from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership diff --git a/src/databricks/labs/ucx/hive_metastore/ownership.py b/src/databricks/labs/ucx/hive_metastore/ownership.py deleted file mode 100644 index 6ebaef637d..0000000000 --- a/src/databricks/labs/ucx/hive_metastore/ownership.py +++ /dev/null @@ -1,37 +0,0 @@ -import logging - -from databricks.labs.ucx.framework.owners import ( - Ownership, -) -from databricks.labs.ucx.hive_metastore import TablesCrawler -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus -from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership -from databricks.labs.ucx.hive_metastore.tables import Table - -logger = logging.getLogger(__name__) - - -class TableMigrationOwnership(Ownership[TableMigrationStatus]): - """Determine ownership of table migration records in the inventory. - - This is the owner of the source table, if (and only if) the source table is present in the inventory. - """ - - def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None: - super().__init__(table_ownership._administrator_locator) # TODO: Fix this - self._tables_crawler = tables_crawler - self._table_ownership = table_ownership - self._indexed_tables: dict[tuple[str, str], Table] | None = None - - def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]: - index = self._indexed_tables - if index is None or reindex: - snapshot = self._tables_crawler.snapshot() - index = {(table.database, table.name): table for table in snapshot} - self._indexed_tables = index - return index - - def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None: - index = self._tables_snapshot_index() - source_table = index.get((record.src_schema, record.src_table), None) - return self._table_ownership.owner_of(source_table) if source_table is not None else None diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 3dedc12e78..be576e5ce0 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -9,6 +9,7 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.errors.platform import DatabricksError +from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.hive_metastore.grants import MigrateGrants @@ -18,8 +19,12 @@ TableMapping, TableToMigrate, ) - -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher, TableMigrationIndex +from databricks.labs.ucx.hive_metastore.table_migration_status import ( + TableMigrationStatusRefresher, + TableMigrationIndex, + TableMigrationStatus, +) +from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.hive_metastore.tables import ( MigrationCount, Table, @@ -594,3 +599,29 @@ def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int): f"('upgraded_from' = '{source}'" f" , '{table.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');" ) + + +class TableMigrationOwnership(Ownership[TableMigrationStatus]): + """Determine ownership of table migration records in the inventory. + + This is the owner of the source table, if (and only if) the source table is present in the inventory. + """ + + def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None: + super().__init__(table_ownership._administrator_locator) # TODO: Fix this + self._tables_crawler = tables_crawler + self._table_ownership = table_ownership + self._indexed_tables: dict[tuple[str, str], Table] | None = None + + def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]: + index = self._indexed_tables + if index is None or reindex: + snapshot = self._tables_crawler.snapshot() + index = {(table.database, table.name): table for table in snapshot} + self._indexed_tables = index + return index + + def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None: + index = self._tables_snapshot_index() + source_table = index.get((record.src_schema, record.src_table), None) + return self._table_ownership.owner_of(source_table) if source_table is not None else None diff --git a/tests/integration/hive_metastore/test_table_migrate.py b/tests/integration/hive_metastore/test_table_migrate.py index 61f87ac8b2..b8db48bd33 100644 --- a/tests/integration/hive_metastore/test_table_migrate.py +++ b/tests/integration/hive_metastore/test_table_migrate.py @@ -5,7 +5,7 @@ TableMigrationStatus, TableMigrationStatusRefresher, ) -from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership +from databricks.labs.ucx.hive_metastore.table_migrate import TableMigrationOwnership def test_table_migration_ownership(ws, runtime_ctx, inventory_schema, sql_backend) -> None: diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index e3a32a092b..cfb9bf7399 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -24,6 +24,7 @@ ) from databricks.labs.ucx.hive_metastore.table_migrate import ( TablesMigrator, + TableMigrationOwnership, ) from databricks.labs.ucx.hive_metastore.table_migration_status import ( TableMigrationStatusRefresher, @@ -31,7 +32,6 @@ TableMigrationStatus, TableView, ) -from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.hive_metastore.tables import ( Table, From a989e73e4bf1688807f8e989fcae7d87bc7600aa Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 12 Nov 2024 12:09:03 +0100 Subject: [PATCH 03/27] Add UsedTableOwnership --- .../labs/ucx/source_code/used_table.py | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/databricks/labs/ucx/source_code/used_table.py b/src/databricks/labs/ucx/source_code/used_table.py index b5cdb77c0b..fa3265fdf3 100644 --- a/src/databricks/labs/ucx/source_code/used_table.py +++ b/src/databricks/labs/ucx/source_code/used_table.py @@ -2,12 +2,19 @@ import logging from collections.abc import Sequence, Iterable +from functools import cached_property from databricks.labs.ucx.framework.crawlers import CrawlerBase from databricks.labs.lsql.backends import SqlBackend from databricks.sdk.errors import DatabricksError from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.ucx.framework.owners import ( + AdministratorLocator, + LegacyQueryOwnership, + Ownership, + WorkspacePathOwnership, +) from databricks.labs.ucx.source_code.base import UsedTable logger = logging.getLogger(__name__) @@ -52,3 +59,44 @@ def _try_fetch(self) -> Iterable[UsedTable]: def _crawl(self) -> Iterable[UsedTable]: return [] # TODO raise NotImplementedError() once CrawlerBase supports empty snapshots + + +class UsedTableOwnership(Ownership[UsedTable]): + """Used table ownership.""" + + def __init__( + self, + administrator_locator: AdministratorLocator, + used_tables_in_paths: UsedTablesCrawler, + used_tables_in_queries: UsedTablesCrawler, + legacy_query_ownership: LegacyQueryOwnership, + workspace_path_ownership: WorkspacePathOwnership, + ) -> None: + super().__init__(administrator_locator) + self._used_tables_in_paths = used_tables_in_paths + self._used_tables_in_queries = used_tables_in_queries + self._legacy_query_ownership = legacy_query_ownership + self._workspace_path_ownership = workspace_path_ownership + + @cached_property + def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]: + index = {} + for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()): + for used_table in collection: + key = used_table.catalog_name, used_table.schema_name, used_table.table_name + index[key] = used_table + return index + + def _maybe_direct_owner(self, record: UsedTable) -> str | None: + used_table = self._used_tables_snapshot.get((record.catalog_name, record.schema_name, record.table_name)) + if not used_table: + return None + # If something writes to a table, then it's an owner of it + if not used_table.is_write: + return None + if used_table.source_type == 'QUERY' and used_table.query_id: + return self._legacy_query_ownership.owner_of(used_table.query_id) + if used_table.source_type in {'NOTEBOOK', 'FILE'}: + return self._workspace_path_ownership.owner_of_path(used_table.source_id) + logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}") + return None From b26bde943ae2e7ce75b51d2bd807e6a10d4d4992 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 12 Nov 2024 12:32:48 +0100 Subject: [PATCH 04/27] Test UsedTableOwnership to fall back on workspace administrator --- .../source_code/test_used_tables.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 tests/integration/source_code/test_used_tables.py diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py new file mode 100644 index 0000000000..a85e8f405c --- /dev/null +++ b/tests/integration/source_code/test_used_tables.py @@ -0,0 +1,28 @@ +from unittest.mock import create_autospec + +from databricks.labs.ucx.framework.owners import ( + AdministratorLocator, + LegacyQueryOwnership, + WorkspacePathOwnership +) +from databricks.labs.ucx.source_code.base import UsedTable +from databricks.labs.ucx.source_code.used_table import UsedTableOwnership, UsedTablesCrawler + + +def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot() -> None: + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "John Doe" + used_tables_crawler = create_autospec(UsedTablesCrawler) + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + ownership = UsedTableOwnership( + administrator_locator, + used_tables_crawler, + used_tables_crawler, + legacy_query_ownership, + workspace_path_ownership, + ) + + owner = ownership.owner_of(UsedTable()) + + assert owner == "John Doe" From 0f90dc5f5ff6a579459c3b1af2e27d5ffe809701 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 12 Nov 2024 14:28:07 +0100 Subject: [PATCH 05/27] Test UsedTableOwnership to fallback on workspace administrator if not write --- .../source_code/test_used_tables.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index a85e8f405c..5c1f35ef7e 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -26,3 +26,22 @@ def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot( owner = ownership.owner_of(UsedTable()) assert owner == "John Doe" + + +def test_used_table_ownership_is_workspace_admin_if_not_write() -> None: + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "John Doe" + used_tables_crawler = create_autospec(UsedTablesCrawler) + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + ownership = UsedTableOwnership( + administrator_locator, + used_tables_crawler, + used_tables_crawler, + legacy_query_ownership, + workspace_path_ownership, + ) + + owner = ownership.owner_of(UsedTable(is_write=False)) + + assert owner == "John Doe" From cd758f0675f0ba9bef9a4e4d6569477e4814fcec Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 12 Nov 2024 15:28:20 +0100 Subject: [PATCH 06/27] Test UsedTableOwnership gets owner from query --- .../source_code/test_used_tables.py | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index 5c1f35ef7e..a62a7cbae4 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -5,7 +5,7 @@ LegacyQueryOwnership, WorkspacePathOwnership ) -from databricks.labs.ucx.source_code.base import UsedTable +from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable from databricks.labs.ucx.source_code.used_table import UsedTableOwnership, UsedTablesCrawler @@ -45,3 +45,33 @@ def test_used_table_ownership_is_workspace_admin_if_not_write() -> None: owner = ownership.owner_of(UsedTable(is_write=False)) assert owner == "John Doe" + + +def test_used_table_ownership_from_query() -> None: + used_table = UsedTable( + catalog_name="test-catalog", + schema_name="test-schema", + table_name="test-table", + is_write=True, + source_id="test", + source_lineage=[LineageAtom(object_type="QUERY", object_id="dashboard/query")], + ) + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "John Doe" + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.snapshot.return_value = [used_table] + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + legacy_query_ownership.owner_of.side_effect = lambda id_: "Query Owner" if id_ == "query" else None + + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + ownership = UsedTableOwnership( + administrator_locator, + used_tables_crawler, + used_tables_crawler, + legacy_query_ownership, + workspace_path_ownership, + ) + + owner = ownership.owner_of(used_table) + + assert owner == "Query Owner" From 6398f39e210ae6b796da0e126d4ad46581525423 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 09:45:05 +0100 Subject: [PATCH 07/27] Format --- tests/integration/source_code/test_used_tables.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index a62a7cbae4..f49aec653d 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -1,10 +1,6 @@ from unittest.mock import create_autospec -from databricks.labs.ucx.framework.owners import ( - AdministratorLocator, - LegacyQueryOwnership, - WorkspacePathOwnership -) +from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership, WorkspacePathOwnership from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable from databricks.labs.ucx.source_code.used_table import UsedTableOwnership, UsedTablesCrawler From e91a2837efe6f678bde6779713fefdc627f06d9c Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 09:50:30 +0100 Subject: [PATCH 08/27] Move used table to fixture --- .../source_code/test_used_tables.py | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index f49aec653d..a6a21fa8d4 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -1,14 +1,30 @@ +import dataclasses from unittest.mock import create_autospec +import pytest + from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership, WorkspacePathOwnership from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable from databricks.labs.ucx.source_code.used_table import UsedTableOwnership, UsedTablesCrawler -def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot() -> None: +@pytest.fixture +def used_table() -> UsedTable: + return UsedTable( + catalog_name="test-catalog", + schema_name="test-schema", + table_name="test-table", + is_write=True, + source_id="test", + source_lineage=[LineageAtom(object_type="QUERY", object_id="dashboard/query")], + ) + + +def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot(used_table: UsedTable) -> None: administrator_locator = create_autospec(AdministratorLocator) administrator_locator.get_workspace_administrator.return_value = "John Doe" used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.snapshot.return_value = [used_table] legacy_query_ownership = create_autospec(LegacyQueryOwnership) workspace_path_ownership = create_autospec(WorkspacePathOwnership) ownership = UsedTableOwnership( @@ -24,10 +40,12 @@ def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot( assert owner == "John Doe" -def test_used_table_ownership_is_workspace_admin_if_not_write() -> None: +def test_used_table_ownership_is_workspace_admin_if_not_write(used_table: UsedTable) -> None: + used_table = dataclasses.replace(used_table, is_write=False) administrator_locator = create_autospec(AdministratorLocator) administrator_locator.get_workspace_administrator.return_value = "John Doe" used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.snapshot.return_value = [used_table] legacy_query_ownership = create_autospec(LegacyQueryOwnership) workspace_path_ownership = create_autospec(WorkspacePathOwnership) ownership = UsedTableOwnership( @@ -38,26 +56,18 @@ def test_used_table_ownership_is_workspace_admin_if_not_write() -> None: workspace_path_ownership, ) - owner = ownership.owner_of(UsedTable(is_write=False)) + owner = ownership.owner_of(used_table) assert owner == "John Doe" -def test_used_table_ownership_from_query() -> None: - used_table = UsedTable( - catalog_name="test-catalog", - schema_name="test-schema", - table_name="test-table", - is_write=True, - source_id="test", - source_lineage=[LineageAtom(object_type="QUERY", object_id="dashboard/query")], - ) +def test_used_table_ownership_from_query(used_table: UsedTable) -> None: administrator_locator = create_autospec(AdministratorLocator) administrator_locator.get_workspace_administrator.return_value = "John Doe" used_tables_crawler = create_autospec(UsedTablesCrawler) used_tables_crawler.snapshot.return_value = [used_table] legacy_query_ownership = create_autospec(LegacyQueryOwnership) - legacy_query_ownership.owner_of.side_effect = lambda id_: "Query Owner" if id_ == "query" else None + legacy_query_ownership.owner_of.side_effect = lambda id_: "Query Owner" if id_ == used_table.query_id else None workspace_path_ownership = create_autospec(WorkspacePathOwnership) ownership = UsedTableOwnership( From 9d90fb38d4754dd21b1cf823d18614dc2199c83d Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 09:59:41 +0100 Subject: [PATCH 09/27] Test propagating used table ownership from code --- src/databricks/labs/ucx/framework/owners.py | 2 +- tests/integration/source_code/test_used_tables.py | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/framework/owners.py b/src/databricks/labs/ucx/framework/owners.py index 55a1ddac98..4dd089cff6 100644 --- a/src/databricks/labs/ucx/framework/owners.py +++ b/src/databricks/labs/ucx/framework/owners.py @@ -201,7 +201,7 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli super().__init__(administrator_locator) self._ws = ws - def owner_of_path(self, path: str) -> str: + def owner_of_path(self, path: str) -> str: # TODO: Why is this `owner_of_path` and not return self.owner_of(WorkspacePath(self._ws, path)) @retried(on=[InternalError], timeout=timedelta(minutes=1)) diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index a6a21fa8d4..a43bad1882 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -61,15 +61,19 @@ def test_used_table_ownership_is_workspace_admin_if_not_write(used_table: UsedTa assert owner == "John Doe" -def test_used_table_ownership_from_query(used_table: UsedTable) -> None: +@pytest.mark.parametrize("object_type", ["QUERY", "NOTEBOOK", "FILE"]) +def test_used_table_ownership_from_code(used_table: UsedTable, object_type: str) -> None: + source_lineage = [LineageAtom(object_type=object_type, object_id="dashboard/query")] + used_table = dataclasses.replace(used_table, source_lineage=source_lineage) administrator_locator = create_autospec(AdministratorLocator) administrator_locator.get_workspace_administrator.return_value = "John Doe" used_tables_crawler = create_autospec(UsedTablesCrawler) used_tables_crawler.snapshot.return_value = [used_table] legacy_query_ownership = create_autospec(LegacyQueryOwnership) - legacy_query_ownership.owner_of.side_effect = lambda id_: "Query Owner" if id_ == used_table.query_id else None - + legacy_query_ownership.owner_of.side_effect = lambda id_: "Mary Jane" if id_ == used_table.query_id else None workspace_path_ownership = create_autospec(WorkspacePathOwnership) + workspace_path_ownership.owner_of_path.side_effect = lambda id_: "Mary Jane" if id_ == used_table.source_id else None + ownership = UsedTableOwnership( administrator_locator, used_tables_crawler, @@ -80,4 +84,4 @@ def test_used_table_ownership_from_query(used_table: UsedTable) -> None: owner = ownership.owner_of(used_table) - assert owner == "Query Owner" + assert owner == "Mary Jane" From 45cdcd58f6651c46bf2692af758ea92a070f8ef8 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 10:02:49 +0100 Subject: [PATCH 10/27] Test unknown code source type fallback to workspace admin --- .../labs/ucx/source_code/used_table.py | 2 +- .../source_code/test_used_tables.py | 28 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/source_code/used_table.py b/src/databricks/labs/ucx/source_code/used_table.py index fa3265fdf3..5270b4e15d 100644 --- a/src/databricks/labs/ucx/source_code/used_table.py +++ b/src/databricks/labs/ucx/source_code/used_table.py @@ -98,5 +98,5 @@ def _maybe_direct_owner(self, record: UsedTable) -> str | None: return self._legacy_query_ownership.owner_of(used_table.query_id) if used_table.source_type in {'NOTEBOOK', 'FILE'}: return self._workspace_path_ownership.owner_of_path(used_table.source_id) - logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}") + logger.warning(f"Unknown source type '{used_table.source_type}' for {used_table.source_id}") return None diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index a43bad1882..819028aef6 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -1,4 +1,5 @@ import dataclasses +import logging from unittest.mock import create_autospec import pytest @@ -85,3 +86,30 @@ def test_used_table_ownership_from_code(used_table: UsedTable, object_type: str) owner = ownership.owner_of(used_table) assert owner == "Mary Jane" + + +def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTable) -> None: + source_lineage = [LineageAtom(object_type="UNKNOWN", object_id="dashboard/query")] + used_table = dataclasses.replace(used_table, source_lineage=source_lineage) + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "John Doe" + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.snapshot.return_value = [used_table] + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + legacy_query_ownership.owner_of.side_effect = lambda id_: "Mary Jane" if id_ == used_table.query_id else None + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + workspace_path_ownership.owner_of_path.side_effect = lambda id_: "Mary Jane" if id_ == used_table.source_id else None + + ownership = UsedTableOwnership( + administrator_locator, + used_tables_crawler, + used_tables_crawler, + legacy_query_ownership, + workspace_path_ownership, + ) + + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.source_code.used_table"): + owner = ownership.owner_of(used_table) + assert owner == "John Doe" + assert f"Unknown source type 'UNKNOWN' for {used_table.source_id}" in caplog.messages From 14ebeec7f3a2acff6b400ddb50dfebbde67a2db3 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 10:04:22 +0100 Subject: [PATCH 11/27] Format --- tests/integration/source_code/test_used_tables.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index 819028aef6..a68aa9f2b8 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -28,6 +28,7 @@ def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot( used_tables_crawler.snapshot.return_value = [used_table] legacy_query_ownership = create_autospec(LegacyQueryOwnership) workspace_path_ownership = create_autospec(WorkspacePathOwnership) + ownership = UsedTableOwnership( administrator_locator, used_tables_crawler, @@ -49,6 +50,7 @@ def test_used_table_ownership_is_workspace_admin_if_not_write(used_table: UsedTa used_tables_crawler.snapshot.return_value = [used_table] legacy_query_ownership = create_autospec(LegacyQueryOwnership) workspace_path_ownership = create_autospec(WorkspacePathOwnership) + ownership = UsedTableOwnership( administrator_locator, used_tables_crawler, @@ -71,9 +73,9 @@ def test_used_table_ownership_from_code(used_table: UsedTable, object_type: str) used_tables_crawler = create_autospec(UsedTablesCrawler) used_tables_crawler.snapshot.return_value = [used_table] legacy_query_ownership = create_autospec(LegacyQueryOwnership) - legacy_query_ownership.owner_of.side_effect = lambda id_: "Mary Jane" if id_ == used_table.query_id else None + legacy_query_ownership.owner_of.side_effect = lambda i: "Mary Jane" if i == used_table.query_id else None workspace_path_ownership = create_autospec(WorkspacePathOwnership) - workspace_path_ownership.owner_of_path.side_effect = lambda id_: "Mary Jane" if id_ == used_table.source_id else None + workspace_path_ownership.owner_of_path.side_effect = lambda i: "Mary Jane" if i == used_table.source_id else None ownership = UsedTableOwnership( administrator_locator, @@ -96,9 +98,9 @@ def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTab used_tables_crawler = create_autospec(UsedTablesCrawler) used_tables_crawler.snapshot.return_value = [used_table] legacy_query_ownership = create_autospec(LegacyQueryOwnership) - legacy_query_ownership.owner_of.side_effect = lambda id_: "Mary Jane" if id_ == used_table.query_id else None + legacy_query_ownership.owner_of.side_effect = lambda i: "Mary Jane" if i == used_table.query_id else None workspace_path_ownership = create_autospec(WorkspacePathOwnership) - workspace_path_ownership.owner_of_path.side_effect = lambda id_: "Mary Jane" if id_ == used_table.source_id else None + workspace_path_ownership.owner_of_path.side_effect = lambda i: "Mary Jane" if i == used_table.source_id else None ownership = UsedTableOwnership( administrator_locator, @@ -108,7 +110,6 @@ def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTab workspace_path_ownership, ) - with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.source_code.used_table"): owner = ownership.owner_of(used_table) assert owner == "John Doe" From 5fd8a63c81f7b5be7e79f952e047ee55fd723025 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 10:10:18 +0100 Subject: [PATCH 12/27] Add asserts on autospec mocks --- .../integration/source_code/test_used_tables.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index a68aa9f2b8..83e43bc383 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -40,6 +40,9 @@ def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot( owner = ownership.owner_of(UsedTable()) assert owner == "John Doe" + administrator_locator.get_workspace_administrator.assert_called_once() + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of_path.assert_not_called() def test_used_table_ownership_is_workspace_admin_if_not_write(used_table: UsedTable) -> None: @@ -62,6 +65,9 @@ def test_used_table_ownership_is_workspace_admin_if_not_write(used_table: UsedTa owner = ownership.owner_of(used_table) assert owner == "John Doe" + administrator_locator.get_workspace_administrator.assert_called_once() + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of_path.assert_not_called() @pytest.mark.parametrize("object_type", ["QUERY", "NOTEBOOK", "FILE"]) @@ -88,6 +94,14 @@ def test_used_table_ownership_from_code(used_table: UsedTable, object_type: str) owner = ownership.owner_of(used_table) assert owner == "Mary Jane" + administrator_locator.get_workspace_administrator.assert_not_called() + if object_type == "QUERY": + legacy_query_ownership.owner_of.assert_called_once_with(used_table.query_id) + workspace_path_ownership.owner_of_path.assert_not_called() + else: + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of_path.assert_called_once_with(used_table.source_id) + def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTable) -> None: @@ -114,3 +128,6 @@ def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTab owner = ownership.owner_of(used_table) assert owner == "John Doe" assert f"Unknown source type 'UNKNOWN' for {used_table.source_id}" in caplog.messages + administrator_locator.get_workspace_administrator.assert_called_once() + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of_path.assert_not_called() From a6c5deb1c710f61004fd8f2b48a7f01e7568bbb7 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 10:16:04 +0100 Subject: [PATCH 13/27] Add docstring to UsedTableOwnership --- src/databricks/labs/ucx/source_code/used_table.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/source_code/used_table.py b/src/databricks/labs/ucx/source_code/used_table.py index 5270b4e15d..7d61dd57ff 100644 --- a/src/databricks/labs/ucx/source_code/used_table.py +++ b/src/databricks/labs/ucx/source_code/used_table.py @@ -62,7 +62,10 @@ def _crawl(self) -> Iterable[UsedTable]: class UsedTableOwnership(Ownership[UsedTable]): - """Used table ownership.""" + """Used table ownership. + + Ownership of code resources propagate to the `UsedTable` if that resource writes to the table. + """ def __init__( self, From 059d0f7639525da7b22668bd63ea0ad024c92fc6 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 10:17:00 +0100 Subject: [PATCH 14/27] Format --- tests/integration/source_code/test_used_tables.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py index 83e43bc383..3e5dd5c439 100644 --- a/tests/integration/source_code/test_used_tables.py +++ b/tests/integration/source_code/test_used_tables.py @@ -103,7 +103,6 @@ def test_used_table_ownership_from_code(used_table: UsedTable, object_type: str) workspace_path_ownership.owner_of_path.assert_called_once_with(used_table.source_id) - def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTable) -> None: source_lineage = [LineageAtom(object_type="UNKNOWN", object_id="dashboard/query")] used_table = dataclasses.replace(used_table, source_lineage=source_lineage) From 35177f8863a934cb279c6f335647e50309899c2a Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 10:57:38 +0100 Subject: [PATCH 15/27] Add `from_table` classmethod to UsedTable --- src/databricks/labs/ucx/source_code/base.py | 14 +++++++++++++- tests/integration/source_code/test_base.py | 14 ++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 tests/integration/source_code/test_base.py diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py index 659b38b2b7..306ebaa5cf 100644 --- a/src/databricks/labs/ucx/source_code/base.py +++ b/src/databricks/labs/ucx/source_code/base.py @@ -18,9 +18,10 @@ from databricks.sdk.service import compute from databricks.sdk.service.workspace import Language - from databricks.labs.blueprint.paths import WorkspacePath +from databricks.labs.ucx.hive_metastore.tables import Table + if sys.version_info >= (3, 11): from typing import Self @@ -260,6 +261,17 @@ def parse(cls, value: str, default_schema: str, is_read=True, is_write=False) -> catalog_name=catalog_name, schema_name=schema_name, table_name=parts[0], is_read=is_read, is_write=is_write ) + @classmethod + def from_table(cls, table: Table, *, is_read: bool, is_write: bool) -> UsedTable: + """Create a `:class:UsedTable` from a Hive `:class:Table`.""" + return cls( + catalog_name=table.catalog, + schema_name=table.database, + table_name=table.name, + is_read=is_read, + is_write=is_write, + ) + catalog_name: str = SourceInfo.UNKNOWN schema_name: str = SourceInfo.UNKNOWN table_name: str = SourceInfo.UNKNOWN diff --git a/tests/integration/source_code/test_base.py b/tests/integration/source_code/test_base.py new file mode 100644 index 0000000000..e92c6b05dd --- /dev/null +++ b/tests/integration/source_code/test_base.py @@ -0,0 +1,14 @@ +from databricks.labs.ucx.hive_metastore.tables import Table +from databricks.labs.ucx.source_code.used_table import UsedTable + + +def test_used_table_from_table() -> None: + table = Table("catalog", "schema", "table", "MANAGED", "DELTA") + + used_table = UsedTable.from_table(table, is_read=False, is_write=True) + + assert used_table.catalog_name == "catalog" + assert used_table.schema_name == "schema" + assert used_table.table_name == "table" + assert not used_table.is_read + assert used_table.is_write From a0f016ece9f4db2d39b26ed0835c7c46058f6b04 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 11:16:25 +0100 Subject: [PATCH 16/27] Reuse UsedTableOwnership in TableOwnership --- .../labs/ucx/contexts/application.py | 11 ++-- .../ucx/hive_metastore/table_ownership.py | 56 +++++-------------- tests/unit/hive_metastore/test_tables.py | 10 +++- 3 files changed, 27 insertions(+), 50 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index b97f887277..fb5f99c699 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -19,7 +19,7 @@ from databricks.labs.ucx.recon.schema_comparator import StandardSchemaComparator from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler, DirectFsAccessOwnership from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver -from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler, UsedTableOwnership from databricks.sdk import AccountClient, WorkspaceClient, core from databricks.sdk.service import sql @@ -263,16 +263,19 @@ def tables_crawler(self) -> TablesCrawler: return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases) @cached_property - def table_ownership(self) -> TableOwnership: - return TableOwnership( + def used_table_ownership(self) -> UsedTableOwnership: + return UsedTableOwnership( self.administrator_locator, - self.grants_crawler, self.used_tables_crawler_for_paths, self.used_tables_crawler_for_queries, self.legacy_query_ownership, self.workspace_path_ownership, ) + @cached_property + def table_ownership(self) -> TableOwnership: + return TableOwnership(self.administrator_locator, self.grants_crawler, self.used_table_ownership) + @cached_property def workspace_path_ownership(self) -> WorkspacePathOwnership: return WorkspacePathOwnership(self.administrator_locator, self.workspace_client) diff --git a/src/databricks/labs/ucx/hive_metastore/table_ownership.py b/src/databricks/labs/ucx/hive_metastore/table_ownership.py index daf6034fa9..dc5e395686 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_ownership.py +++ b/src/databricks/labs/ucx/hive_metastore/table_ownership.py @@ -6,72 +6,42 @@ import logging from functools import cached_property -from databricks.labs.ucx.framework.owners import ( - Ownership, - AdministratorLocator, - LegacyQueryOwnership, - WorkspacePathOwnership, -) +from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.source_code.base import UsedTable -from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler +from databricks.labs.ucx.source_code.used_table import UsedTableOwnership logger = logging.getLogger(__name__) class TableOwnership(Ownership[Table]): - """Determine ownership of tables in the inventory based on the following rules: - - If a table is owned by a principal in the grants table, then that principal is the owner. - - If a table is written to by a query, then the owner of that query is the owner of the table. - - If a table is written to by a notebook or file, then the owner of the path is the owner of the table. + """Table ownership + + Determine ownership of tables in the inventory based on the following rules: + - If a table is owned by a principal through grants, then that principal is the owner. + - Otherwise, fallback on the `UsedTableOwnership`. """ def __init__( self, administrator_locator: AdministratorLocator, grants_crawler: GrantsCrawler, - used_tables_in_paths: UsedTablesCrawler, - used_tables_in_queries: UsedTablesCrawler, - legacy_query_ownership: LegacyQueryOwnership, - workspace_path_ownership: WorkspacePathOwnership, + used_table_ownership: UsedTableOwnership, ) -> None: super().__init__(administrator_locator) self._grants_crawler = grants_crawler - self._used_tables_in_paths = used_tables_in_paths - self._used_tables_in_queries = used_tables_in_queries - self._legacy_query_ownership = legacy_query_ownership - self._workspace_path_ownership = workspace_path_ownership + self._used_table_ownership = used_table_ownership def _maybe_direct_owner(self, record: Table) -> str | None: owner = self._maybe_from_grants(record) if owner: return owner - return self._maybe_from_sources(record) - - def _maybe_from_sources(self, record: Table) -> str | None: - used_table = self._used_tables_snapshot.get((record.catalog, record.database, record.name)) - if not used_table: - return None - # If something writes to a table, then it's an owner of it - if not used_table.is_write: - return None - if used_table.source_type == 'QUERY' and used_table.query_id: - return self._legacy_query_ownership.owner_of(used_table.query_id) - if used_table.source_type in {'NOTEBOOK', 'FILE'}: - return self._workspace_path_ownership.owner_of_path(used_table.source_id) - logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}") - return None - - @cached_property - def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]: - index = {} - for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()): - for used_table in collection: - key = used_table.catalog_name, used_table.schema_name, used_table.table_name - index[key] = used_table - return index + # The `is_write` and `is_read` has no effect as the actual `UsedTable` definition comes from snapshots + used_table = UsedTable.from_table(record, is_read=False, is_write=False) + # This call defers the `administrator_locator` to the one of `UsedTableOwnership`, we expect them to be the same + return self._used_table_ownership.owner_of(used_table) def _maybe_from_grants(self, record: Table) -> str | None: for grant in self._grants_snapshot: diff --git a/tests/unit/hive_metastore/test_tables.py b/tests/unit/hive_metastore/test_tables.py index 9b69f14c39..ea34c8cb24 100644 --- a/tests/unit/hive_metastore/test_tables.py +++ b/tests/unit/hive_metastore/test_tables.py @@ -21,7 +21,7 @@ What, ) from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership -from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler, UsedTableOwnership def test_is_delta_true(): @@ -688,14 +688,18 @@ def test_table_owner() -> None: legacy_query_ownership = create_autospec(LegacyQueryOwnership) workspace_path_ownership = create_autospec(WorkspacePathOwnership) - ownership = TableOwnership( + used_table_ownership = UsedTableOwnership( admin_locator, - grants_crawler, used_tables_in_paths, used_tables_in_queries, legacy_query_ownership, workspace_path_ownership, ) + ownership = TableOwnership( + admin_locator, + grants_crawler, + used_table_ownership, + ) table = Table(catalog="main", database="foo", name="bar", object_type="TABLE", table_format="DELTA") owner = ownership.owner_of(table) From 7d4b7129e92e0c0fadaee908082ff786162d97e3 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 11:22:48 +0100 Subject: [PATCH 17/27] Add UsedTableProgressEncoder --- src/databricks/labs/ucx/progress/tables.py | 42 ++++++++++++++- src/databricks/labs/ucx/source_code/base.py | 4 +- tests/unit/progress/test_tables.py | 58 ++++++++++++++++++++- 3 files changed, 101 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 22378030bf..27f71ae218 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -3,10 +3,11 @@ from databricks.labs.lsql.backends import SqlBackend from databricks.labs.ucx.hive_metastore.tables import Table -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical +from databricks.labs.ucx.source_code.used_table import UsedTable, UsedTableOwnership class TableProgressEncoder(ProgressEncoder[Table]): @@ -52,3 +53,42 @@ def _encode_record_as_historical(self, record: Table) -> Historical: if not self._table_migration_index.is_migrated(record.database, record.name): failures.append("Pending migration") return replace(historical, failures=historical.failures + failures) + + +class UsedTableProgressEncoder(ProgressEncoder[UsedTable]): + """Encoder class:UsedTable to class:History. + + A used table has a failure if its catalog is the hive_metastore. + """ + + def __init__( + self, + sql_backend: SqlBackend, + ownership: UsedTableOwnership, + run_id: int, + workspace_id: int, + catalog: str, + schema: str = "multiworkspace", + table: str = "historical", + ) -> None: + super().__init__( + sql_backend, + ownership, + UsedTable, + run_id, + workspace_id, + catalog, + schema, + table, + ) + + def _encode_record_as_historical(self, record: UsedTable) -> Historical: + """Encode record as historical. + + A table failure means that the table is pending migration. + """ + historical = super()._encode_record_as_historical(record) + failures = [] + if record.catalog_name == "hive_metastore": + failures.append("Pending migration") + return replace(historical, failures=historical.failures + failures) diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py index 306ebaa5cf..b6418ea6bd 100644 --- a/src/databricks/labs/ucx/source_code/base.py +++ b/src/databricks/labs/ucx/source_code/base.py @@ -10,7 +10,7 @@ from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import Any, BinaryIO, TextIO +from typing import Any, BinaryIO, ClassVar, TextIO from astroid import NodeNG # type: ignore from sqlglot import Expression, parse as parse_sql @@ -278,6 +278,8 @@ def from_table(cls, table: Table, *, is_read: bool, is_write: bool) -> UsedTable is_read: bool = True is_write: bool = False + __id_attributes__: ClassVar[tuple[str, ...]] = ("catalog_name", "schema_name", "table_name") + class TableCollector(ABC): diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py index a859bf5c03..76062df509 100644 --- a/tests/unit/progress/test_tables.py +++ b/tests/unit/progress/test_tables.py @@ -1,3 +1,4 @@ +import datetime as dt from unittest.mock import create_autospec import pytest @@ -7,7 +8,8 @@ from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.progress.grants import GrantProgressEncoder -from databricks.labs.ucx.progress.tables import TableProgressEncoder +from databricks.labs.ucx.progress.tables import TableProgressEncoder, UsedTableProgressEncoder +from databricks.labs.ucx.source_code.used_table import UsedTable @pytest.mark.parametrize( @@ -60,3 +62,57 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T ownership.owner_of.assert_called_once() table_migration_index.is_migrated.assert_called_with(table.database, table.name) grant_progress_encoder.assert_not_called() + + +@pytest.mark.parametrize( + "used_table", + [ + UsedTable( + catalog_name="catalog", + schema_name="schema", + table_name="table", + source_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), + ), + ], +) +def test_used_table_progress_encoder_no_failures(mock_backend, used_table: UsedTable) -> None: + """No failures when the table is not in the Hive metastore.""" + ownership = create_autospec(Ownership) + ownership.owner_of.return_value = "user" + encoder = UsedTableProgressEncoder(mock_backend, ownership, run_id=1, workspace_id=123456789, catalog="test") + + encoder.append_inventory_snapshot([used_table]) + + rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append") + assert len(rows) > 0, f"No rows written for: {encoder.full_name}" + assert len(rows[0].failures) == 0 + ownership.owner_of.assert_called_once() + + +@pytest.mark.parametrize( + "used_table", + [ + UsedTable( + catalog_name="hive_metastore", + schema_name="schema", + table_name="table", + source_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), + ), + ], +) +def test_used_table_progress_encoder_pending_migration_failure(mock_backend, used_table: UsedTable) -> None: + """Failures when the table is in the Hive metastore.""" + ownership = create_autospec(Ownership) + ownership.owner_of.return_value = "user" + encoder = UsedTableProgressEncoder(mock_backend, ownership, run_id=1, workspace_id=123456789, catalog="test") + + encoder.append_inventory_snapshot([used_table]) + + rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append") + assert len(rows) > 0, f"No rows written for: {encoder.full_name}" + assert rows[0].failures == ["Pending migration"] + ownership.owner_of.assert_called_once() From bdc42338b0bbd4bc120d4af96b022aece9f9bbf1 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 11:28:51 +0100 Subject: [PATCH 18/27] Add UsedTableProgressEncoder to RuntimeContext --- src/databricks/labs/ucx/contexts/workflow_task.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index a3656b290b..d8d12e2c24 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -28,7 +28,7 @@ from databricks.labs.ucx.progress.grants import GrantProgressEncoder from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.jobs import JobsProgressEncoder -from databricks.labs.ucx.progress.tables import TableProgressEncoder +from databricks.labs.ucx.progress.tables import TableProgressEncoder, UsedTableProgressEncoder from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder # As with GlobalContext, service factories unavoidably have a lot of public methods. @@ -231,6 +231,16 @@ def tables_progress(self) -> TableProgressEncoder: self.config.ucx_catalog, ) + @cached_property + def used_table_progress(self) -> UsedTableProgressEncoder: + return UsedTableProgressEncoder( + self.sql_backend, + self.used_table_ownership, + self.parent_run_id, + self.workspace_id, + self.config.ucx_catalog, + ) + @cached_property def udfs_progress(self) -> ProgressEncoder[Udf]: return ProgressEncoder( From 57df37f39a6a4603392b6bce45561170d7221d64 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 11:39:03 +0100 Subject: [PATCH 19/27] Add UsedTableProgressEncoder to MigrationProgress workflow --- src/databricks/labs/ucx/progress/workflows.py | 1 + tests/unit/progress/test_workflows.py | 41 +++++++++++++------ 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 060b2fdccf..2833d3c451 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -149,6 +149,7 @@ def assess_dashboards(self, ctx: RuntimeContext): Also stores direct filesystem accesses for display in the migration dashboard.""" # TODO: Ensure these are captured in the history log. ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) + ctx.used_table_progress.append_inventory_snapshot(ctx.used_tables_crawler_for_queries.snapshot()) @job_task(depends_on=[verify_prerequisites]) def assess_workflows(self, ctx: RuntimeContext): diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index 6ce5174847..5cc92dffba 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -11,6 +11,10 @@ from databricks.labs.ucx.progress.workflows import MigrationProgress from databricks.labs.ucx.contexts.workflow_task import RuntimeContext +from databricks.labs.ucx.progress.tables import UsedTableProgressEncoder +from databricks.labs.ucx.source_code.jobs import WorkflowLinter +from databricks.labs.ucx.source_code.queries import QueryLinter +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler @pytest.mark.parametrize( @@ -66,19 +70,30 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: mock_history_log.append_inventory_snapshot.assert_called_once() -@pytest.mark.parametrize( - "task, linter", - ( - (MigrationProgress.assess_dashboards, RuntimeContext.query_linter), - (MigrationProgress.assess_workflows, RuntimeContext.workflow_linter), - ), -) -def test_linter_runtime_refresh(run_workflow, task, linter) -> None: - linter_class = get_type_hints(linter.func)["return"] - mock_linter = create_autospec(linter_class) - linter_name = linter.attrname - ctx = run_workflow(task, **{linter_name: mock_linter}) - mock_linter.refresh_report.assert_called_once_with(ctx.sql_backend, ctx.inventory_database) +def test_migration_progress_assess_dashboards_refreshes_report(run_workflow) -> None: + query_linter = create_autospec(QueryLinter) + used_table_progress_encoder = create_autospec(UsedTableProgressEncoder) + used_tables_crawler = create_autospec(UsedTablesCrawler) + + run_workflow( + MigrationProgress.assess_dashboards, + named_parameters={"parent_run_id": 1}, + query_linter=query_linter, + used_table_progress=used_table_progress_encoder, + used_tables_crawler_for_queries=used_tables_crawler, + ) + + query_linter.refresh_report.assert_called_once() + used_tables_crawler.snapshot.assert_called_once() + used_table_progress_encoder.append_inventory_snapshot.assert_called_once() + + +def test_migration_progress_assess_workflows_refreshes_report(run_workflow) -> None: + workflow_linter = create_autospec(WorkflowLinter) + + run_workflow(MigrationProgress.assess_workflows, workflow_linter=workflow_linter) + + workflow_linter.refresh_report.assert_called_once() def test_migration_progress_with_valid_prerequisites(run_workflow) -> None: From 709e3fd6c82180a6512b05f1a2746b82040480b8 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 12:10:30 +0100 Subject: [PATCH 20/27] Move Used table unit tests to unit tests module --- .../source_code/test_used_tables.py | 132 ---------------- tests/unit/source_code/test_used_table.py | 148 ++++++++++++++++-- 2 files changed, 132 insertions(+), 148 deletions(-) delete mode 100644 tests/integration/source_code/test_used_tables.py diff --git a/tests/integration/source_code/test_used_tables.py b/tests/integration/source_code/test_used_tables.py deleted file mode 100644 index 3e5dd5c439..0000000000 --- a/tests/integration/source_code/test_used_tables.py +++ /dev/null @@ -1,132 +0,0 @@ -import dataclasses -import logging -from unittest.mock import create_autospec - -import pytest - -from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership, WorkspacePathOwnership -from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable -from databricks.labs.ucx.source_code.used_table import UsedTableOwnership, UsedTablesCrawler - - -@pytest.fixture -def used_table() -> UsedTable: - return UsedTable( - catalog_name="test-catalog", - schema_name="test-schema", - table_name="test-table", - is_write=True, - source_id="test", - source_lineage=[LineageAtom(object_type="QUERY", object_id="dashboard/query")], - ) - - -def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot(used_table: UsedTable) -> None: - administrator_locator = create_autospec(AdministratorLocator) - administrator_locator.get_workspace_administrator.return_value = "John Doe" - used_tables_crawler = create_autospec(UsedTablesCrawler) - used_tables_crawler.snapshot.return_value = [used_table] - legacy_query_ownership = create_autospec(LegacyQueryOwnership) - workspace_path_ownership = create_autospec(WorkspacePathOwnership) - - ownership = UsedTableOwnership( - administrator_locator, - used_tables_crawler, - used_tables_crawler, - legacy_query_ownership, - workspace_path_ownership, - ) - - owner = ownership.owner_of(UsedTable()) - - assert owner == "John Doe" - administrator_locator.get_workspace_administrator.assert_called_once() - legacy_query_ownership.owner_of.assert_not_called() - workspace_path_ownership.owner_of_path.assert_not_called() - - -def test_used_table_ownership_is_workspace_admin_if_not_write(used_table: UsedTable) -> None: - used_table = dataclasses.replace(used_table, is_write=False) - administrator_locator = create_autospec(AdministratorLocator) - administrator_locator.get_workspace_administrator.return_value = "John Doe" - used_tables_crawler = create_autospec(UsedTablesCrawler) - used_tables_crawler.snapshot.return_value = [used_table] - legacy_query_ownership = create_autospec(LegacyQueryOwnership) - workspace_path_ownership = create_autospec(WorkspacePathOwnership) - - ownership = UsedTableOwnership( - administrator_locator, - used_tables_crawler, - used_tables_crawler, - legacy_query_ownership, - workspace_path_ownership, - ) - - owner = ownership.owner_of(used_table) - - assert owner == "John Doe" - administrator_locator.get_workspace_administrator.assert_called_once() - legacy_query_ownership.owner_of.assert_not_called() - workspace_path_ownership.owner_of_path.assert_not_called() - - -@pytest.mark.parametrize("object_type", ["QUERY", "NOTEBOOK", "FILE"]) -def test_used_table_ownership_from_code(used_table: UsedTable, object_type: str) -> None: - source_lineage = [LineageAtom(object_type=object_type, object_id="dashboard/query")] - used_table = dataclasses.replace(used_table, source_lineage=source_lineage) - administrator_locator = create_autospec(AdministratorLocator) - administrator_locator.get_workspace_administrator.return_value = "John Doe" - used_tables_crawler = create_autospec(UsedTablesCrawler) - used_tables_crawler.snapshot.return_value = [used_table] - legacy_query_ownership = create_autospec(LegacyQueryOwnership) - legacy_query_ownership.owner_of.side_effect = lambda i: "Mary Jane" if i == used_table.query_id else None - workspace_path_ownership = create_autospec(WorkspacePathOwnership) - workspace_path_ownership.owner_of_path.side_effect = lambda i: "Mary Jane" if i == used_table.source_id else None - - ownership = UsedTableOwnership( - administrator_locator, - used_tables_crawler, - used_tables_crawler, - legacy_query_ownership, - workspace_path_ownership, - ) - - owner = ownership.owner_of(used_table) - - assert owner == "Mary Jane" - administrator_locator.get_workspace_administrator.assert_not_called() - if object_type == "QUERY": - legacy_query_ownership.owner_of.assert_called_once_with(used_table.query_id) - workspace_path_ownership.owner_of_path.assert_not_called() - else: - legacy_query_ownership.owner_of.assert_not_called() - workspace_path_ownership.owner_of_path.assert_called_once_with(used_table.source_id) - - -def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTable) -> None: - source_lineage = [LineageAtom(object_type="UNKNOWN", object_id="dashboard/query")] - used_table = dataclasses.replace(used_table, source_lineage=source_lineage) - administrator_locator = create_autospec(AdministratorLocator) - administrator_locator.get_workspace_administrator.return_value = "John Doe" - used_tables_crawler = create_autospec(UsedTablesCrawler) - used_tables_crawler.snapshot.return_value = [used_table] - legacy_query_ownership = create_autospec(LegacyQueryOwnership) - legacy_query_ownership.owner_of.side_effect = lambda i: "Mary Jane" if i == used_table.query_id else None - workspace_path_ownership = create_autospec(WorkspacePathOwnership) - workspace_path_ownership.owner_of_path.side_effect = lambda i: "Mary Jane" if i == used_table.source_id else None - - ownership = UsedTableOwnership( - administrator_locator, - used_tables_crawler, - used_tables_crawler, - legacy_query_ownership, - workspace_path_ownership, - ) - - with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.source_code.used_table"): - owner = ownership.owner_of(used_table) - assert owner == "John Doe" - assert f"Unknown source type 'UNKNOWN' for {used_table.source_id}" in caplog.messages - administrator_locator.get_workspace_administrator.assert_called_once() - legacy_query_ownership.owner_of.assert_not_called() - workspace_path_ownership.owner_of_path.assert_not_called() diff --git a/tests/unit/source_code/test_used_table.py b/tests/unit/source_code/test_used_table.py index e432a923a8..34680b29e5 100644 --- a/tests/unit/source_code/test_used_table.py +++ b/tests/unit/source_code/test_used_table.py @@ -1,29 +1,145 @@ +import dataclasses import datetime as dt +import logging +from unittest.mock import create_autospec +import pytest from databricks.labs.lsql.backends import MockBackend +from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership, WorkspacePathOwnership from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable -from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler +from databricks.labs.ucx.source_code.used_table import UsedTableOwnership, UsedTablesCrawler -def test_crawler_appends_tables() -> None: +@pytest.fixture +def used_table() -> UsedTable: + return UsedTable( + catalog_name="test-catalog", + schema_name="test-schema", + table_name="test-table", + is_write=True, + source_id="test", + source_lineage=[LineageAtom(object_type="QUERY", object_id="dashboard/query")], + ) + + +def test_crawler_appends_tables(used_table: UsedTable) -> None: backend = MockBackend() crawler = UsedTablesCrawler.for_paths(backend, "schema") existing = list(crawler.snapshot()) assert not existing - now = dt.datetime.now(tz=dt.timezone.utc) - dfsas = list( - UsedTable( - catalog_name="catalog", - schema_name="schema", - table_name=name, - source_timestamp=now, - source_lineage=[LineageAtom(object_type="LINEAGE", object_id="ID")], - assessment_start_timestamp=now, - assessment_end_timestamp=now, - ) - for name in ("a", "b", "c") - ) - crawler.dump_all(dfsas) + + crawler.dump_all([used_table] * 3) rows = backend.rows_written_for(crawler.full_name, "append") assert len(rows) == 3 + + +def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot(used_table: UsedTable) -> None: + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "John Doe" + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.snapshot.return_value = [used_table] + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + + ownership = UsedTableOwnership( + administrator_locator, + used_tables_crawler, + used_tables_crawler, + legacy_query_ownership, + workspace_path_ownership, + ) + + owner = ownership.owner_of(UsedTable()) + + assert owner == "John Doe" + administrator_locator.get_workspace_administrator.assert_called_once() + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of_path.assert_not_called() + + +def test_used_table_ownership_is_workspace_admin_if_not_write(used_table: UsedTable) -> None: + used_table = dataclasses.replace(used_table, is_write=False) + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "John Doe" + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.snapshot.return_value = [used_table] + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + + ownership = UsedTableOwnership( + administrator_locator, + used_tables_crawler, + used_tables_crawler, + legacy_query_ownership, + workspace_path_ownership, + ) + + owner = ownership.owner_of(used_table) + + assert owner == "John Doe" + administrator_locator.get_workspace_administrator.assert_called_once() + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of_path.assert_not_called() + + +@pytest.mark.parametrize("object_type", ["QUERY", "NOTEBOOK", "FILE"]) +def test_used_table_ownership_from_code(used_table: UsedTable, object_type: str) -> None: + source_lineage = [LineageAtom(object_type=object_type, object_id="dashboard/query")] + used_table = dataclasses.replace(used_table, source_lineage=source_lineage) + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "John Doe" + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.snapshot.return_value = [used_table] + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + legacy_query_ownership.owner_of.side_effect = lambda i: "Mary Jane" if i == used_table.query_id else None + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + workspace_path_ownership.owner_of_path.side_effect = lambda i: "Mary Jane" if i == used_table.source_id else None + + ownership = UsedTableOwnership( + administrator_locator, + used_tables_crawler, + used_tables_crawler, + legacy_query_ownership, + workspace_path_ownership, + ) + + owner = ownership.owner_of(used_table) + + assert owner == "Mary Jane" + administrator_locator.get_workspace_administrator.assert_not_called() + if object_type == "QUERY": + legacy_query_ownership.owner_of.assert_called_once_with(used_table.query_id) + workspace_path_ownership.owner_of_path.assert_not_called() + else: + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of_path.assert_called_once_with(used_table.source_id) + + +def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTable) -> None: + source_lineage = [LineageAtom(object_type="UNKNOWN", object_id="dashboard/query")] + used_table = dataclasses.replace(used_table, source_lineage=source_lineage) + administrator_locator = create_autospec(AdministratorLocator) + administrator_locator.get_workspace_administrator.return_value = "John Doe" + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.snapshot.return_value = [used_table] + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + legacy_query_ownership.owner_of.side_effect = lambda i: "Mary Jane" if i == used_table.query_id else None + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + workspace_path_ownership.owner_of_path.side_effect = lambda i: "Mary Jane" if i == used_table.source_id else None + + ownership = UsedTableOwnership( + administrator_locator, + used_tables_crawler, + used_tables_crawler, + legacy_query_ownership, + workspace_path_ownership, + ) + + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.source_code.used_table"): + owner = ownership.owner_of(used_table) + assert owner == "John Doe" + assert f"Unknown source type 'UNKNOWN' for {used_table.source_id}" in caplog.messages + administrator_locator.get_workspace_administrator.assert_called_once() + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of_path.assert_not_called() From fd95c15555591528a3d6b823f375436d51369620 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 12:11:51 +0100 Subject: [PATCH 21/27] Move base unit tests to unit tests module --- tests/integration/source_code/test_base.py | 14 -------------- tests/unit/source_code/test_base.py | 15 ++++++++++++++- 2 files changed, 14 insertions(+), 15 deletions(-) delete mode 100644 tests/integration/source_code/test_base.py diff --git a/tests/integration/source_code/test_base.py b/tests/integration/source_code/test_base.py deleted file mode 100644 index e92c6b05dd..0000000000 --- a/tests/integration/source_code/test_base.py +++ /dev/null @@ -1,14 +0,0 @@ -from databricks.labs.ucx.hive_metastore.tables import Table -from databricks.labs.ucx.source_code.used_table import UsedTable - - -def test_used_table_from_table() -> None: - table = Table("catalog", "schema", "table", "MANAGED", "DELTA") - - used_table = UsedTable.from_table(table, is_read=False, is_write=True) - - assert used_table.catalog_name == "catalog" - assert used_table.schema_name == "schema" - assert used_table.table_name == "table" - assert not used_table.is_read - assert used_table.is_write diff --git a/tests/unit/source_code/test_base.py b/tests/unit/source_code/test_base.py index e1e1cebde5..d9ab8a50e5 100644 --- a/tests/unit/source_code/test_base.py +++ b/tests/unit/source_code/test_base.py @@ -1,11 +1,12 @@ import dataclasses +from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.source_code.base import ( Advice, Advisory, Convention, Deprecation, - Failure, + Failure, UsedTable, ) @@ -40,3 +41,15 @@ def test_deprecation_initialization() -> None: def test_convention_initialization() -> None: convention = Convention('code5', 'This is a convention', 1, 1, 2, 2) assert isinstance(convention, Advice) + + +def test_used_table_from_table() -> None: + table = Table("catalog", "schema", "table", "MANAGED", "DELTA") + + used_table = UsedTable.from_table(table, is_read=False, is_write=True) + + assert used_table.catalog_name == "catalog" + assert used_table.schema_name == "schema" + assert used_table.table_name == "table" + assert not used_table.is_read + assert used_table.is_write From 1f733a8acd2daeab2d122d270999cce0ffb23018 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 12:12:13 +0100 Subject: [PATCH 22/27] Format --- tests/unit/source_code/test_base.py | 3 ++- tests/unit/source_code/test_used_table.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/source_code/test_base.py b/tests/unit/source_code/test_base.py index d9ab8a50e5..b65db44c19 100644 --- a/tests/unit/source_code/test_base.py +++ b/tests/unit/source_code/test_base.py @@ -6,7 +6,8 @@ Advisory, Convention, Deprecation, - Failure, UsedTable, + Failure, + UsedTable, ) diff --git a/tests/unit/source_code/test_used_table.py b/tests/unit/source_code/test_used_table.py index 34680b29e5..6a6694942a 100644 --- a/tests/unit/source_code/test_used_table.py +++ b/tests/unit/source_code/test_used_table.py @@ -1,5 +1,4 @@ import dataclasses -import datetime as dt import logging from unittest.mock import create_autospec From 0f5ae17475705766f74dd3c4ea1d8d254ffc415e Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 16:42:18 +0100 Subject: [PATCH 23/27] Add missing part to comment --- src/databricks/labs/ucx/framework/owners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/framework/owners.py b/src/databricks/labs/ucx/framework/owners.py index 4dd089cff6..6327aa8537 100644 --- a/src/databricks/labs/ucx/framework/owners.py +++ b/src/databricks/labs/ucx/framework/owners.py @@ -201,7 +201,7 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli super().__init__(administrator_locator) self._ws = ws - def owner_of_path(self, path: str) -> str: # TODO: Why is this `owner_of_path` and not + def owner_of_path(self, path: str) -> str: # TODO: Why is this `owner_of_path` and not `owner_of` return self.owner_of(WorkspacePath(self._ws, path)) @retried(on=[InternalError], timeout=timedelta(minutes=1)) From b7f32de792dbb24d980ea169673b28e7691aff4e Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 13 Nov 2024 16:46:47 +0100 Subject: [PATCH 24/27] Avoid indirect administrator locator --- src/databricks/labs/ucx/contexts/application.py | 2 +- .../labs/ucx/hive_metastore/table_migrate.py | 11 ++++++++--- .../integration/hive_metastore/test_table_migrate.py | 2 +- tests/unit/hive_metastore/test_table_migrate.py | 9 +++------ 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index fb5f99c699..74af624046 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -409,7 +409,7 @@ def migration_status_refresher(self) -> TableMigrationStatusRefresher: @cached_property def table_migration_ownership(self) -> TableMigrationOwnership: - return TableMigrationOwnership(self.tables_crawler, self.table_ownership) + return TableMigrationOwnership(self.administrator_locator, self.tables_crawler, self.table_ownership) @cached_property def iam_credential_manager(self) -> CredentialManager: diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index be576e5ce0..8c414f6ce4 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -9,7 +9,7 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.errors.platform import DatabricksError -from databricks.labs.ucx.framework.owners import Ownership +from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.hive_metastore.grants import MigrateGrants @@ -607,8 +607,13 @@ class TableMigrationOwnership(Ownership[TableMigrationStatus]): This is the owner of the source table, if (and only if) the source table is present in the inventory. """ - def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None: - super().__init__(table_ownership._administrator_locator) # TODO: Fix this + def __init__( + self, + administrator_locator: AdministratorLocator, + tables_crawler: TablesCrawler, + table_ownership: TableOwnership, + ) -> None: + super().__init__(administrator_locator) self._tables_crawler = tables_crawler self._table_ownership = table_ownership self._indexed_tables: dict[tuple[str, str], Table] | None = None diff --git a/tests/integration/hive_metastore/test_table_migrate.py b/tests/integration/hive_metastore/test_table_migrate.py index b8db48bd33..1a3e567303 100644 --- a/tests/integration/hive_metastore/test_table_migrate.py +++ b/tests/integration/hive_metastore/test_table_migrate.py @@ -32,7 +32,7 @@ def is_migration_record_for_table(record: TableMigrationStatus) -> bool: # Verify for the table that the table owner and the migration status are a match. table_ownership = runtime_ctx.table_ownership - table_migration_ownership = TableMigrationOwnership(tables_crawler, table_ownership) + table_migration_ownership = TableMigrationOwnership(runtime_ctx.administrator_locator, tables_crawler, table_ownership) assert table_migration_ownership.owner_of(table_migration_record) == table_ownership.owner_of(table_record) # Verify the owner of the migration record that corresponds to an unknown table. diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index cfb9bf7399..ec65f24e7f 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1450,10 +1450,9 @@ def test_table_migration_status_owner() -> None: ) tables_crawler.snapshot.return_value = [the_table] table_ownership = create_autospec(TableOwnership) - table_ownership._administrator_locator = admin_locator # pylint: disable=protected-access table_ownership.owner_of.return_value = "bob" - ownership = TableMigrationOwnership(tables_crawler, table_ownership) + ownership = TableMigrationOwnership(admin_locator, tables_crawler, table_ownership) owner = ownership.owner_of( TableMigrationStatus( src_schema="foo", @@ -1493,10 +1492,9 @@ def test_table_migration_status_owner_caches_tables_snapshot() -> None: ) tables_crawler.snapshot.return_value = [a_table, b_table] table_ownership = create_autospec(TableOwnership) - table_ownership._administrator_locator = admin_locator # pylint: disable=protected-access table_ownership.owner_of.return_value = "bob" - ownership = TableMigrationOwnership(tables_crawler, table_ownership) + ownership = TableMigrationOwnership(admin_locator, tables_crawler, table_ownership) # Verify the snapshot() hasn't been loaded yet: it isn't needed. tables_crawler.snapshot.assert_not_called() @@ -1519,9 +1517,8 @@ def test_table_migration_status_source_table_unknown() -> None: tables_crawler = create_autospec(TablesCrawler) tables_crawler.snapshot.return_value = [] table_ownership = create_autospec(TableOwnership) - table_ownership._administrator_locator = admin_locator # pylint: disable=protected-access - ownership = TableMigrationOwnership(tables_crawler, table_ownership) + ownership = TableMigrationOwnership(admin_locator, tables_crawler, table_ownership) unknown_table = TableMigrationStatus( src_schema="foo", From f07aebb136afe47dbd6d78a54be63c13e9194b15 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 14 Nov 2024 14:24:54 +0100 Subject: [PATCH 25/27] Format --- tests/integration/hive_metastore/test_table_migrate.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/hive_metastore/test_table_migrate.py b/tests/integration/hive_metastore/test_table_migrate.py index 1a3e567303..fd3d8348ee 100644 --- a/tests/integration/hive_metastore/test_table_migrate.py +++ b/tests/integration/hive_metastore/test_table_migrate.py @@ -32,7 +32,9 @@ def is_migration_record_for_table(record: TableMigrationStatus) -> bool: # Verify for the table that the table owner and the migration status are a match. table_ownership = runtime_ctx.table_ownership - table_migration_ownership = TableMigrationOwnership(runtime_ctx.administrator_locator, tables_crawler, table_ownership) + table_migration_ownership = TableMigrationOwnership( + runtime_ctx.administrator_locator, tables_crawler, table_ownership + ) assert table_migration_ownership.owner_of(table_migration_record) == table_ownership.owner_of(table_record) # Verify the owner of the migration record that corresponds to an unknown table. From 61bf68c6474f4afbe80008826ed7cb8409796c1e Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 14 Nov 2024 14:29:52 +0100 Subject: [PATCH 26/27] Remove redundant pytest.mark.paramtrize --- tests/unit/progress/test_tables.py | 64 ++++++++++-------------------- 1 file changed, 22 insertions(+), 42 deletions(-) diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py index 76062df509..dd0a0ad84f 100644 --- a/tests/unit/progress/test_tables.py +++ b/tests/unit/progress/test_tables.py @@ -12,13 +12,8 @@ from databricks.labs.ucx.source_code.used_table import UsedTable -@pytest.mark.parametrize( - "table", - [ - Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"), - ], -) -def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: +def test_table_progress_encoder_no_failures(mock_backend) -> None: + table = Table("hive_metastore", "schema", "table", "MANAGED", "DELTA") ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" table_migration_index = create_autospec(TableMigrationIndex) @@ -38,13 +33,8 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: grant_progress_encoder.assert_not_called() -@pytest.mark.parametrize( - "table", - [ - Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"), - ], -) -def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None: +def test_table_progress_encoder_pending_migration_failure(mock_backend) -> None: + table = Table("hive_metastore", "schema", "table", "MANAGED", "DELTA") ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" table_migration_index = create_autospec(TableMigrationIndex) @@ -64,21 +54,16 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T grant_progress_encoder.assert_not_called() -@pytest.mark.parametrize( - "used_table", - [ - UsedTable( - catalog_name="catalog", - schema_name="schema", - table_name="table", - source_timestamp=dt.datetime.now(dt.timezone.utc), - assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), - assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), - ), - ], -) -def test_used_table_progress_encoder_no_failures(mock_backend, used_table: UsedTable) -> None: +def test_used_table_progress_encoder_no_failures(mock_backend) -> None: """No failures when the table is not in the Hive metastore.""" + used_table = UsedTable( + catalog_name="catalog", + schema_name="schema", + table_name="table", + source_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), + ) ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" encoder = UsedTableProgressEncoder(mock_backend, ownership, run_id=1, workspace_id=123456789, catalog="test") @@ -91,21 +76,16 @@ def test_used_table_progress_encoder_no_failures(mock_backend, used_table: UsedT ownership.owner_of.assert_called_once() -@pytest.mark.parametrize( - "used_table", - [ - UsedTable( - catalog_name="hive_metastore", - schema_name="schema", - table_name="table", - source_timestamp=dt.datetime.now(dt.timezone.utc), - assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), - assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), - ), - ], -) -def test_used_table_progress_encoder_pending_migration_failure(mock_backend, used_table: UsedTable) -> None: +def test_used_table_progress_encoder_pending_migration_failure(mock_backend) -> None: """Failures when the table is in the Hive metastore.""" + used_table = UsedTable( + catalog_name="hive_metastore", + schema_name="schema", + table_name="table", + source_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), + ) ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" encoder = UsedTableProgressEncoder(mock_backend, ownership, run_id=1, workspace_id=123456789, catalog="test") From 743847d6e260ed9d2f4e4f8046bae37c76353cb3 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 19 Nov 2024 13:52:31 +0100 Subject: [PATCH 27/27] Remove UsedTableProgressEncoder --- .../labs/ucx/contexts/workflow_task.py | 12 +---- src/databricks/labs/ucx/progress/tables.py | 40 --------------- src/databricks/labs/ucx/progress/workflows.py | 1 - tests/unit/progress/test_tables.py | 49 +------------------ tests/unit/progress/test_workflows.py | 8 --- 5 files changed, 2 insertions(+), 108 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index d8d12e2c24..a3656b290b 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -28,7 +28,7 @@ from databricks.labs.ucx.progress.grants import GrantProgressEncoder from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.jobs import JobsProgressEncoder -from databricks.labs.ucx.progress.tables import TableProgressEncoder, UsedTableProgressEncoder +from databricks.labs.ucx.progress.tables import TableProgressEncoder from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder # As with GlobalContext, service factories unavoidably have a lot of public methods. @@ -231,16 +231,6 @@ def tables_progress(self) -> TableProgressEncoder: self.config.ucx_catalog, ) - @cached_property - def used_table_progress(self) -> UsedTableProgressEncoder: - return UsedTableProgressEncoder( - self.sql_backend, - self.used_table_ownership, - self.parent_run_id, - self.workspace_id, - self.config.ucx_catalog, - ) - @cached_property def udfs_progress(self) -> ProgressEncoder[Udf]: return ProgressEncoder( diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 27f71ae218..2e01fe6d1c 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -7,7 +7,6 @@ from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical -from databricks.labs.ucx.source_code.used_table import UsedTable, UsedTableOwnership class TableProgressEncoder(ProgressEncoder[Table]): @@ -53,42 +52,3 @@ def _encode_record_as_historical(self, record: Table) -> Historical: if not self._table_migration_index.is_migrated(record.database, record.name): failures.append("Pending migration") return replace(historical, failures=historical.failures + failures) - - -class UsedTableProgressEncoder(ProgressEncoder[UsedTable]): - """Encoder class:UsedTable to class:History. - - A used table has a failure if its catalog is the hive_metastore. - """ - - def __init__( - self, - sql_backend: SqlBackend, - ownership: UsedTableOwnership, - run_id: int, - workspace_id: int, - catalog: str, - schema: str = "multiworkspace", - table: str = "historical", - ) -> None: - super().__init__( - sql_backend, - ownership, - UsedTable, - run_id, - workspace_id, - catalog, - schema, - table, - ) - - def _encode_record_as_historical(self, record: UsedTable) -> Historical: - """Encode record as historical. - - A table failure means that the table is pending migration. - """ - historical = super()._encode_record_as_historical(record) - failures = [] - if record.catalog_name == "hive_metastore": - failures.append("Pending migration") - return replace(historical, failures=historical.failures + failures) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 2833d3c451..060b2fdccf 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -149,7 +149,6 @@ def assess_dashboards(self, ctx: RuntimeContext): Also stores direct filesystem accesses for display in the migration dashboard.""" # TODO: Ensure these are captured in the history log. ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) - ctx.used_table_progress.append_inventory_snapshot(ctx.used_tables_crawler_for_queries.snapshot()) @job_task(depends_on=[verify_prerequisites]) def assess_workflows(self, ctx: RuntimeContext): diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py index dd0a0ad84f..9881fef386 100644 --- a/tests/unit/progress/test_tables.py +++ b/tests/unit/progress/test_tables.py @@ -1,15 +1,12 @@ -import datetime as dt from unittest.mock import create_autospec -import pytest from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.progress.grants import GrantProgressEncoder -from databricks.labs.ucx.progress.tables import TableProgressEncoder, UsedTableProgressEncoder -from databricks.labs.ucx.source_code.used_table import UsedTable +from databricks.labs.ucx.progress.tables import TableProgressEncoder def test_table_progress_encoder_no_failures(mock_backend) -> None: @@ -52,47 +49,3 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend) -> None: ownership.owner_of.assert_called_once() table_migration_index.is_migrated.assert_called_with(table.database, table.name) grant_progress_encoder.assert_not_called() - - -def test_used_table_progress_encoder_no_failures(mock_backend) -> None: - """No failures when the table is not in the Hive metastore.""" - used_table = UsedTable( - catalog_name="catalog", - schema_name="schema", - table_name="table", - source_timestamp=dt.datetime.now(dt.timezone.utc), - assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), - assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), - ) - ownership = create_autospec(Ownership) - ownership.owner_of.return_value = "user" - encoder = UsedTableProgressEncoder(mock_backend, ownership, run_id=1, workspace_id=123456789, catalog="test") - - encoder.append_inventory_snapshot([used_table]) - - rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append") - assert len(rows) > 0, f"No rows written for: {encoder.full_name}" - assert len(rows[0].failures) == 0 - ownership.owner_of.assert_called_once() - - -def test_used_table_progress_encoder_pending_migration_failure(mock_backend) -> None: - """Failures when the table is in the Hive metastore.""" - used_table = UsedTable( - catalog_name="hive_metastore", - schema_name="schema", - table_name="table", - source_timestamp=dt.datetime.now(dt.timezone.utc), - assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), - assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), - ) - ownership = create_autospec(Ownership) - ownership.owner_of.return_value = "user" - encoder = UsedTableProgressEncoder(mock_backend, ownership, run_id=1, workspace_id=123456789, catalog="test") - - encoder.append_inventory_snapshot([used_table]) - - rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append") - assert len(rows) > 0, f"No rows written for: {encoder.full_name}" - assert rows[0].failures == ["Pending migration"] - ownership.owner_of.assert_called_once() diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index 5cc92dffba..86d87e30c5 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -11,10 +11,8 @@ from databricks.labs.ucx.progress.workflows import MigrationProgress from databricks.labs.ucx.contexts.workflow_task import RuntimeContext -from databricks.labs.ucx.progress.tables import UsedTableProgressEncoder from databricks.labs.ucx.source_code.jobs import WorkflowLinter from databricks.labs.ucx.source_code.queries import QueryLinter -from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler @pytest.mark.parametrize( @@ -72,20 +70,14 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: def test_migration_progress_assess_dashboards_refreshes_report(run_workflow) -> None: query_linter = create_autospec(QueryLinter) - used_table_progress_encoder = create_autospec(UsedTableProgressEncoder) - used_tables_crawler = create_autospec(UsedTablesCrawler) run_workflow( MigrationProgress.assess_dashboards, named_parameters={"parent_run_id": 1}, query_linter=query_linter, - used_table_progress=used_table_progress_encoder, - used_tables_crawler_for_queries=used_tables_crawler, ) query_linter.refresh_report.assert_called_once() - used_tables_crawler.snapshot.assert_called_once() - used_table_progress_encoder.append_inventory_snapshot.assert_called_once() def test_migration_progress_assess_workflows_refreshes_report(run_workflow) -> None: