From d5654d0a446858a722453af46ea1a17c6bb12675 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 15 Nov 2024 08:52:16 +0100 Subject: [PATCH 1/5] Add direct fs access progress --- .../labs/ucx/progress/directfs_access.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 src/databricks/labs/ucx/progress/directfs_access.py diff --git a/src/databricks/labs/ucx/progress/directfs_access.py b/src/databricks/labs/ucx/progress/directfs_access.py new file mode 100644 index 0000000000..b884de13bd --- /dev/null +++ b/src/databricks/labs/ucx/progress/directfs_access.py @@ -0,0 +1,45 @@ +from dataclasses import replace + +from databricks.labs.lsql.backends import SqlBackend + +from databricks.labs.ucx.progress.history import ProgressEncoder +from databricks.labs.ucx.progress.install import Historical +from databricks.labs.ucx.source_code.base import DirectFsAccess +from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessOwnership + + +class DirectFsAccessProgressEncoder(ProgressEncoder[DirectFsAccess]): + """Encoder class:DirectFsAccess to class:Historical. + + A direct filesystem access is by definition a failure as it is not supported in Unity Catalog. + """ + + def __init__( + self, + sql_backend: SqlBackend, + ownership: DirectFsAccessOwnership, + run_id: int, + workspace_id: int, + catalog: str, + schema: str = "multiworkspace", + table: str = "historical", + ) -> None: + super().__init__( + sql_backend, + ownership, + DirectFsAccess, + run_id, + workspace_id, + catalog, + schema, + table, + ) + + def _encode_record_as_historical(self, record: DirectFsAccess) -> Historical: + """Encode record as historical. + + A direct filesystem access is by definition a failure as it is not supported in Unity Catalog.
+ """ + historical = super()._encode_record_as_historical(record) + failure = "Direct filesystem access is not supported in Unity Catalog" + return replace(historical, failures=historical.failures + [failure]) From a9e6b3cac6ba2f63f17179dc5c35a2d963369cbc Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 15 Nov 2024 09:31:37 +0100 Subject: [PATCH 2/5] Add id attributes to direct fs access --- src/databricks/labs/ucx/source_code/base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py index 659b38b2b7..26806b08b1 100644 --- a/src/databricks/labs/ucx/source_code/base.py +++ b/src/databricks/labs/ucx/source_code/base.py @@ -10,7 +10,7 @@ from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import Any, BinaryIO, TextIO +from typing import Any, BinaryIO, ClassVar, TextIO from astroid import NodeNG # type: ignore from sqlglot import Expression, parse as parse_sql @@ -290,6 +290,12 @@ class DirectFsAccess(SourceInfo): is_read: bool = False is_write: bool = False + # TODO: The ids are expected to be unique, but the `UNKNOWN` might not be + __id_attributes__: ClassVar[tuple[str, ...]] = ( + "source_id", + "path", + ) + @dataclass class DirectFsAccessNode: From 099bf88ee8de9823cbe9678a546db883a9daf525 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 15 Nov 2024 09:32:07 +0100 Subject: [PATCH 3/5] Test direct fs access progress encoder --- tests/unit/progress/test_directfs_access.py | 37 +++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 tests/unit/progress/test_directfs_access.py diff --git a/tests/unit/progress/test_directfs_access.py b/tests/unit/progress/test_directfs_access.py new file mode 100644 index 0000000000..d908967d8e --- /dev/null +++ b/tests/unit/progress/test_directfs_access.py @@ -0,0 +1,37 @@ +import datetime as dt +from unittest.mock import create_autospec 
+ +from databricks.labs.ucx.framework.owners import Ownership +from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.ucx.progress.directfs_access import DirectFsAccessProgressEncoder +from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom + + +def test_direct_filesystem_access_progress_encoder_failures(mock_backend) -> None: + """A direct filesystem access is a failure by definition as it is not supported by Unity Catalog.""" + direct_filesystem_access = DirectFsAccess( + path="dbfs://folder/file.csv", + is_read=True, + is_write=False, + source_id="path/to/file.py", + source_lineage=[LineageAtom(object_type="FILE", object_id="path/to/file.py")], + source_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_start_timestamp=dt.datetime.now(dt.timezone.utc), + assessment_end_timestamp=dt.datetime.now(dt.timezone.utc), + ) + ownership = create_autospec(Ownership) + ownership.owner_of.return_value = "user" + encoder = DirectFsAccessProgressEncoder( + mock_backend, + ownership, + run_id=1, + workspace_id=123456789, + catalog="test", + ) + + encoder.append_inventory_snapshot([direct_filesystem_access]) + + rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append") + assert len(rows) > 0, f"No rows written for: {encoder.full_name}" + assert rows[0].failures == ["Direct filesystem access is not supported in Unity Catalog"] + ownership.owner_of.assert_called_once() From 7edee0c4fbd1cbfe5ae10880dd2899ab0c553493 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 15 Nov 2024 09:36:31 +0100 Subject: [PATCH 4/5] Add direct fs access progress encoder to runtime context --- src/databricks/labs/ucx/contexts/workflow_task.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index a3656b290b..3bdccedcca 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ 
b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -25,6 +25,7 @@ from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler from databricks.labs.ucx.hive_metastore.udfs import Udf from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder +from databricks.labs.ucx.progress.directfs_access import DirectFsAccessProgressEncoder from databricks.labs.ucx.progress.grants import GrantProgressEncoder from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.jobs import JobsProgressEncoder @@ -242,6 +243,16 @@ def udfs_progress(self) -> ProgressEncoder[Udf]: self.config.ucx_catalog, ) + @cached_property + def directfs_access_progress(self) -> DirectFsAccessProgressEncoder: + return DirectFsAccessProgressEncoder( + self.sql_backend, + self.directfs_access_ownership, + self.parent_run_id, + self.workspace_id, + self.config.ucx_catalog, + ) + @cached_property def migration_sequencer(self) -> MigrationSequencer: return MigrationSequencer(self.workspace_client, self.administrator_locator) From f94168db61ede097d4813017c60906996a1b55f6 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 15 Nov 2024 09:38:12 +0100 Subject: [PATCH 5/5] Append inventory snapshot in assess dashboards --- src/databricks/labs/ucx/progress/workflows.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 060b2fdccf..fbda5d4d99 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -149,6 +149,7 @@ def assess_dashboards(self, ctx: RuntimeContext): Also stores direct filesystem accesses for display in the migration dashboard.""" # TODO: Ensure these are captured in the history log. 
ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database) + ctx.directfs_access_progress.append_inventory_snapshot(ctx.directfs_access_crawler_for_queries.snapshot()) @job_task(depends_on=[verify_prerequisites]) def assess_workflows(self, ctx: RuntimeContext):