Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UsedTableProgressEncoder #3266

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9aebf2a
Move TableOwnership to separate module
JCZuurmond Nov 12, 2024
1add260
Move table migration ownership to table_migrate module
JCZuurmond Nov 12, 2024
a989e73
Add UsedTableOwnership
JCZuurmond Nov 12, 2024
b26bde9
Test UsedTableOwnership to fall back on workspace administrator
JCZuurmond Nov 12, 2024
0f90dc5
Test UsedTableOwnership to fallback on workspace administrator if not…
JCZuurmond Nov 12, 2024
cd758f0
Test UsedTableOwnership gets owner from query
JCZuurmond Nov 12, 2024
6398f39
Format
JCZuurmond Nov 13, 2024
e91a283
Move used table to fixture
JCZuurmond Nov 13, 2024
9d90fb3
Test propagating used table ownership from code
JCZuurmond Nov 13, 2024
45cdcd5
Test unknown code source type fallback to workspace admin
JCZuurmond Nov 13, 2024
14ebeec
Format
JCZuurmond Nov 13, 2024
5fd8a63
Add asserts on autospec mocks
JCZuurmond Nov 13, 2024
a6c5deb
Add docstring to UsedTableOwnership
JCZuurmond Nov 13, 2024
059d0f7
Format
JCZuurmond Nov 13, 2024
35177f8
Add `from_table` classmethod to UsedTable
JCZuurmond Nov 13, 2024
a0f016e
Reuse UsedTableOwnership in TableOwnership
JCZuurmond Nov 13, 2024
7d4b712
Add UsedTableProgressEncoder
JCZuurmond Nov 13, 2024
bdc4233
Add UsedTableProgressEncoder to RuntimeContext
JCZuurmond Nov 13, 2024
57df37f
Add UsedTableProgressEncoder to MigrationProgress workflow
JCZuurmond Nov 13, 2024
709e3fd
Move Used table unit tests to unit tests module
JCZuurmond Nov 13, 2024
fd95c15
Move base unit tests to unit tests module
JCZuurmond Nov 13, 2024
1f733a8
Format
JCZuurmond Nov 13, 2024
0f5ae17
Add missing part to comment
JCZuurmond Nov 13, 2024
b7f32de
Avoid indirect administrator locator
JCZuurmond Nov 13, 2024
f07aebb
Format
JCZuurmond Nov 14, 2024
61bf68c
Remove redundant pytest.mark.paramtrize
JCZuurmond Nov 14, 2024
743847d
Remove UsedTableProgressEncoder
JCZuurmond Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from databricks.labs.ucx.recon.schema_comparator import StandardSchemaComparator
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler, DirectFsAccessOwnership
from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler, UsedTableOwnership
from databricks.sdk import AccountClient, WorkspaceClient, core
from databricks.sdk.service import sql

Expand All @@ -44,10 +44,11 @@
)
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership
from databricks.labs.ucx.hive_metastore.table_migrate import (
TableMigrationStatusRefresher,
TablesMigrator,
TableMigrationOwnership,
)
from databricks.labs.ucx.hive_metastore.table_move import TableMove
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership
Expand Down Expand Up @@ -262,16 +263,19 @@ def tables_crawler(self) -> TablesCrawler:
return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)

@cached_property
def table_ownership(self) -> TableOwnership:
return TableOwnership(
def used_table_ownership(self) -> UsedTableOwnership:
return UsedTableOwnership(
self.administrator_locator,
self.grants_crawler,
self.used_tables_crawler_for_paths,
self.used_tables_crawler_for_queries,
self.legacy_query_ownership,
self.workspace_path_ownership,
)

@cached_property
def table_ownership(self) -> TableOwnership:
return TableOwnership(self.administrator_locator, self.grants_crawler, self.used_table_ownership)
JCZuurmond marked this conversation as resolved.
Show resolved Hide resolved

@cached_property
def workspace_path_ownership(self) -> WorkspacePathOwnership:
return WorkspacePathOwnership(self.administrator_locator, self.workspace_client)
Expand Down Expand Up @@ -405,7 +409,7 @@ def migration_status_refresher(self) -> TableMigrationStatusRefresher:

@cached_property
def table_migration_ownership(self) -> TableMigrationOwnership:
return TableMigrationOwnership(self.tables_crawler, self.table_ownership)
return TableMigrationOwnership(self.administrator_locator, self.tables_crawler, self.table_ownership)

@cached_property
def iam_credential_manager(self) -> CredentialManager:
Expand Down
12 changes: 11 additions & 1 deletion src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder, UsedTableProgressEncoder
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder

# As with GlobalContext, service factories unavoidably have a lot of public methods.
Expand Down Expand Up @@ -231,6 +231,16 @@ def tables_progress(self) -> TableProgressEncoder:
self.config.ucx_catalog,
)

@cached_property
def used_table_progress(self) -> UsedTableProgressEncoder:
return UsedTableProgressEncoder(
self.sql_backend,
self.used_table_ownership,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def udfs_progress(self) -> ProgressEncoder[Udf]:
return ProgressEncoder(
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/framework/owners.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli
super().__init__(administrator_locator)
self._ws = ws

def owner_of_path(self, path: str) -> str:
def owner_of_path(self, path: str) -> str: # TODO: Why is this `owner_of_path` and not `owner_of`
return self.owner_of(WorkspacePath(self._ws, path))

@retried(on=[InternalError], timeout=timedelta(minutes=1))
Expand Down
111 changes: 0 additions & 111 deletions src/databricks/labs/ucx/hive_metastore/ownership.py

This file was deleted.

40 changes: 38 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors.platform import DatabricksError

from databricks.labs.ucx.framework.owners import AdministratorLocator, Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import MigrateGrants
Expand All @@ -18,8 +19,12 @@
TableMapping,
TableToMigrate,
)

from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher, TableMigrationIndex
from databricks.labs.ucx.hive_metastore.table_migration_status import (
TableMigrationStatusRefresher,
TableMigrationIndex,
TableMigrationStatus,
)
from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership
from databricks.labs.ucx.hive_metastore.tables import (
MigrationCount,
Table,
Expand Down Expand Up @@ -594,3 +599,34 @@ def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int):
f"('upgraded_from' = '{source}'"
f" , '{table.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');"
)


