Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DirectFsAccess progress encoder #3315

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
from databricks.labs.ucx.progress.directfs_access import DirectFsAccessProgressEncoder
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
Expand Down Expand Up @@ -242,6 +243,16 @@ def udfs_progress(self) -> ProgressEncoder[Udf]:
self.config.ucx_catalog,
)

@cached_property
def directfs_access_progress(self) -> DirectFsAccessProgressEncoder:
    """Progress encoder for direct filesystem access records.

    Wires the SQL backend, the direct filesystem access ownership, the parent
    run id, the workspace id and the configured UCX catalog into the encoder.
    """
    encoder_args = (
        self.sql_backend,
        self.directfs_access_ownership,
        self.parent_run_id,
        self.workspace_id,
        self.config.ucx_catalog,
    )
    return DirectFsAccessProgressEncoder(*encoder_args)

@cached_property
def migration_sequencer(self) -> MigrationSequencer:
    """Migration sequencer built from the workspace client and the administrator locator."""
    client = self.workspace_client
    locator = self.administrator_locator
    return MigrationSequencer(client, locator)
45 changes: 45 additions & 0 deletions src/databricks/labs/ucx/progress/directfs_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from dataclasses import replace

from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical
from databricks.labs.ucx.source_code.base import DirectFsAccess
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessOwnership


class DirectFsAccessProgressEncoder(ProgressEncoder[DirectFsAccess]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it belong to a notebook/sql query/dashboard, rather than being its own top-level entity?

"""Encoder class:DirectFsAccess to class:History.

A direct filesystem access is by definition a failure as it is not supported in Unity Catalog.
"""

def __init__(
    self,
    sql_backend: SqlBackend,
    ownership: DirectFsAccessOwnership,
    run_id: int,
    workspace_id: int,
    catalog: str,
    schema: str = "multiworkspace",
    table: str = "historical",
) -> None:
    """Set up the encoder for class:DirectFsAccess records.

    All arguments are forwarded unchanged to the parent encoder, with
    class:DirectFsAccess supplied as the record class.
    """
    # The record class is fixed here; callers supply everything else.
    super().__init__(sql_backend, ownership, DirectFsAccess, run_id, workspace_id, catalog, schema, table)

def _encode_record_as_historical(self, record: DirectFsAccess) -> Historical:
"""Encode record as historical.

A direct filesystem access is by definition a failure as it is not supported in Unity Catalog.
"""
historical = super()._encode_record_as_historical(record)
failure = "Direct filesystem access is not supported in Unity Catalog"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
failure = "Direct filesystem access is not supported in Unity Catalog"
failure = f"Direct filesystem access is not supported in Unity Catalog: {record.path}"

and add it to a notebook/view/sql query/dashboard

return replace(historical, failures=historical.failures + [failure])
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/progress/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def assess_dashboards(self, ctx: RuntimeContext):
Also stores direct filesystem accesses for display in the migration dashboard."""
# TODO: Ensure these are captured in the history log.
ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
ctx.directfs_access_progress.append_inventory_snapshot(ctx.directfs_access_crawler_for_queries.snapshot())

@job_task(depends_on=[verify_prerequisites])
def assess_workflows(self, ctx: RuntimeContext):
Expand Down
8 changes: 7 additions & 1 deletion src/databricks/labs/ucx/source_code/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, BinaryIO, TextIO
from typing import Any, BinaryIO, ClassVar, TextIO

from astroid import NodeNG # type: ignore
from sqlglot import Expression, parse as parse_sql
Expand Down Expand Up @@ -290,6 +290,12 @@ class DirectFsAccess(SourceInfo):
is_read: bool = False
is_write: bool = False

# TODO: The ids are expected to be unique, but the `UNKNOWN` might not be
__id_attributes__: ClassVar[tuple[str, ...]] = (
"source_id",
"path",
)


@dataclass
class DirectFsAccessNode:
Expand Down
37 changes: 37 additions & 0 deletions tests/unit/progress/test_directfs_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import datetime as dt
from unittest.mock import create_autospec

from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.progress.directfs_access import DirectFsAccessProgressEncoder
from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom


def test_direct_filesystem_access_progress_encoder_failures(mock_backend) -> None:
    """An encoded direct filesystem access must carry the Unity Catalog failure message."""
    expected_failures = ["Direct filesystem access is not supported in Unity Catalog"]
    source_path = "path/to/file.py"

    # Ownership is mocked so the encoder can resolve an owner without a workspace.
    ownership_mock = create_autospec(Ownership)
    ownership_mock.owner_of.return_value = "user"
    encoder = DirectFsAccessProgressEncoder(
        mock_backend,
        ownership_mock,
        run_id=1,
        workspace_id=123456789,
        catalog="test",
    )
    dfsa = DirectFsAccess(
        path="dbfs://folder/file.csv",
        is_read=True,
        is_write=False,
        source_id=source_path,
        source_lineage=[LineageAtom(object_type="FILE", object_id=source_path)],
        source_timestamp=dt.datetime.now(dt.timezone.utc),
        assessment_start_timestamp=dt.datetime.now(dt.timezone.utc),
        assessment_end_timestamp=dt.datetime.now(dt.timezone.utc),
    )

    encoder.append_inventory_snapshot([dfsa])

    table_name = escape_sql_identifier(encoder.full_name)
    rows = mock_backend.rows_written_for(table_name, "append")
    assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
    assert rows[0].failures == expected_failures
    ownership_mock.owner_of.assert_called_once()
Loading