diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py
index 889fba1eec..74af624046 100644
--- a/src/databricks/labs/ucx/contexts/application.py
+++ b/src/databricks/labs/ucx/contexts/application.py
@@ -19,7 +19,7 @@
 from databricks.labs.ucx.recon.schema_comparator import StandardSchemaComparator
 from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler, DirectFsAccessOwnership
 from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
-from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
+from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler, UsedTableOwnership
 from databricks.sdk import AccountClient, WorkspaceClient, core
 from databricks.sdk.service import sql
 
@@ -44,10 +44,11 @@
 )
 from databricks.labs.ucx.hive_metastore.mapping import TableMapping
 from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
-from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
+from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership
 from databricks.labs.ucx.hive_metastore.table_migrate import (
     TableMigrationStatusRefresher,
     TablesMigrator,
+    TableMigrationOwnership,
 )
 from databricks.labs.ucx.hive_metastore.table_move import TableMove
 from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership
@@ -262,16 +263,19 @@ def tables_crawler(self) -> TablesCrawler:
         return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
 
     @cached_property
-    def table_ownership(self) -> TableOwnership:
-        return TableOwnership(
+    def used_table_ownership(self) -> UsedTableOwnership:
+        return UsedTableOwnership(
             self.administrator_locator,
-            self.grants_crawler,
             self.used_tables_crawler_for_paths,
             self.used_tables_crawler_for_queries,
             self.legacy_query_ownership,
             self.workspace_path_ownership,
         )
 
+    @cached_property
+    def table_ownership(self) -> TableOwnership:
+        return TableOwnership(self.administrator_locator, self.grants_crawler, self.used_table_ownership)
+
     @cached_property
     def workspace_path_ownership(self) -> WorkspacePathOwnership:
         return WorkspacePathOwnership(self.administrator_locator, self.workspace_client)
@@ -405,7 +409,7 @@ def migration_status_refresher(self) -> TableMigrationStatusRefresher:
 
     @cached_property
     def table_migration_ownership(self) -> TableMigrationOwnership:
-        return TableMigrationOwnership(self.tables_crawler, self.table_ownership)
+        return TableMigrationOwnership(self.administrator_locator, self.tables_crawler, self.table_ownership)
 
     @cached_property
     def iam_credential_manager(self) -> CredentialManager:
diff --git a/src/databricks/labs/ucx/framework/owners.py b/src/databricks/labs/ucx/framework/owners.py
index 55a1ddac98..6327aa8537 100644
--- a/src/databricks/labs/ucx/framework/owners.py
+++ b/src/databricks/labs/ucx/framework/owners.py
@@ -201,7 +201,7 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli
         super().__init__(administrator_locator)
         self._ws = ws
 
-    def owner_of_path(self, path: str) -> str:
+    def owner_of_path(self, path: str) -> str:  # TODO: Why is this `owner_of_path` and not `owner_of`
         return self.owner_of(WorkspacePath(self._ws, path))
 
     @retried(on=[InternalError], timeout=timedelta(minutes=1))
diff --git a/src/databricks/labs/ucx/hive_metastore/ownership.py b/src/databricks/labs/ucx/hive_metastore/ownership.py
deleted file mode 100644
index b11f5f6e81..0000000000
--- a/src/databricks/labs/ucx/hive_metastore/ownership.py
+++ /dev/null
@@ -1,111 +0,0 @@
-import logging
-from functools import cached_property
-
-from databricks.labs.ucx.framework.owners import (
-    Ownership,
-    AdministratorLocator,
-    LegacyQueryOwnership,
-    WorkspacePathOwnership,
-)
-from databricks.labs.ucx.hive_metastore import TablesCrawler
-from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
-from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
-from databricks.labs.ucx.hive_metastore.tables import Table
-from databricks.labs.ucx.source_code.base import UsedTable
-from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
-
-logger = logging.getLogger(__name__)
-
-
-class TableOwnership(Ownership[Table]):
-    """Determine ownership of tables in the inventory based on the following rules:
-    - If a table is owned by a principal in the grants table, then that principal is the owner.
-    - If a table is written to by a query, then the owner of that query is the owner of the table.
-    - If a table is written to by a notebook or file, then the owner of the path is the owner of the table.
-    """
-
-    def __init__(
-        self,
-        administrator_locator: AdministratorLocator,
-        grants_crawler: GrantsCrawler,
-        used_tables_in_paths: UsedTablesCrawler,
-        used_tables_in_queries: UsedTablesCrawler,
-        legacy_query_ownership: LegacyQueryOwnership,
-        workspace_path_ownership: WorkspacePathOwnership,
-    ) -> None:
-        super().__init__(administrator_locator)
-        self._grants_crawler = grants_crawler
-        self._used_tables_in_paths = used_tables_in_paths
-        self._used_tables_in_queries = used_tables_in_queries
-        self._legacy_query_ownership = legacy_query_ownership
-        self._workspace_path_ownership = workspace_path_ownership
-
-    def _maybe_direct_owner(self, record: Table) -> str | None:
-        owner = self._maybe_from_grants(record)
-        if owner:
-            return owner
-        return self._maybe_from_sources(record)
-
-    def _maybe_from_sources(self, record: Table) -> str | None:
-        used_table = self._used_tables_snapshot.get((record.catalog, record.database, record.name))
-        if not used_table:
-            return None
-        # If something writes to a table, then it's an owner of it
-        if not used_table.is_write:
-            return None
-        if used_table.source_type == 'QUERY' and used_table.query_id:
-            return self._legacy_query_ownership.owner_of(used_table.query_id)
-        if used_table.source_type in {'NOTEBOOK', 'FILE'}:
-            return self._workspace_path_ownership.owner_of_path(used_table.source_id)
-        logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}")
-        return None
-
-    @cached_property
-    def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]:
-        index = {}
-        for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()):
-            for used_table in collection:
-                key = used_table.catalog_name, used_table.schema_name, used_table.table_name
-                index[key] = used_table
-        return index
-
-    def _maybe_from_grants(self, record: Table) -> str | None:
-        for grant in self._grants_snapshot:
-            if not grant.action_type == 'OWN':
-                continue
-            object_type, full_name = grant.this_type_and_key()
-            if object_type == 'TABLE' and full_name == record.key:
-                return grant.principal
-            if object_type in {'DATABASE', 'SCHEMA'} and full_name == f"{record.catalog}.{record.database}":
-                return grant.principal
-        return None
-
-    @cached_property
-    def _grants_snapshot(self):
-        return self._grants_crawler.snapshot()
-
-
-class TableMigrationOwnership(Ownership[TableMigrationStatus]):
-    """Determine ownership of table migration records in the inventory.
-
-    This is the owner of the source table, if (and only if) the source table is present in the inventory.
-    """
-
-    def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None:
-        super().__init__(table_ownership._administrator_locator)  # TODO: Fix this
-        self._tables_crawler = tables_crawler
-        self._table_ownership = table_ownership
-        self._indexed_tables: dict[tuple[str, str], Table] | None = None
-
-    def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]:
-        index = self._indexed_tables
-        if index is None or reindex:
-            snapshot = self._tables_crawler.snapshot()
-            index = {(table.database, table.name): table for table in snapshot}
-            self._indexed_tables = index
-        return index
-
-    def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None:
-        index = self._tables_snapshot_index()
-        source_table = index.get((record.src_schema, record.src_table), None)
-        return self._table_ownership.owner_of(source_table) if source_table is not None else None
diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
index 3dedc12e78..8c414f6ce4 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -9,6 +9,7 @@
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors.platform import DatabricksError
 