class TableMigrationOwnership(Ownership[TableMigrationStatus]):
"""Determine ownership of table migration records in the inventory.

This is the owner of the source table, if (and only if) the source table is present in the inventory.
"""

def __init__(
self,
administrator_locator: AdministratorLocator,
tables_crawler: TablesCrawler,
table_ownership: TableOwnership,
) -> None:
super().__init__(administrator_locator)
self._tables_crawler = tables_crawler
self._table_ownership = table_ownership
self._indexed_tables: dict[tuple[str, str], Table] | None = None

def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]:
index = self._indexed_tables
if index is None or reindex:
snapshot = self._tables_crawler.snapshot()
index = {(table.database, table.name): table for table in snapshot}
self._indexed_tables = index
return index

def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None:
index = self._tables_snapshot_index()
source_table = index.get((record.src_schema, record.src_table), None)
return self._table_ownership.owner_of(source_table) if source_table is not None else None
59 changes: 59 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/table_ownership.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
JCZuurmond marked this conversation as resolved.
Show resolved Hide resolved
Separate table ownership module to resolve circular dependency for `:mod:hive_metastore.grants` using `:class:Table`
and `:class:TableOwnership` using the `GrantsCrawler`.
"""

import logging
from functools import cached_property

from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator
from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.source_code.base import UsedTable
from databricks.labs.ucx.source_code.used_table import UsedTableOwnership


logger = logging.getLogger(__name__)


class TableOwnership(Ownership[Table]):
"""Table ownership

Determine ownership of tables in the inventory based on the following rules:
- If a table is owned by a principal through grants, then that principal is the owner.
- Otherwise, fallback on the `UsedTableOwnership`.
"""

def __init__(
self,
administrator_locator: AdministratorLocator,
grants_crawler: GrantsCrawler,
used_table_ownership: UsedTableOwnership,
) -> None:
super().__init__(administrator_locator)
self._grants_crawler = grants_crawler
self._used_table_ownership = used_table_ownership

def _maybe_direct_owner(self, record: Table) -> str | None:
owner = self._maybe_from_grants(record)
if owner:
return owner
# The `is_write` and `is_read` has no effect as the actual `UsedTable` definition comes from snapshots
used_table = UsedTable.from_table(record, is_read=False, is_write=False)
# This call defers the `administrator_locator` to the one of `UsedTableOwnership`, we expect them to be the same
return self._used_table_ownership.owner_of(used_table)

def _maybe_from_grants(self, record: Table) -> str | None:
for grant in self._grants_snapshot:
if not grant.action_type == 'OWN':
continue
object_type, full_name = grant.this_type_and_key()
if object_type == 'TABLE' and full_name == record.key:
return grant.principal
if object_type in {'DATABASE', 'SCHEMA'} and full_name == f"{record.catalog}.{record.database}":
return grant.principal
return None

@cached_property
def _grants_snapshot(self):
return self._grants_crawler.snapshot()
42 changes: 41 additions & 1 deletion src/databricks/labs/ucx/progress/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.hive_metastore.table_ownership import TableOwnership
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical
from databricks.labs.ucx.source_code.used_table import UsedTable, UsedTableOwnership


class TableProgressEncoder(ProgressEncoder[Table]):
Expand Down Expand Up @@ -52,3 +53,42 @@ def _encode_record_as_historical(self, record: Table) -> Historical:
if not self._table_migration_index.is_migrated(record.database, record.name):
failures.append("Pending migration")
return replace(historical, failures=historical.failures + failures)


class UsedTableProgressEncoder(ProgressEncoder[UsedTable]):
"""Encoder class:UsedTable to class:History.

A used table has a failure if its catalog is the hive_metastore.
"""

def __init__(
self,
sql_backend: SqlBackend,
ownership: UsedTableOwnership,
run_id: int,
workspace_id: int,
catalog: str,
schema: str = "multiworkspace",
table: str = "historical",
) -> None:
super().__init__(
sql_backend,
ownership,
UsedTable,
run_id,
workspace_id,
catalog,
schema,
table,
)

def _encode_record_as_historical(self, record: UsedTable) -> Historical:
"""Encode record as historical.

A table failure means that the table is pending migration.
"""
historical = super()._encode_record_as_historical(record)
failures = []
if record.catalog_name == "hive_metastore":
failures.append("Pending migration")
return replace(historical, failures=historical.failures + failures)
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/progress/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def assess_dashboards(self, ctx: RuntimeContext):
Also stores direct filesystem accesses for display in the migration dashboard."""
# TODO: Ensure these are captured in the history log.
ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
ctx.used_table_progress.append_inventory_snapshot(ctx.used_tables_crawler_for_queries.snapshot())

@job_task(depends_on=[verify_prerequisites])
def assess_workflows(self, ctx: RuntimeContext):
Expand Down
Loading
Loading