From 3b541e5f2abe91ee435a51c1f66f623d8611bc40 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Thu, 18 Jul 2024 14:03:24 +1000 Subject: [PATCH 01/16] Detect tables that are not present in the mapping file Introduces #1221 --- .../labs/ucx/hive_metastore/mapping.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/databricks/labs/ucx/hive_metastore/mapping.py b/src/databricks/labs/ucx/hive_metastore/mapping.py index b0d258dadd..4970021479 100644 --- a/src/databricks/labs/ucx/hive_metastore/mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/mapping.py @@ -66,6 +66,18 @@ def as_uc_table_key(self): def as_hms_table_key(self): return f"hive_metastore.{self.src_schema}.{self.src_table}" +@dataclass +class TableNotMapped: + workspace_name: str + src_table: str + + @classmethod + def initial(cls, workspace_name: str, table: str) -> "TableNotMapped": + return cls( + workspace_name=workspace_name, + src_table=table, + ) + @dataclass class TableToMigrate: @@ -81,6 +93,7 @@ def __eq__(self, other): class TableMapping: FILENAME = 'mapping.csv' + FILENAME_UNMAPPED = 'unmapped_tables.csv' UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip" def __init__( @@ -103,10 +116,21 @@ def current_tables(self, tables: TablesCrawler, workspace_name: str, catalog_nam for table in tables_snapshot: yield Rule.initial(workspace_name, catalog_name, table, self._recon_tolerance_percent) + @staticmethod + def tables_not_mapped(tables_crawler: TablesCrawler, current_tables: list[Rule], workspace_name: str): + crawled_tables_keys = [crawled_table.key for crawled_table in tables_crawler.snapshot()] + hms_table_keys = [rule.as_hms_table_key for rule in current_tables] + for crawled_table_key in crawled_tables_keys: + if crawled_table_key not in hms_table_keys: + yield TableNotMapped.initial(workspace_name, crawled_table_key) + def save(self, tables: TablesCrawler, workspace_info: WorkspaceInfo) -> str: workspace_name = workspace_info.current() default_catalog_name = re.sub(r"\W+", "_", workspace_name) current_tables = self.current_tables(tables, workspace_name, default_catalog_name) + unmapped_tables = self.tables_not_mapped(tables, list(current_tables), workspace_name) + if len(unmapped_tables) != 0: + self._installation.save(list(unmapped_tables), filename=self.FILENAME_UNMAPPED) return self._installation.save(list(current_tables), filename=self.FILENAME) def load(self) -> list[Rule]: From 6d063da0cbfb58b2a06eade79fb32d5490d5622e Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Wed, 24 Jul 2024 22:30:22 +1000 Subject: [PATCH 02/16] Revert "Detect tables that are not present in the mapping file" This reverts commit 61282187343f8fe5ef81593c0acea67395f2bbe4. --- .../labs/ucx/hive_metastore/mapping.py | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/mapping.py b/src/databricks/labs/ucx/hive_metastore/mapping.py index 4970021479..b0d258dadd 100644 --- a/src/databricks/labs/ucx/hive_metastore/mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/mapping.py @@ -66,18 +66,6 @@ def as_uc_table_key(self): def as_hms_table_key(self): return f"hive_metastore.{self.src_schema}.{self.src_table}" -@dataclass -class TableNotMapped: - workspace_name: str - src_table: str - - @classmethod - def initial(cls, workspace_name: str, table: str) -> "TableNotMapped": - return cls( - workspace_name=workspace_name, - src_table=table, - ) - @dataclass class TableToMigrate: @@ -93,7 +81,6 @@ def __eq__(self, other): class TableMapping: FILENAME = 'mapping.csv' - FILENAME_UNMAPPED = 'unmapped_tables.csv' UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip" def __init__( @@ -116,21 +103,10 @@ def current_tables(self, tables: TablesCrawler, workspace_name: str, catalog_nam for table in tables_snapshot: yield Rule.initial(workspace_name, catalog_name, table, self._recon_tolerance_percent) - @staticmethod - def tables_not_mapped(tables_crawler: TablesCrawler, current_tables: list[Rule], workspace_name: str): - crawled_tables_keys = [crawled_table.key for crawled_table in tables_crawler.snapshot()] - hms_table_keys = [rule.as_hms_table_key for rule in current_tables] - for crawled_table_key in crawled_tables_keys: - if crawled_table_key not in hms_table_keys: - yield TableNotMapped.initial(workspace_name, crawled_table_key) - def save(self, tables: TablesCrawler, workspace_info: WorkspaceInfo) -> str: workspace_name = workspace_info.current() default_catalog_name = re.sub(r"\W+", "_", workspace_name) current_tables = self.current_tables(tables, workspace_name, default_catalog_name) - unmapped_tables = self.tables_not_mapped(tables, list(current_tables), workspace_name) - if len(unmapped_tables) != 0: - self._installation.save(list(unmapped_tables), filename=self.FILENAME_UNMAPPED) return self._installation.save(list(current_tables), filename=self.FILENAME) def load(self) -> list[Rule]: From ed1dec57d4cf1c043a5b945956ddb53f19a8b69c Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Thu, 25 Jul 2024 12:01:18 +1000 Subject: [PATCH 03/16] Detect tables that are not present in the mapping file Introduces #1221 --- src/databricks/labs/ucx/hive_metastore/table_migrate.py | 7 +++++++ src/databricks/labs/ucx/hive_metastore/workflows.py | 4 ++++ tests/unit/hive_metastore/test_workflows.py | 5 +++++ 3 files changed, 16 insertions(+) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index cfab0dab9f..34123253bd 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -57,6 +57,13 @@ def __init__( self._seen_tables: dict[str, str] = {} self._principal_grants = principal_grants + def not_migrated_refresh(self) -> list[Table]: + table_rows: list[Table] = [] + for crawled_table in self._tc.snapshot(): + if not self.is_migrated(crawled_table.database, crawled_table.name): + table_rows.append(crawled_table) + return table_rows # depending on how to publish this data, we may need to convert it to other forms to able to show it in the dashboard + def index(self): return self._migration_status_refresher.index() diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index eec269fe43..e25aa92950 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -73,6 +73,10 @@ def refresh_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.index_full_refresh() + @job_task(job_cluster="table_migration", depends_on=[migrate_external_tables_sync, migrate_dbfs_root_delta_tables, migrate_dbfs_root_non_delta_tables,migrate_views]) + def refresh_not_migrated_status(self, ctx: RuntimeContext): + """Refresh the not migrated tables status to present it in the dashboard.""" + ctx.tables_migrator.not_migrated_refresh() class MigrateHiveSerdeTablesInPlace(Workflow): def __init__(self): diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 2cab1fd86c..0a7095f4c5 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -70,3 +70,8 @@ def test_refresh_migration_status_is_refreshed(run_workflow, workflow): assert "DELETE FROM hive_metastore.ucx.migration_status" in ctx.sql_backend.queries assert "SHOW DATABASES" in ctx.sql_backend.queries # No "SHOW TABLE FROM" query as table are not mocked + +#TODO: create a unit test for the new task in the workflow +# def test_refresh_not_migrated_status_is_refreshed(run_workflow): +# ctx = run_workflow(TableMigration.refresh_not_migrated_status) +# # ctx.workspace_client.catalogs.list.assert_called() From dc3d086a992d2d2c6d1bc64cbef5e2e7fac646b0 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Sun, 28 Jul 2024 13:32:57 +1000 Subject: [PATCH 04/16] Updating table_migrate.py as per the comments --- .../labs/ucx/hive_metastore/table_migrate.py | 19 +++++++++++++++--- .../labs/ucx/hive_metastore/workflows.py | 7 +++++-- .../assessment/main/40_4_remaining_tables.sql | 4 ++++ .../hive_metastore/test_workflows.py | 20 +++++++++++++++++++ 4 files changed, 45 insertions(+), 5 deletions(-) create mode 100644 src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 34123253bd..d34318d6db 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -57,12 +57,22 @@ def __init__( self._seen_tables: dict[str, str] = {} self._principal_grants = principal_grants - def not_migrated_refresh(self) -> list[Table]: + def get_remaining_tables(self) -> list[Table]: table_rows: list[Table] = [] for crawled_table in self._tc.snapshot(): - if not self.is_migrated(crawled_table.database, crawled_table.name): + if not self._is_migrated(crawled_table.database, crawled_table.name): table_rows.append(crawled_table) - return table_rows # depending on how to publish this data, we may need to convert it to other forms to able to show it in the dashboard + logger.info(f"remained-table-to-migrate: {crawled_table.key}") + return table_rows + + + # def get_remaining_tables(self, workspace_name) -> list[Table]: + # table_rows: list[Table] = [] + # for crawled_table in self._tc.snapshot(): + # if not self._is_migrated(crawled_table.database, crawled_table.name): + # table_rows.append(crawled_table) + # logger.info(f"remained-table-to-migrate: {crawled_table.key} in {workspace_name}") + # return table_rows def index(self): return self._migration_status_refresher.index() @@ -496,3 +506,6 @@ def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int): f"('upgraded_from' = '{source}'" f" , '{table.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');" ) + def _is_migrated(self, schema: str, table: str) -> bool: + index = self._migration_status_refresher.index() + return index.is_migrated(schema, table) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index e25aa92950..ee19c2b19e 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -73,10 +73,13 @@ def refresh_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.index_full_refresh() - @job_task(job_cluster="table_migration", depends_on=[migrate_external_tables_sync, migrate_dbfs_root_delta_tables, migrate_dbfs_root_non_delta_tables,migrate_views]) + @job_task(job_cluster="table_migration", depends_on=[migrate_external_tables_sync, migrate_dbfs_root_delta_tables, + migrate_dbfs_root_non_delta_tables, migrate_views, refresh_migration_status]) def refresh_not_migrated_status(self, ctx: RuntimeContext): """Refresh the not migrated tables status to present it in the dashboard.""" - ctx.tables_migrator.not_migrated_refresh() + ctx.tables_migrator.get_remaining_tables( + # workspace_name=ctx.workspace_info.current() + ) class MigrateHiveSerdeTablesInPlace(Workflow): def __init__(self): diff --git a/src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql b/src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql new file mode 100644 index 0000000000..0b8840a003 --- /dev/null +++ b/src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql @@ -0,0 +1,4 @@ +/* --title 'List of remaining tables in HMS' --width 6 */ +SELECT message +FROM inventory.logs +WHERE message LIKE 'remained-table-to-migrate: %' diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 84bcf5e46d..a67f9fc677 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -97,3 +97,23 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name except NotFound: assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" + +#TODO: add an integration test for refresh_not_migrated_status task in migrate-tables workflow +@pytest.mark.parametrize('prepare_tables_for_migration', [('regular')], indirect=True) +def test_refresh_not_migrated_status_job(ws, installation_ctx, prepare_tables_for_migration): + tables, dst_schema = prepare_tables_for_migration + ctx = installation_ctx.replace( + extend_prompts={ + r".*Do you want to update the existing installation?.*": 'yes', + }, + ) + ctx.workspace_installation.run() + ctx.deployed_workflows.run_workflow("migrate-tables") + # assert the workflow is successful + assert ctx.deployed_workflows.validate_step("migrate-tables") +# # assert the tables are migrated +# for table in tables.values(): +# try: +# assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name +# except NotFound: +# assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" From 9a5ba5306792eb2a8a0068146c94639f4f3ae490 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Mon, 5 Aug 2024 13:37:02 +1000 Subject: [PATCH 05/16] Incorporating the review comments --- src/databricks/labs/ucx/hive_metastore/workflows.py | 5 ----- .../main/04_1_remaining_tables.sql} | 0 tests/integration/hive_metastore/test_workflows.py | 2 +- 3 files changed, 1 insertion(+), 6 deletions(-) rename src/databricks/labs/ucx/queries/{assessment/main/40_4_remaining_tables.sql => migration/main/04_1_remaining_tables.sql} (100%) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index ee19c2b19e..bb80f36d63 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -72,11 +72,6 @@ def migrate_views(self, ctx: RuntimeContext): def refresh_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.index_full_refresh() - - @job_task(job_cluster="table_migration", depends_on=[migrate_external_tables_sync, migrate_dbfs_root_delta_tables, - migrate_dbfs_root_non_delta_tables, migrate_views, refresh_migration_status]) - def refresh_not_migrated_status(self, ctx: RuntimeContext): - """Refresh the not migrated tables status to present it in the dashboard.""" ctx.tables_migrator.get_remaining_tables( # workspace_name=ctx.workspace_info.current() ) diff --git a/src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_tables.sql similarity index 100% rename from src/databricks/labs/ucx/queries/assessment/main/40_4_remaining_tables.sql rename to src/databricks/labs/ucx/queries/migration/main/04_1_remaining_tables.sql diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index a67f9fc677..7cfc53d72f 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -99,7 +99,7 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" #TODO: add an integration test for refresh_not_migrated_status task in migrate-tables workflow -@pytest.mark.parametrize('prepare_tables_for_migration', [('regular')], indirect=True) +@pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) def test_refresh_not_migrated_status_job(ws, installation_ctx, prepare_tables_for_migration): tables, dst_schema = prepare_tables_for_migration ctx = installation_ctx.replace( From ca0604128bd042921ad673d96cc3c5f5e3b00406 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Mon, 5 Aug 2024 16:46:49 +1000 Subject: [PATCH 06/16] Adding the latest changes - yet not final --- .../labs/ucx/hive_metastore/table_migrate.py | 2 +- .../labs/ucx/hive_metastore/workflows.py | 1 + .../hive_metastore/test_workflows.py | 5 +- tests/unit/hive_metastore/test_workflows.py | 87 +++++++++++++++++-- 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index d34318d6db..fb18bceb29 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -65,7 +65,6 @@ def get_remaining_tables(self) -> list[Table]: logger.info(f"remained-table-to-migrate: {crawled_table.key}") return table_rows - # def get_remaining_tables(self, workspace_name) -> list[Table]: # table_rows: list[Table] = [] # for crawled_table in self._tc.snapshot(): @@ -506,6 +505,7 @@ def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int): f"('upgraded_from' = '{source}'" f" , '{table.UPGRADED_FROM_WS_PARAM}' = '{ws_id}');" ) + def _is_migrated(self, schema: str, table: str) -> bool: index = self._migration_status_refresher.index() return index.is_migrated(schema, table) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index bb80f36d63..cf9c438f92 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -76,6 +76,7 @@ def refresh_migration_status(self, ctx: RuntimeContext): # workspace_name=ctx.workspace_info.current() ) + class MigrateHiveSerdeTablesInPlace(Workflow): def __init__(self): super().__init__('migrate-external-hiveserde-tables-in-place-experimental') diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 7cfc53d72f..2fe00198f7 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -98,7 +98,8 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables except NotFound: assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" -#TODO: add an integration test for refresh_not_migrated_status task in migrate-tables workflow + +# TODO: add an integration test for refresh_not_migrated_status task in migrate-tables workflow @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) def test_refresh_not_migrated_status_job(ws, installation_ctx, prepare_tables_for_migration): tables, dst_schema = prepare_tables_for_migration @@ -111,6 +112,8 @@ def test_refresh_not_migrated_status_job(ws, installation_ctx, prepare_tables_fo ctx.deployed_workflows.run_workflow("migrate-tables") # assert the workflow is successful assert ctx.deployed_workflows.validate_step("migrate-tables") + + # # assert the tables are migrated # for table in tables.values(): # try: diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 0a7095f4c5..0669afa002 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -1,5 +1,6 @@ import pytest - +import logging +from unittest.mock import create_autospec from databricks.labs.ucx.hive_metastore.workflows import ( TableMigration, MigrateExternalTablesCTAS, @@ -8,6 +9,17 @@ ScanTablesInMounts, ) +from databricks.labs.ucx.hive_metastore.tables import ( + Table, + TablesCrawler, +) + +from databricks.labs.ucx.hive_metastore.migration_status import ( + MigrationStatusRefresher, + MigrationIndex, + MigrationStatus, +) + def test_migrate_external_tables_sync(run_workflow): ctx = run_workflow(TableMigration.migrate_external_tables_sync) @@ -71,7 +83,72 @@ def test_refresh_migration_status_is_refreshed(run_workflow, workflow): assert "SHOW DATABASES" in ctx.sql_backend.queries # No "SHOW TABLE FROM" query as table are not mocked -#TODO: create a unit test for the new task in the workflow -# def test_refresh_not_migrated_status_is_refreshed(run_workflow): -# ctx = run_workflow(TableMigration.refresh_not_migrated_status) -# # ctx.workspace_client.catalogs.list.assert_called() +def test_refresh_migration_status_published_remained_tables(run_workflow): + # class LogCaptureHandler(logging.Handler): + # def __init__(self): + # super().__init__() + # self.records = [] + # + # def emit(self, record): + # self.records.append(record) + + + # # Setup custom log handler + # log_capture_handler = LogCaptureHandler() + # logger = logging.getLogger(__name__) + # logger.addHandler(log_capture_handler) + # logger.setLevel(logging.INFO) + + # Setup mocks + custom_table_crawler = create_autospec(TablesCrawler) + custom_table_crawler.snapshot.return_value = [ + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table1", + location="s3://some_location/table1", + upgraded_to="ucx_default.db1_dst.dst_table1", + ), + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table2", + location="s3://some_location/table2", + upgraded_to="ucx_default.db1_dst.dst_table2", + ), + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table3", + location="s3://some_location/table3", + ), + ] + custom_migration_status_refresher = create_autospec(MigrationStatusRefresher) + migration_index = MigrationIndex( + [ + MigrationStatus("schema1", "table1", "ucx_default", "db1_dst", "dst_table1"), + MigrationStatus("schema1", "table2", "ucx_default", "db1_dst", "dst_table2"), + ] + ) + custom_migration_status_refresher.index.return_value = migration_index + + # Call the method + ctx = run_workflow( + TableMigration.refresh_migration_status, + table_crawler=custom_table_crawler, + migration_status_refresher=custom_migration_status_refresher, + ) + + # # Assert the log message + # log_messages = [record.getMessage() for record in log_capture_handler.records] + # assert 'remained-table-to-migrate: schema1.table3' in log_messages + return ctx.task_run_warning_recorder.snapshot() + + # # Remove the custom log handler + # logger.removeHandler(log_capture_handler) From 832075c16a8fa6ea2a68687406ab1c3a832aff98 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Tue, 6 Aug 2024 08:49:38 +1000 Subject: [PATCH 07/16] Updating table_migrate.py as per the comments --- .../labs/ucx/hive_metastore/table_migrate.py | 13 +--- .../labs/ucx/hive_metastore/workflows.py | 7 +- ...bles.sql => 04_1_remaining_hms_tables.sql} | 2 +- .../unit/hive_metastore/test_table_migrate.py | 60 +++++++++++++++ tests/unit/hive_metastore/test_workflows.py | 75 ++----------------- 5 files changed, 71 insertions(+), 86 deletions(-) rename src/databricks/labs/ucx/queries/migration/main/{04_1_remaining_tables.sql => 04_1_remaining_hms_tables.sql} (63%) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index fb18bceb29..3a5468831c 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -58,21 +58,14 @@ def __init__( self._principal_grants = principal_grants def get_remaining_tables(self) -> list[Table]: - table_rows: list[Table] = [] + self.index_full_refresh() + table_rows = [] for crawled_table in self._tc.snapshot(): if not self._is_migrated(crawled_table.database, crawled_table.name): table_rows.append(crawled_table) - logger.info(f"remained-table-to-migrate: {crawled_table.key}") + logger.info(f"remained-hive-metastore-table: {crawled_table.key}") return table_rows - # def get_remaining_tables(self, workspace_name) -> list[Table]: - # table_rows: list[Table] = [] - # for crawled_table in self._tc.snapshot(): - # if not self._is_migrated(crawled_table.database, crawled_table.name): - # table_rows.append(crawled_table) - # logger.info(f"remained-table-to-migrate: {crawled_table.key} in {workspace_name}") - # return table_rows - def index(self): return self._migration_status_refresher.index() diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index cf9c438f92..0b5cafea08 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -69,12 +69,9 @@ def migrate_views(self, ctx: RuntimeContext): ) @job_task(job_cluster="table_migration", depends_on=[migrate_views]) - def refresh_migration_status(self, ctx: RuntimeContext): + def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.index_full_refresh() - ctx.tables_migrator.get_remaining_tables( - # workspace_name=ctx.workspace_info.current() - ) + ctx.tables_migrator.get_remaining_tables() class MigrateHiveSerdeTablesInPlace(Workflow): diff --git a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_tables.sql b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql similarity index 63% rename from src/databricks/labs/ucx/queries/migration/main/04_1_remaining_tables.sql rename to src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql index 0b8840a003..b310733a25 100644 --- a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_tables.sql +++ b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql @@ -1,4 +1,4 @@ /* --title 'List of remaining tables in HMS' --width 6 */ SELECT message FROM inventory.logs -WHERE message LIKE 'remained-table-to-migrate: %' +WHERE message LIKE 'remained-hive-metastore-table: %' diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index f3310186d8..c52e09c659 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1349,3 +1349,63 @@ def test_revert_migrated_tables_failed(caplog): table_migrate = get_table_migrator(backend) table_migrate.revert_migrated_tables(schema="test_schema1") assert "Failed to revert table hive_metastore.test_schema1.test_table1: error" in caplog.text + +def test_refresh_migration_status_published_remained_tables(caplog): + errors = {} + rows = {} + backend = MockBackend(fails_on_first=errors, rows=rows) + table_crawler = create_autospec(TablesCrawler) + grant_crawler = create_autospec(GrantsCrawler) # pylint: disable=mock-no-usage + client = mock_workspace_client() + table_crawler.snapshot.return_value = [ + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table1", + location="s3://some_location/table1", + upgraded_to="ucx_default.db1_dst.dst_table1", + ), + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table2", + location="s3://some_location/table2", + upgraded_to="ucx_default.db1_dst.dst_table2", + ), + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table3", + location="s3://some_location/table3", + ), + ] + group_manager = GroupManager(backend, client, "inventory_database") + table_mapping = mock_table_mapping() + migration_status_refresher = create_autospec(MigrationStatusRefresher) + migration_index = MigrationIndex( + [ + MigrationStatus("schema1", "table1", "ucx_default", "db1_dst", "dst_table1"), + MigrationStatus("schema1", "table2", "ucx_default", "db1_dst", "dst_table2"), + ] + ) + migration_status_refresher.index.return_value = migration_index + principal_grants = create_autospec(PrincipalACL) # pylint: disable=mock-no-usage + table_migrate = TablesMigrator( + table_crawler, + grant_crawler, + client, + backend, + table_mapping, + group_manager, + migration_status_refresher, + principal_grants, + ) + with caplog.at_level(logging.INFO, logger="databricks.labs.ucx.hive_metastore"): + table_migrate.get_remaining_tables() + assert 'remained-hive-metastore-table: hive_metastore.schema1.table3' in caplog.messages diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 0669afa002..1de5a13ef3 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -20,6 +20,11 @@ MigrationStatus, ) +from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler +from databricks.labs.lsql.backends import SqlBackend + +from databricks.labs.lsql.backends import MockBackend + def test_migrate_external_tables_sync(run_workflow): ctx = run_workflow(TableMigration.migrate_external_tables_sync) @@ -82,73 +87,3 @@ def test_refresh_migration_status_is_refreshed(run_workflow, workflow): assert "DELETE FROM hive_metastore.ucx.migration_status" in ctx.sql_backend.queries assert "SHOW DATABASES" in ctx.sql_backend.queries # No "SHOW TABLE FROM" query as table are not mocked - -def test_refresh_migration_status_published_remained_tables(run_workflow): - # class LogCaptureHandler(logging.Handler): - # def __init__(self): - # super().__init__() - # self.records = [] - # - # def emit(self, record): - # self.records.append(record) - - - # # Setup custom log handler - # log_capture_handler = LogCaptureHandler() - # logger = logging.getLogger(__name__) - # logger.addHandler(log_capture_handler) - # logger.setLevel(logging.INFO) - - # Setup mocks - custom_table_crawler = create_autospec(TablesCrawler) - custom_table_crawler.snapshot.return_value = [ - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table1", - location="s3://some_location/table1", - upgraded_to="ucx_default.db1_dst.dst_table1", - ), - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table2", - location="s3://some_location/table2", - upgraded_to="ucx_default.db1_dst.dst_table2", - ), - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table3", - location="s3://some_location/table3", - ), - ] - custom_migration_status_refresher = create_autospec(MigrationStatusRefresher) - migration_index = MigrationIndex( - [ - MigrationStatus("schema1", "table1", "ucx_default", "db1_dst", "dst_table1"), - MigrationStatus("schema1", "table2", "ucx_default", "db1_dst", "dst_table2"), - ] - ) - custom_migration_status_refresher.index.return_value = migration_index - - # Call the method - ctx = run_workflow( - TableMigration.refresh_migration_status, - table_crawler=custom_table_crawler, - migration_status_refresher=custom_migration_status_refresher, - ) - - # # Assert the log message - # log_messages = [record.getMessage() for record in log_capture_handler.records] - # assert 'remained-table-to-migrate: schema1.table3' in log_messages - return ctx.task_run_warning_recorder.snapshot() - - # # Remove the custom log handler - # logger.removeHandler(log_capture_handler) From 6550e91f5b234902ff811a458c5d70d93fb4c375 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Tue, 6 Aug 2024 16:08:39 +1000 Subject: [PATCH 08/16] Renaming "update_migration_status" to "refresh_migration_status" --- .../labs/ucx/hive_metastore/workflows.py | 2 +- tests/unit/hive_metastore/test_workflows.py | 14 -------------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index 0b5cafea08..a42eebc2eb 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -69,7 +69,7 @@ def migrate_views(self, ctx: RuntimeContext): ) @job_task(job_cluster="table_migration", depends_on=[migrate_views]) - def update_migration_status(self, ctx: RuntimeContext): + def refresh_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.get_remaining_tables() diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 1de5a13ef3..0674c952ad 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -1,6 +1,4 @@ import pytest -import logging -from unittest.mock import create_autospec from databricks.labs.ucx.hive_metastore.workflows import ( TableMigration, MigrateExternalTablesCTAS, @@ -9,21 +7,9 @@ ScanTablesInMounts, ) -from databricks.labs.ucx.hive_metastore.tables import ( - Table, - TablesCrawler, -) -from databricks.labs.ucx.hive_metastore.migration_status import ( - MigrationStatusRefresher, - MigrationIndex, - MigrationStatus, -) -from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler -from databricks.labs.lsql.backends import SqlBackend -from databricks.labs.lsql.backends import MockBackend def test_migrate_external_tables_sync(run_workflow): From 4d0a5e8d16611fd3734a7171ce4e156b6d360cf0 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Wed, 7 Aug 2024 10:26:56 +1000 Subject: [PATCH 09/16] Renaming "refresh_migration_status" to "update_migration_status" * updating the relevant unit test --- .../labs/ucx/hive_metastore/table_migrate.py | 2 +- .../labs/ucx/hive_metastore/workflows.py | 2 +- .../main/04_1_remaining_hms_tables.sql | 6 +++-- .../hive_metastore/test_workflows.py | 25 +++++++++++-------- .../unit/hive_metastore/test_table_migrate.py | 3 ++- tests/unit/hive_metastore/test_workflows.py | 16 +++++------- 6 files changed, 29 insertions(+), 25 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 3a5468831c..b659d77acb 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -63,7 +63,7 @@ def get_remaining_tables(self) -> list[Table]: for crawled_table in self._tc.snapshot(): if not self._is_migrated(crawled_table.database, crawled_table.name): table_rows.append(crawled_table) - logger.info(f"remained-hive-metastore-table: {crawled_table.key}") + logger.warning(f"remained-hive-metastore-table: {crawled_table.key}") return table_rows def index(self): diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index a42eebc2eb..0b5cafea08 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -69,7 +69,7 @@ def migrate_views(self, ctx: RuntimeContext): ) @job_task(job_cluster="table_migration", depends_on=[migrate_views]) - def refresh_migration_status(self, ctx: RuntimeContext): + def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.get_remaining_tables() diff --git a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql index b310733a25..4f49716bde 100644 --- a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql +++ b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql @@ -1,4 +1,6 @@ /* --title 'List of remaining tables in HMS' --width 6 */ -SELECT message +SELECT + SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) AS message FROM inventory.logs -WHERE message LIKE 'remained-hive-metastore-table: %' +WHERE + message LIKE 'remained-hive-metastore-table: %'; diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 2fe00198f7..b306d395ad 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -99,9 +99,8 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" -# TODO: add an integration test for refresh_not_migrated_status task in migrate-tables workflow @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) -def test_refresh_not_migrated_status_job(ws, installation_ctx, prepare_tables_for_migration): +def test_refresh_not_migrated_status_job(ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog): tables, dst_schema = prepare_tables_for_migration ctx = installation_ctx.replace( extend_prompts={ @@ -109,14 +108,20 @@ def test_refresh_not_migrated_status_job(ws, installation_ctx, prepare_tables_fo }, ) ctx.workspace_installation.run() + second_table = list(tables.values())[1] + ctx.table_mapping.skip_table(dst_schema.name, second_table.name) ctx.deployed_workflows.run_workflow("migrate-tables") - # assert the workflow is successful assert ctx.deployed_workflows.validate_step("migrate-tables") - -# # assert the tables are migrated -# for table in tables.values(): -# try: -# assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name -# except NotFound: -# assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}" + remained_tables = list( + sql_backend.fetch( + f""" + SELECT + SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) + AS message + FROM {ctx.inventory_database}.logs + WHERE message LIKE 'remained-hive-metastore-table: %' + """ + ) + ) + assert remained_tables[0].message == f'hive_metastore.{dst_schema.name}.{second_table.name}' diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index c52e09c659..f288b2ac1a 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1350,6 +1350,7 @@ def test_revert_migrated_tables_failed(caplog): table_migrate.revert_migrated_tables(schema="test_schema1") assert "Failed to revert table hive_metastore.test_schema1.test_table1: error" in caplog.text + def test_refresh_migration_status_published_remained_tables(caplog): errors = {} rows = {} @@ -1406,6 +1407,6 @@ def test_refresh_migration_status_published_remained_tables(caplog): migration_status_refresher, principal_grants, ) - with caplog.at_level(logging.INFO, logger="databricks.labs.ucx.hive_metastore"): + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore"): table_migrate.get_remaining_tables() assert 'remained-hive-metastore-table: hive_metastore.schema1.table3' in caplog.messages diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 0674c952ad..36d8adeaa0 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -8,10 +8,6 @@ ) - - - - def test_migrate_external_tables_sync(run_workflow): ctx = run_workflow(TableMigration.migrate_external_tables_sync) ctx.workspace_client.catalogs.list.assert_called_once() @@ -60,16 +56,16 @@ def test_migrate_ctas_views(run_workflow): @pytest.mark.parametrize( "workflow", [ - TableMigration, - MigrateHiveSerdeTablesInPlace, - MigrateExternalTablesCTAS, - ScanTablesInMounts, - MigrateTablesInMounts, + TableMigration.update_migration_status, + MigrateHiveSerdeTablesInPlace.refresh_migration_status, + MigrateExternalTablesCTAS.refresh_migration_status, + ScanTablesInMounts.refresh_migration_status, + MigrateTablesInMounts.refresh_migration_status, ], ) def test_refresh_migration_status_is_refreshed(run_workflow, workflow): """Migration status is refreshed by deleting and showing new tables""" - ctx = run_workflow(getattr(workflow, "refresh_migration_status")) + ctx = run_workflow(workflow) assert "DELETE FROM hive_metastore.ucx.migration_status" in ctx.sql_backend.queries assert "SHOW DATABASES" in ctx.sql_backend.queries # No "SHOW TABLE FROM" query as table are not mocked From 1cb47a6206aaa6ab0c8a680b0401f130b0f49b08 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Fri, 9 Aug 2024 12:07:24 +1000 Subject: [PATCH 10/16] Incorporated the new round of comments --- .../labs/ucx/hive_metastore/workflows.py | 16 ++++++------ .../main/04_1_remaining_hms_tables.sql | 2 +- .../hive_metastore/test_workflows.py | 25 ++++++++----------- .../unit/hive_metastore/test_table_migrate.py | 13 +++++----- tests/unit/hive_metastore/test_workflows.py | 13 +++++----- 5 files changed, 32 insertions(+), 37 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index 0b5cafea08..4debb9de85 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -109,9 +109,9 @@ def migrate_views(self, ctx: RuntimeContext): ) @job_task(job_cluster="table_migration", depends_on=[migrate_views]) - def refresh_migration_status(self, ctx: RuntimeContext): + def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.index_full_refresh() + ctx.tables_migrator.get_remaining_tables() class MigrateExternalTablesCTAS(Workflow): @@ -159,9 +159,9 @@ def migrate_views(self, ctx: RuntimeContext): ) @job_task(job_cluster="table_migration", depends_on=[migrate_views]) - def refresh_migration_status(self, ctx: RuntimeContext): + def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.index_full_refresh() + ctx.tables_migrator.get_remaining_tables() class ScanTablesInMounts(Workflow): @@ -176,9 +176,9 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext): ctx.tables_in_mounts.snapshot() @job_task(job_cluster="table_migration", depends_on=[scan_tables_in_mounts_experimental]) - def refresh_migration_status(self, ctx: RuntimeContext): + def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.index_full_refresh() + ctx.tables_migrator.get_remaining_tables() class MigrateTablesInMounts(Workflow): @@ -191,6 +191,6 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext): ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT) @job_task(job_cluster="table_migration", depends_on=[migrate_tables_in_mounts_experimental]) - def refresh_migration_status(self, ctx: RuntimeContext): + def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.index_full_refresh() + ctx.tables_migrator.get_remaining_tables() diff --git a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql index 4f49716bde..badff4f101 100644 --- a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql +++ b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql @@ -1,4 +1,4 @@ -/* --title 'List of remaining tables in HMS' --width 6 */ +/* --title 'List of remaining tables in HMS' --type table --width 6 */ SELECT SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) AS message FROM inventory.logs diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index b306d395ad..6a86bcb406 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -102,26 +102,21 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) def test_refresh_not_migrated_status_job(ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog): tables, dst_schema = prepare_tables_for_migration - ctx = installation_ctx.replace( - extend_prompts={ - r".*Do you want to update the existing installation?.*": 'yes', - }, - ) - ctx.workspace_installation.run() + installation_ctx.workspace_installation.run() second_table = list(tables.values())[1] - ctx.table_mapping.skip_table(dst_schema.name, second_table.name) - ctx.deployed_workflows.run_workflow("migrate-tables") - assert ctx.deployed_workflows.validate_step("migrate-tables") + installation_ctx.table_mapping.skip_table(dst_schema.name, second_table.name) + installation_ctx.deployed_workflows.run_workflow("migrate-tables") + assert installation_ctx.deployed_workflows.validate_step("migrate-tables") remained_tables = list( sql_backend.fetch( f""" - SELECT - SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) - AS message - FROM {ctx.inventory_database}.logs - WHERE message LIKE 'remained-hive-metastore-table: %' - """ + SELECT + SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) + AS message + FROM {installation_ctx.inventory_database}.logs + WHERE message LIKE 'remained-hive-metastore-table: %' + """ ) ) assert remained_tables[0].message == f'hive_metastore.{dst_schema.name}.{second_table.name}' diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index f288b2ac1a..a960c5f05a 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1352,11 +1352,9 @@ def test_revert_migrated_tables_failed(caplog): def test_refresh_migration_status_published_remained_tables(caplog): - errors = {} - rows = {} - backend = MockBackend(fails_on_first=errors, rows=rows) + backend = MockBackend() table_crawler = create_autospec(TablesCrawler) - grant_crawler = create_autospec(GrantsCrawler) # pylint: disable=mock-no-usage + grant_crawler = create_autospec(GrantsCrawler) client = mock_workspace_client() table_crawler.snapshot.return_value = [ Table( @@ -1396,7 +1394,7 @@ def test_refresh_migration_status_published_remained_tables(caplog): ] ) migration_status_refresher.index.return_value = migration_index - principal_grants = create_autospec(PrincipalACL) # pylint: disable=mock-no-usage + principal_grants = create_autospec(PrincipalACL) table_migrate = TablesMigrator( table_crawler, grant_crawler, @@ -1408,5 +1406,8 @@ def test_refresh_migration_status_published_remained_tables(caplog): principal_grants, ) with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore"): - table_migrate.get_remaining_tables() + tables = table_migrate.get_remaining_tables() assert 'remained-hive-metastore-table: hive_metastore.schema1.table3' in caplog.messages + assert len(tables) == 1 and tables[0].key == "hive_metastore.schema1.table3" + grant_crawler.assert_not_called() + principal_grants.assert_not_called() diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 36d8adeaa0..f143e6de22 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -56,16 +56,15 @@ def test_migrate_ctas_views(run_workflow): @pytest.mark.parametrize( "workflow", [ - TableMigration.update_migration_status, - MigrateHiveSerdeTablesInPlace.refresh_migration_status, - MigrateExternalTablesCTAS.refresh_migration_status, - ScanTablesInMounts.refresh_migration_status, - MigrateTablesInMounts.refresh_migration_status, + TableMigration, + MigrateHiveSerdeTablesInPlace, + MigrateExternalTablesCTAS, + ScanTablesInMounts, + MigrateTablesInMounts, ], ) def test_refresh_migration_status_is_refreshed(run_workflow, workflow): """Migration status is refreshed by deleting and showing new tables""" - ctx = run_workflow(workflow) + ctx = run_workflow(getattr(workflow, "update_migration_status")) assert "DELETE FROM hive_metastore.ucx.migration_status" in ctx.sql_backend.queries assert "SHOW DATABASES" in ctx.sql_backend.queries - # No "SHOW TABLE FROM" query as table are not mocked From e797bacefea6877281af8124b43c363a1f692bf6 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Fri, 9 Aug 2024 14:52:29 +1000 Subject: [PATCH 11/16] Renaming the test names --- tests/integration/hive_metastore/test_workflows.py | 2 +- tests/unit/hive_metastore/test_workflows.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 6a86bcb406..373ef9f096 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -100,7 +100,7 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) -def test_refresh_not_migrated_status_job(ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog): +def test_table_migration_job_publishes_remianed_tables(ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog): tables, dst_schema = prepare_tables_for_migration installation_ctx.workspace_installation.run() second_table = list(tables.values())[1] diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index f143e6de22..1dfaf2ccd6 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -63,7 +63,7 @@ def test_migrate_ctas_views(run_workflow): MigrateTablesInMounts, ], ) -def test_refresh_migration_status_is_refreshed(run_workflow, workflow): +def test_update_migration_status(run_workflow, workflow): """Migration status is refreshed by deleting and showing new tables""" ctx = run_workflow(getattr(workflow, "update_migration_status")) assert "DELETE FROM hive_metastore.ucx.migration_status" in ctx.sql_backend.queries From 38615df19eae3cad15df29034a342f3c32d89d26 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Fri, 9 Aug 2024 15:06:08 +1000 Subject: [PATCH 12/16] Formatting fixed --- .../ucx/queries/migration/main/04_1_remaining_hms_tables.sql | 2 +- tests/integration/hive_metastore/test_workflows.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql index badff4f101..54e8bddd43 100644 --- a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql +++ b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql @@ -3,4 +3,4 @@ SELECT SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) AS message FROM inventory.logs WHERE - message LIKE 'remained-hive-metastore-table: %'; + message LIKE 'remained-hive-metastore-table: %' diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 373ef9f096..3f91f525b8 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -100,7 +100,9 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) -def test_table_migration_job_publishes_remianed_tables(ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog): +def test_table_migration_job_publishes_remianed_tables( + ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog +): tables, dst_schema = prepare_tables_for_migration installation_ctx.workspace_installation.run() second_table = list(tables.values())[1] From 376375a6a31599054b2b10daca155f211e97533e Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Fri, 9 Aug 2024 15:12:06 +1000 Subject: [PATCH 13/16] Formatting fixed --- .../ucx/queries/migration/main/04_1_remaining_hms_tables.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql index 54e8bddd43..3ea923e8a7 100644 --- a/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql +++ b/src/databricks/labs/ucx/queries/migration/main/04_1_remaining_hms_tables.sql @@ -3,4 +3,4 @@ SELECT SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1) AS message FROM inventory.logs WHERE - message LIKE 'remained-hive-metastore-table: %' + message LIKE 'remained-hive-metastore-table: %' \ No newline at end of file From 17a3aa82143d9f18d0802ed43ba05d679ecc3894 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Wed, 14 Aug 2024 17:24:19 +1000 Subject: [PATCH 14/16] Fix a bug --- tests/unit/hive_metastore/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index ae1e728c0d..0a27e198b0 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -65,6 +65,6 @@ def test_migrate_ctas_views(run_workflow): ) def test_update_migration_status(run_workflow, workflow): """Migration status is refreshed by deleting and showing new tables""" - ctx = run_workflow(getattr(workflow, "refresh_migration_status")) + ctx = run_workflow(getattr(workflow, "update_migration_status")) assert "TRUNCATE TABLE hive_metastore.ucx.migration_status" in ctx.sql_backend.queries assert "SHOW DATABASES" in ctx.sql_backend.queries From 9da5fce8a845873a71b9871476aacb36aff47917 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Thu, 15 Aug 2024 17:00:09 +1000 Subject: [PATCH 15/16] Modify the integration test due to a function change --- tests/integration/hive_metastore/test_workflows.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 3f91f525b8..2dacc16b91 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -1,5 +1,6 @@ import pytest from databricks.sdk.errors import NotFound +from databricks.labs.ucx.hive_metastore.tables import Table @pytest.mark.parametrize( @@ -106,7 +107,14 @@ def test_table_migration_job_publishes_remianed_tables( tables, dst_schema = prepare_tables_for_migration installation_ctx.workspace_installation.run() second_table = list(tables.values())[1] - installation_ctx.table_mapping.skip_table(dst_schema.name, second_table.name) + table = Table( + "hive_metastore", + dst_schema.name, + second_table.name, + object_type="UNKNOWN", + table_format="UNKNOWN", + ) + installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) installation_ctx.deployed_workflows.run_workflow("migrate-tables") assert installation_ctx.deployed_workflows.validate_step("migrate-tables") From a1c4c24a351f74dd257fb3d51feb291a7bc449d9 Mon Sep 17 00:00:00 2001 From: Amin Movahed Date: Thu, 15 Aug 2024 19:02:40 +1000 Subject: [PATCH 16/16] Formatting fix --- tests/integration/hive_metastore/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 2dacc16b91..3990a774e0 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -114,7 +114,7 @@ def test_table_migration_job_publishes_remianed_tables( object_type="UNKNOWN", table_format="UNKNOWN", ) - installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) + installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) installation_ctx.deployed_workflows.run_workflow("migrate-tables") assert installation_ctx.deployed_workflows.validate_step("migrate-tables")