+from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
 from databricks.labs.ucx.hive_metastore import TablesCrawler
 from databricks.labs.ucx.hive_metastore.grants import MigrateGrants
@@ -18,8 +19,12 @@
     TableMapping,
     TableToMigrate,
 )
-
-from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher, TableMigrationIndex
+from databricks.labs.ucx.hive_metastore.table_migration_status import (
+    TableMigrationStatusRefresher,
+    TableMigrationIndex,
+    TableMigrationStatus,
+)
+from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership
 from databricks.labs.ucx.hive_metastore.tables import (
     MigrationCount,
     Table,
@@ -594,3 +599,34 @@ def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int):
             f"('upgraded_from' = '{source}'"
             f" , '{table.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');"
         )
+
+
+class TableMigrationOwnership(Ownership[TableMigrationStatus]):
+    """Determine ownership of table migration records in the inventory.
+
+    This is the owner of the source table, if (and only if) the source table is present in the inventory.
+ """ + + def __init__( + self, + administrator_locator: AdministratorLocator, + tables_crawler: TablesCrawler, + table_ownership: TableOwnership, + ) -> None: + super().__init__(administrator_locator) + self._tables_crawler = tables_crawler + self._table_ownership = table_ownership + self._indexed_tables: dict[tuple[str, str], Table] | None = None + + def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]: + index = self._indexed_tables + if index is None or reindex: + snapshot = self._tables_crawler.snapshot() + index = {(table.database, table.name): table for table in snapshot} + self._indexed_tables = index + return index + + def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None: + index = self._tables_snapshot_index() + source_table = index.get((record.src_schema, record.src_table), None) + return self._table_ownership.owner_of(source_table) if source_table is not None else None diff --git a/src/databricks/labs/ucx/hive_metastore/table_ownership.py b/src/databricks/labs/ucx/hive_metastore/table_ownership.py new file mode 100644 index 0000000000..dc5e395686 --- /dev/null +++ b/src/databricks/labs/ucx/hive_metastore/table_ownership.py @@ -0,0 +1,59 @@ +""" +Separate table ownership module to resolve circular dependency for `:mod:hive_metastore.grants` using `:class:Table` +and `:class:TableOwnership` using the `GrantsCrawler`. +""" + +import logging +from functools import cached_property + +from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator +from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler +from databricks.labs.ucx.hive_metastore.tables import Table +from databricks.labs.ucx.source_code.base import UsedTable +from databricks.labs.ucx.source_code.used_table import UsedTableOwnership + + +logger = logging.getLogger(__name__) + + +class TableOwnership(Ownership[Table]): + """Table ownership + + Determine ownership of tables in the inventory based on the following rules: + - If a table is owned by a principal through grants, then that principal is the owner. + - Otherwise, fallback on the `UsedTableOwnership`. 
+ """ + + def __init__( + self, + administrator_locator: AdministratorLocator, + grants_crawler: GrantsCrawler, + used_table_ownership: UsedTableOwnership, + ) -> None: + super().__init__(administrator_locator) + self._grants_crawler = grants_crawler + self._used_table_ownership = used_table_ownership + + def _maybe_direct_owner(self, record: Table) -> str | None: + owner = self._maybe_from_grants(record) + if owner: + return owner + # The `is_write` and `is_read` has no effect as the actual `UsedTable` definition comes from snapshots + used_table = UsedTable.from_table(record, is_read=False, is_write=False) + # This call defers the `administrator_locator` to the one of `UsedTableOwnership`, we expect them to be the same + return self._used_table_ownership.owner_of(used_table) + + def _maybe_from_grants(self, record: Table) -> str | None: + for grant in self._grants_snapshot: + if not grant.action_type == 'OWN': + continue + object_type, full_name = grant.this_type_and_key() + if object_type == 'TABLE' and full_name == record.key: + return grant.principal + if object_type in {'DATABASE', 'SCHEMA'} and full_name == f"{record.catalog}.{record.database}": + return grant.principal + return None + + @cached_property + def _grants_snapshot(self): + return self._grants_crawler.snapshot() diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 6dc76132e2..2e01fe6d1c 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -3,8 +3,8 @@ from databricks.labs.lsql.backends import SqlBackend from databricks.labs.ucx.hive_metastore.tables import Table +from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex -from databricks.labs.ucx.hive_metastore.ownership import TableOwnership from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py index 659b38b2b7..b6418ea6bd 100644 --- a/src/databricks/labs/ucx/source_code/base.py +++ b/src/databricks/labs/ucx/source_code/base.py @@ -10,7 +10,7 @@ from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import Any, BinaryIO, TextIO +from typing import Any, BinaryIO, ClassVar, TextIO from astroid import NodeNG # type: ignore from sqlglot import Expression, parse as parse_sql @@ -18,9 +18,10 @@ from databricks.sdk.service import compute from databricks.sdk.service.workspace import Language - from databricks.labs.blueprint.paths import WorkspacePath +from databricks.labs.ucx.hive_metastore.tables import Table + if sys.version_info >= (3, 11): from typing import Self @@ -260,12 +261,25 @@ def parse(cls, value: str, default_schema: str, is_read=True, is_write=False) -> catalog_name=catalog_name, schema_name=schema_name, table_name=parts[0], is_read=is_read, is_write=is_write ) + @classmethod + def from_table(cls, table: Table, *, is_read: bool, is_write: bool) -> UsedTable: + """Create a `:class:UsedTable` from a Hive `:class:Table`.""" + return cls( + catalog_name=table.catalog, + schema_name=table.database, + table_name=table.name, + is_read=is_read, + is_write=is_write, + ) + catalog_name: str = SourceInfo.UNKNOWN schema_name: str = SourceInfo.UNKNOWN table_name: str = SourceInfo.UNKNOWN is_read: bool = True is_write: bool = False + 
+    __id_attributes__: ClassVar[tuple[str, ...]] = ("catalog_name", "schema_name", "table_name")
+
 
 
 class TableCollector(ABC):
diff --git a/src/databricks/labs/ucx/source_code/used_table.py b/src/databricks/labs/ucx/source_code/used_table.py
index b5cdb77c0b..7d61dd57ff 100644
--- a/src/databricks/labs/ucx/source_code/used_table.py
+++ b/src/databricks/labs/ucx/source_code/used_table.py
@@ -2,12 +2,19 @@
 import logging
 from collections.abc import Sequence, Iterable
+from functools import cached_property
 
 from databricks.labs.ucx.framework.crawlers import CrawlerBase
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk.errors import DatabricksError
 
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
+from databricks.labs.ucx.framework.owners import (
+    AdministratorLocator,
+    LegacyQueryOwnership,
+    Ownership,
+    WorkspacePathOwnership,
+)
 from databricks.labs.ucx.source_code.base import UsedTable
 
 logger = logging.getLogger(__name__)
@@ -52,3 +59,47 @@ def _try_fetch(self) -> Iterable[UsedTable]:
 
     def _crawl(self) -> Iterable[UsedTable]:
         return []  # TODO raise NotImplementedError() once CrawlerBase supports empty snapshots
+
+
+class UsedTableOwnership(Ownership[UsedTable]):
+    """Used table ownership.
+
+    Ownership of a code resource propagates to the `UsedTable` if that resource writes to the table.
+    """
+
+    def __init__(
+        self,
+        administrator_locator: AdministratorLocator,
+        used_tables_in_paths: UsedTablesCrawler,
+        used_tables_in_queries: UsedTablesCrawler,
+        legacy_query_ownership: LegacyQueryOwnership,
+        workspace_path_ownership: WorkspacePathOwnership,
+    ) -> None:
+        super().__init__(administrator_locator)
+        self._used_tables_in_paths = used_tables_in_paths
+        self._used_tables_in_queries = used_tables_in_queries
+        self._legacy_query_ownership = legacy_query_ownership
+        self._workspace_path_ownership = workspace_path_ownership
+
+    @cached_property
+    def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]:
+        index = {}
+        for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()):
+            for used_table in collection:
+                key = used_table.catalog_name, used_table.schema_name, used_table.table_name
+                index[key] = used_table
+        return index
+
+    def _maybe_direct_owner(self, record: UsedTable) -> str | None:
+        used_table = self._used_tables_snapshot.get((record.catalog_name, record.schema_name, record.table_name))
+        if not used_table:
+            return None
+        # If something writes to a table, then it's an owner of it
+        if not used_table.is_write:
+            return None
+        if used_table.source_type == 'QUERY' and used_table.query_id:
+            return self._legacy_query_ownership.owner_of(used_table.query_id)
+        if used_table.source_type in {'NOTEBOOK', 'FILE'}:
+            return self._workspace_path_ownership.owner_of_path(used_table.source_id)
+        logger.warning(f"Unknown source type '{used_table.source_type}' for {used_table.source_id}")
+        return None
diff --git a/tests/integration/hive_metastore/test_table_migrate.py b/tests/integration/hive_metastore/test_table_migrate.py
index 61f87ac8b2..fd3d8348ee 100644
--- a/tests/integration/hive_metastore/test_table_migrate.py
+++ b/tests/integration/hive_metastore/test_table_migrate.py
@@ -5,7 +5,7 @@
     TableMigrationStatus,
     TableMigrationStatusRefresher,
 )
-from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership
+from databricks.labs.ucx.hive_metastore.table_migrate import TableMigrationOwnership
 
 
 def test_table_migration_ownership(ws, runtime_ctx, inventory_schema, sql_backend) -> None:
@@ -32,7 +32,9 @@ def is_migration_record_for_table(record: TableMigrationStatus) -> bool:
 
     # Verify for the table that the table owner and the migration status are a match.
     table_ownership = runtime_ctx.table_ownership
-    table_migration_ownership = TableMigrationOwnership(tables_crawler, table_ownership)
+    table_migration_ownership = TableMigrationOwnership(
+        runtime_ctx.administrator_locator, tables_crawler, table_ownership
+    )
     assert table_migration_ownership.owner_of(table_migration_record) == table_ownership.owner_of(table_record)
 
     # Verify the owner of the migration record that corresponds to an unknown table.
diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py
index 5b7e7a66a4..ec65f24e7f 100644
--- a/tests/unit/hive_metastore/test_table_migrate.py
+++ b/tests/unit/hive_metastore/test_table_migrate.py
@@ -24,6 +24,7 @@
 )
 from databricks.labs.ucx.hive_metastore.table_migrate import (
     TablesMigrator,
+    TableMigrationOwnership,
 )
 from databricks.labs.ucx.hive_metastore.table_migration_status import (
     TableMigrationStatusRefresher,
@@ -31,7 +32,7 @@
     TableMigrationStatus,
     TableView,
 )
-from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
+from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership
 from databricks.labs.ucx.hive_metastore.tables import (
     Table,
     TablesCrawler,
@@ -1449,10 +1450,9 @@ def test_table_migration_status_owner() -> None:
     )
     tables_crawler.snapshot.return_value = [the_table]
     table_ownership = create_autospec(TableOwnership)
-    table_ownership._administrator_locator = admin_locator  # pylint: disable=protected-access
     table_ownership.owner_of.return_value = "bob"
 
-    ownership = TableMigrationOwnership(tables_crawler, table_ownership)
+    ownership = TableMigrationOwnership(admin_locator, tables_crawler, table_ownership)
     owner = ownership.owner_of(
         TableMigrationStatus(
             src_schema="foo",
@@ -1492,10 +1492,9 @@ def test_table_migration_status_owner_caches_tables_snapshot() -> None:
     )
     tables_crawler.snapshot.return_value = [a_table, b_table]
     table_ownership = create_autospec(TableOwnership)
-    table_ownership._administrator_locator = admin_locator  # pylint: disable=protected-access
     table_ownership.owner_of.return_value = "bob"
 
-    ownership = TableMigrationOwnership(tables_crawler, table_ownership)
+    ownership = TableMigrationOwnership(admin_locator, tables_crawler, table_ownership)
 
     # Verify the snapshot() hasn't been loaded yet: it isn't needed.
     tables_crawler.snapshot.assert_not_called()
 
@@ -1518,9 +1517,8 @@ def test_table_migration_status_source_table_unknown() -> None:
     tables_crawler = create_autospec(TablesCrawler)
     tables_crawler.snapshot.return_value = []
     table_ownership = create_autospec(TableOwnership)
-    table_ownership._administrator_locator = admin_locator  # pylint: disable=protected-access
 
-    ownership = TableMigrationOwnership(tables_crawler, table_ownership)
+    ownership = TableMigrationOwnership(admin_locator, tables_crawler, table_ownership)
     unknown_table = TableMigrationStatus(
         src_schema="foo",
diff --git a/tests/unit/hive_metastore/test_tables.py b/tests/unit/hive_metastore/test_tables.py
index 057389d49b..ea34c8cb24 100644
--- a/tests/unit/hive_metastore/test_tables.py
+++ b/tests/unit/hive_metastore/test_tables.py
@@ -20,8 +20,8 @@
     TablesCrawler,
     What,
 )
-from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
-from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
+from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership
+from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler, UsedTableOwnership
 
 
 def test_is_delta_true():
@@ -688,14 +688,18 @@ def test_table_owner() -> None:
     legacy_query_ownership = create_autospec(LegacyQueryOwnership)
     workspace_path_ownership = create_autospec(WorkspacePathOwnership)
 
-    ownership = TableOwnership(
+    used_table_ownership = UsedTableOwnership(
         admin_locator,
-        grants_crawler,
         used_tables_in_paths,
         used_tables_in_queries,
         legacy_query_ownership,
         workspace_path_ownership,
     )
+    ownership = TableOwnership(
+        admin_locator,
+        grants_crawler,
+        used_table_ownership,
+    )
 
     table = Table(catalog="main", database="foo", name="bar", object_type="TABLE", table_format="DELTA")
     owner = ownership.owner_of(table)
diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py
index a859bf5c03..9881fef386 100644
--- a/tests/unit/progress/test_tables.py
+++ b/tests/unit/progress/test_tables.py
@@ -1,6 +1,5 @@
 from unittest.mock import create_autospec
 
-import pytest
 
 from databricks.labs.ucx.framework.owners import Ownership
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
@@ -10,13 +9,8 @@
 from databricks.labs.ucx.progress.tables import TableProgressEncoder
 
 
-@pytest.mark.parametrize(
-    "table",
-    [
-        Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
-    ],
-)
-def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
+def test_table_progress_encoder_no_failures(mock_backend) -> None:
+    table = Table("hive_metastore", "schema", "table", "MANAGED", "DELTA")
     ownership = create_autospec(Ownership)
     ownership.owner_of.return_value = "user"
     table_migration_index = create_autospec(TableMigrationIndex)
@@ -36,13 +30,8 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
     grant_progress_encoder.assert_not_called()
 
 
-@pytest.mark.parametrize(
-    "table",
-    [
-        Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
-    ],
-)
-def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
+def test_table_progress_encoder_pending_migration_failure(mock_backend) -> None:
+    table = Table("hive_metastore", "schema", "table", "MANAGED", "DELTA")
     ownership = create_autospec(Ownership)
     ownership.owner_of.return_value = "user"
     table_migration_index = create_autospec(TableMigrationIndex)
diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py
index 6ce5174847..86d87e30c5 100644
--- a/tests/unit/progress/test_workflows.py
+++ b/tests/unit/progress/test_workflows.py
@@ -11,6 +11,8 @@
 from databricks.labs.ucx.progress.workflows import MigrationProgress
 from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
+from databricks.labs.ucx.source_code.jobs import WorkflowLinter
+from databricks.labs.ucx.source_code.queries import QueryLinter
 
 
 @pytest.mark.parametrize(
@@ -66,19 +68,24 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None:
     mock_history_log.append_inventory_snapshot.assert_called_once()
 
 
-@pytest.mark.parametrize(
-    "task, linter",
-    (
-        (MigrationProgress.assess_dashboards, RuntimeContext.query_linter),
-        (MigrationProgress.assess_workflows, RuntimeContext.workflow_linter),
-    ),
-)
-def test_linter_runtime_refresh(run_workflow, task, linter) -> None:
-    linter_class = get_type_hints(linter.func)["return"]
-    mock_linter = create_autospec(linter_class)
-    linter_name = linter.attrname
-    ctx = run_workflow(task, **{linter_name: mock_linter})
-    mock_linter.refresh_report.assert_called_once_with(ctx.sql_backend, ctx.inventory_database)
+def test_migration_progress_assess_dashboards_refreshes_report(run_workflow) -> None:
+    query_linter = create_autospec(QueryLinter)
+
+    run_workflow(
+        MigrationProgress.assess_dashboards,
+        named_parameters={"parent_run_id": 1},
+        query_linter=query_linter,
+    )
+
+    query_linter.refresh_report.assert_called_once()
+
+
+def test_migration_progress_assess_workflows_refreshes_report(run_workflow) -> None:
+    workflow_linter = create_autospec(WorkflowLinter)
+
+    run_workflow(MigrationProgress.assess_workflows, workflow_linter=workflow_linter)
+
+    workflow_linter.refresh_report.assert_called_once()
 
 
 def test_migration_progress_with_valid_prerequisites(run_workflow) -> None:
diff --git a/tests/unit/source_code/test_base.py b/tests/unit/source_code/test_base.py
index e1e1cebde5..b65db44c19 100644
--- a/tests/unit/source_code/test_base.py
+++ b/tests/unit/source_code/test_base.py
@@ -1,11 +1,13 @@
 import dataclasses
 
+from databricks.labs.ucx.hive_metastore.tables import Table
 from databricks.labs.ucx.source_code.base import (
     Advice,
     Advisory,
     Convention,
     Deprecation,
     Failure,
+    UsedTable,
 )
@@ -40,3 +42,15 @@ def test_deprecation_initialization() -> None:
 
 def test_convention_initialization() -> None:
     convention = Convention('code5', 'This is a convention', 1, 1, 2, 2)
     assert isinstance(convention, Advice)
+
+
+def test_used_table_from_table() -> None:
+    table = Table("catalog", "schema", "table", "MANAGED", "DELTA")
+
+    used_table = UsedTable.from_table(table, is_read=False, is_write=True)
+
+    assert used_table.catalog_name == "catalog"
+    assert used_table.schema_name == "schema"
+    assert used_table.table_name == "table"
+    assert not used_table.is_read
+    assert used_table.is_write
diff --git a/tests/unit/source_code/test_used_table.py b/tests/unit/source_code/test_used_table.py
index e432a923a8..6a6694942a 100644
--- a/tests/unit/source_code/test_used_table.py
+++ b/tests/unit/source_code/test_used_table.py
@@ -1,29 +1,144 @@
-import datetime as dt
+import dataclasses
+import logging
+from unittest.mock import create_autospec
 
+import pytest
 from databricks.labs.lsql.backends import MockBackend
 
+from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership, WorkspacePathOwnership
 from databricks.labs.ucx.source_code.base import LineageAtom, UsedTable
-from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
+from databricks.labs.ucx.source_code.used_table import UsedTableOwnership, UsedTablesCrawler
 
 
-def test_crawler_appends_tables() -> None:
+@pytest.fixture
+def used_table() -> UsedTable:
+    return UsedTable(
+        catalog_name="test-catalog",
+        schema_name="test-schema",
+        table_name="test-table",
+        is_write=True,
+        source_id="test",
+        source_lineage=[LineageAtom(object_type="QUERY", object_id="dashboard/query")],
+    )
+
+
+def test_crawler_appends_tables(used_table: UsedTable) -> None:
     backend = MockBackend()
     crawler = UsedTablesCrawler.for_paths(backend, "schema")
     existing = list(crawler.snapshot())
     assert not existing
-    now = dt.datetime.now(tz=dt.timezone.utc)
-    dfsas = list(
-        UsedTable(
-            catalog_name="catalog",
-            schema_name="schema",
-            table_name=name,
-            source_timestamp=now,
-            source_lineage=[LineageAtom(object_type="LINEAGE", object_id="ID")],
-            assessment_start_timestamp=now,
-            assessment_end_timestamp=now,
-        )
-        for name in ("a", "b", "c")
-    )
-    crawler.dump_all(dfsas)
+
+    crawler.dump_all([used_table] * 3)
+
     rows = backend.rows_written_for(crawler.full_name, "append")
     assert len(rows) == 3
+
+
+def test_used_table_ownership_is_workspace_admin_if_not_in_used_tables_snapshot(used_table: UsedTable) -> None:
+    administrator_locator = create_autospec(AdministratorLocator)
+    administrator_locator.get_workspace_administrator.return_value = "John Doe"
+    used_tables_crawler = create_autospec(UsedTablesCrawler)
+    used_tables_crawler.snapshot.return_value = [used_table]
+    legacy_query_ownership = create_autospec(LegacyQueryOwnership)
+    workspace_path_ownership = create_autospec(WorkspacePathOwnership)
+
+    ownership = UsedTableOwnership(
+        administrator_locator,
+        used_tables_crawler,
+        used_tables_crawler,
+        legacy_query_ownership,
+        workspace_path_ownership,
+    )
+
+    owner = ownership.owner_of(UsedTable())
+
+    assert owner == "John Doe"
+    administrator_locator.get_workspace_administrator.assert_called_once()
+    legacy_query_ownership.owner_of.assert_not_called()
+    workspace_path_ownership.owner_of_path.assert_not_called()
+
+
+def test_used_table_ownership_is_workspace_admin_if_not_write(used_table: UsedTable) -> None:
+    used_table = dataclasses.replace(used_table, is_write=False)
+    administrator_locator = create_autospec(AdministratorLocator)
+    administrator_locator.get_workspace_administrator.return_value = "John Doe"
+    used_tables_crawler = create_autospec(UsedTablesCrawler)
+    used_tables_crawler.snapshot.return_value = [used_table]
+    legacy_query_ownership = create_autospec(LegacyQueryOwnership)
+    workspace_path_ownership = create_autospec(WorkspacePathOwnership)
+
+    ownership = UsedTableOwnership(
+        administrator_locator,
+        used_tables_crawler,
+        used_tables_crawler,
+        legacy_query_ownership,
+        workspace_path_ownership,
+    )
+
+    owner = ownership.owner_of(used_table)
+
+    assert owner == "John Doe"
+    administrator_locator.get_workspace_administrator.assert_called_once()
+    legacy_query_ownership.owner_of.assert_not_called()
+    workspace_path_ownership.owner_of_path.assert_not_called()
+
+
+@pytest.mark.parametrize("object_type", ["QUERY", "NOTEBOOK", "FILE"])
+def test_used_table_ownership_from_code(used_table: UsedTable, object_type: str) -> None:
+    source_lineage = [LineageAtom(object_type=object_type, object_id="dashboard/query")]
+    used_table = dataclasses.replace(used_table, source_lineage=source_lineage)
+    administrator_locator = create_autospec(AdministratorLocator)
+    administrator_locator.get_workspace_administrator.return_value = "John Doe"
+    used_tables_crawler = create_autospec(UsedTablesCrawler)
+    used_tables_crawler.snapshot.return_value = [used_table]
+    legacy_query_ownership = create_autospec(LegacyQueryOwnership)
+    legacy_query_ownership.owner_of.side_effect = lambda i: "Mary Jane" if i == used_table.query_id else None
+    workspace_path_ownership = create_autospec(WorkspacePathOwnership)
+    workspace_path_ownership.owner_of_path.side_effect = lambda i: "Mary Jane" if i == used_table.source_id else None
+
+    ownership = UsedTableOwnership(
+        administrator_locator,
+        used_tables_crawler,
+        used_tables_crawler,
+        legacy_query_ownership,
+        workspace_path_ownership,
+    )
+
+    owner = ownership.owner_of(used_table)
+
+    assert owner == "Mary Jane"
+    administrator_locator.get_workspace_administrator.assert_not_called()
+    if object_type == "QUERY":
+        legacy_query_ownership.owner_of.assert_called_once_with(used_table.query_id)
+        workspace_path_ownership.owner_of_path.assert_not_called()
+    else:
+        legacy_query_ownership.owner_of.assert_not_called()
+        workspace_path_ownership.owner_of_path.assert_called_once_with(used_table.source_id)
+
+
+def test_used_table_ownership_from_unknown_code_type(caplog, used_table: UsedTable) -> None:
+    source_lineage = [LineageAtom(object_type="UNKNOWN", object_id="dashboard/query")]
+    used_table = dataclasses.replace(used_table, source_lineage=source_lineage)
+    administrator_locator = create_autospec(AdministratorLocator)
+    administrator_locator.get_workspace_administrator.return_value = "John Doe"
+    used_tables_crawler = create_autospec(UsedTablesCrawler)
+    used_tables_crawler.snapshot.return_value = [used_table]
+    legacy_query_ownership = create_autospec(LegacyQueryOwnership)
+    legacy_query_ownership.owner_of.side_effect = lambda i: "Mary Jane" if i == used_table.query_id else None
+    workspace_path_ownership = create_autospec(WorkspacePathOwnership)
+    workspace_path_ownership.owner_of_path.side_effect = lambda i: "Mary Jane" if i == used_table.source_id else None
+
+    ownership = UsedTableOwnership(
+        administrator_locator,
+        used_tables_crawler,
+        used_tables_crawler,
+        legacy_query_ownership,
+        workspace_path_ownership,
+    )
+
+    with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.source_code.used_table"):
+        owner = ownership.owner_of(used_table)
+    assert owner == "John Doe"
+    assert f"Unknown source type 'UNKNOWN' for {used_table.source_id}" in caplog.messages
+    administrator_locator.get_workspace_administrator.assert_called_once()
+    legacy_query_ownership.owner_of.assert_not_called()
+    workspace_path_ownership.owner_of_path.assert_not_called()