# Track DirectFsAccess on JobsProgressEncoder (#3375)
## Changes

Track `DirectFsAccess` records on the `JobsProgressEncoder`: direct filesystem accesses found in job code by the `WorkflowLinter` now surface as failures on the job's historical record (an example failure message is shown after the checklists below).

### Linked issues

Resolves #3059

### Functionality

- [x] modified existing workflow: `migration-progress-experimental`

### Tests

- [x] added unit tests
- [x] added integration tests
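
For illustration, the historical record for a job now carries direct-filesystem-access failures in the same format as the existing job problems; a hypothetical example (the job id, task key, and paths are made up):

```
direct-filesystem-access: 123/my-task task: /path/to/write_dfsa.py: The use of direct filesystem references is deprecated: dfsa:/path/to/data/
```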
JCZuurmond authored Nov 26, 2024
1 parent bccc103 commit 438ffc8
Showing 6 changed files with 172 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/contexts/workflow_task.py
@@ -203,6 +203,7 @@ def jobs_progress(self) -> ProgressEncoder[JobInfo]:
         return JobsProgressEncoder(
             self.sql_backend,
             self.job_ownership,
+            [self.directfs_access_crawler_for_paths, self.directfs_access_crawler_for_queries],
             self.inventory_database,
             self.parent_run_id,
             self.workspace_id,
41 changes: 36 additions & 5 deletions src/databricks/labs/ucx/progress/jobs.py
@@ -7,21 +7,22 @@
 from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership
 from databricks.labs.ucx.progress.history import ProgressEncoder
 from databricks.labs.ucx.progress.install import Historical
+from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
 from databricks.labs.ucx.source_code.jobs import JobProblem
 
 
 class JobsProgressEncoder(ProgressEncoder[JobInfo]):
     """Encoder class:Job to class:History."""
 
     def __init__(
         self,
         sql_backend: SqlBackend,
         ownership: JobOwnership,
+        direct_fs_access_crawlers: list[DirectFsAccessCrawler],
         inventory_database: str,
         run_id: int,
         workspace_id: int,
         catalog: str,
-        schema: str = "multiworkspace",
-        table: str = "historical",
     ) -> None:
         super().__init__(
             sql_backend,
@@ -30,9 +31,10 @@ def __init__(
             run_id,
             workspace_id,
             catalog,
-            schema,
-            table,
+            "multiworkspace",
+            "historical",
         )
+        self._direct_fs_access_crawlers = direct_fs_access_crawlers
         self._inventory_database = inventory_database
 
     @cached_property
@@ -48,7 +50,36 @@ def _job_problems(self) -> dict[int, list[str]]:
                 index[job_problem.job_id].append(failure)
         return index
 
+    @cached_property
+    def _direct_fs_accesses(self) -> dict[str, list[str]]:
+        index = collections.defaultdict(list)
+        for crawler in self._direct_fs_access_crawlers:
+            for direct_fs_access in crawler.snapshot():
+                # The workflow and task source lineage are added by the WorkflowLinter
+                if len(direct_fs_access.source_lineage) < 2:
+                    continue
+                if direct_fs_access.source_lineage[0].object_type != "WORKFLOW":
+                    continue
+                if direct_fs_access.source_lineage[1].object_type != "TASK":
+                    continue
+                job_id = direct_fs_access.source_lineage[0].object_id
+                task_key = direct_fs_access.source_lineage[1].object_id  # <job id>/<task key>
+                # Follow the same failure-message structure as the JobProblems above
+                # and the DirectFsAccessPyLinter deprecation
+                code = "direct-filesystem-access"
+                message = f"The use of direct filesystem references is deprecated: {direct_fs_access.path}"
+                failure = f"{code}: {task_key} task: {direct_fs_access.source_id}: {message}"
+                index[job_id].append(failure)
+        return index
+
     def _encode_record_as_historical(self, record: JobInfo) -> Historical:
+        """Encode a job as a historical record.
+
+        Failures are detected by the WorkflowLinter:
+        - Job problems
+        - Direct filesystem access by code used in the job
+        """
         historical = super()._encode_record_as_historical(record)
-        failures = self._job_problems.get(int(record.job_id), [])
+        failures = []
+        failures.extend(self._job_problems.get(int(record.job_id), []))
+        failures.extend(self._direct_fs_accesses.get(record.job_id, []))
         return replace(historical, failures=historical.failures + failures)
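
To make the new failure format concrete, here is a minimal, runnable sketch of the lineage filter and message construction above. The `LineageAtom` and `DirectFsAccess` dataclasses are simplified stand-ins for the real classes in `databricks.labs.ucx.source_code.base`, trimmed to the fields this code reads:

```python
import collections
from dataclasses import dataclass, field


@dataclass
class LineageAtom:
    """Simplified stand-in for the real LineageAtom."""

    object_type: str
    object_id: str


@dataclass
class DirectFsAccess:
    """Simplified stand-in carrying only the fields the encoder reads."""

    source_id: str
    source_lineage: list[LineageAtom] = field(default_factory=list)
    path: str = ""


def index_direct_fs_accesses(dfsas: list[DirectFsAccess]) -> dict[str, list[str]]:
    """Mirror of _direct_fs_accesses: keep only records with WORKFLOW/TASK lineage."""
    index: dict[str, list[str]] = collections.defaultdict(list)
    for dfsa in dfsas:
        if len(dfsa.source_lineage) < 2:
            continue
        if dfsa.source_lineage[0].object_type != "WORKFLOW":
            continue
        if dfsa.source_lineage[1].object_type != "TASK":
            continue
        job_id = dfsa.source_lineage[0].object_id
        task_key = dfsa.source_lineage[1].object_id  # "<job id>/<task key>"
        message = f"The use of direct filesystem references is deprecated: {dfsa.path}"
        index[job_id].append(f"direct-filesystem-access: {task_key} task: {dfsa.source_id}: {message}")
    return index


dfsa = DirectFsAccess(
    source_id="/path/to/write_dfsa.py",
    source_lineage=[LineageAtom("WORKFLOW", "1"), LineageAtom("TASK", "1/write-dfsa")],
    path="dfsa:/path/to/data/",
)
print(index_direct_fs_accesses([dfsa])["1"][0])
# direct-filesystem-access: 1/write-dfsa task: /path/to/write_dfsa.py: The use of direct filesystem references is deprecated: dfsa:/path/to/data/
```

Note that `_job_problems` is keyed by the integer `job_id` of a `JobProblem`, while `_direct_fs_accesses` is keyed by the string `object_id` from the lineage; that is why `_encode_record_as_historical` casts `record.job_id` to `int` for one lookup but not the other.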
3 changes: 2 additions & 1 deletion src/databricks/labs/ucx/source_code/directfs_access.py
@@ -50,7 +50,8 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None:
         Providing a multi-entity crawler is out-of-scope of this PR
         """
         try:
-            # TODO until we historize data, we append all DFSAs
+            # TODO: Until we historize data, we append all DFSAs
+            # UPDATE: We historize DFSAs from the WorkflowLinter, but not from the QueryLinter yet
             self._update_snapshot(dfsas, mode="append")
         except DatabricksError as e:
             logger.error("Failed to store DFSAs", exc_info=e)
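
For context, `dump_all` is the entry point the tests below use to persist DFSA records; a usage sketch with the `runtime_ctx` fixture from the integration tests (ids and names are made up):

```python
# Usage sketch; `runtime_ctx` is the integration-test fixture that exposes
# the DirectFsAccessCrawler instances. DFSAs are appended, per the TODO above.
dfsa = DirectFsAccess(
    source_id="/path/to/write_dfsa.py",
    source_lineage=[
        LineageAtom(object_type="WORKFLOW", object_id="123", other={"name": "my-job"}),
        LineageAtom(object_type="TASK", object_id="123/my-task"),
    ],
    path="dfsa:/path/to/data/",
    is_read=False,
    is_write=True,
)
runtime_ctx.directfs_access_crawler_for_paths.dump_all([dfsa])
```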
26 changes: 16 additions & 10 deletions tests/integration/conftest.py
@@ -570,20 +570,26 @@ def make_cluster_policy(self, **kwargs) -> CreatePolicyResponse:
     def make_cluster_policy_permissions(self, **kwargs):
         return self._make_cluster_policy_permissions(**kwargs)
 
+    def make_job(self, **kwargs) -> Job:
+        job = self._make_job(**kwargs)
+        self._jobs.append(job)
+        return job
+
+    def make_dashboard(self, **kwargs) -> Dashboard:
+        dashboard = self._make_dashboard(**kwargs)
+        self._dashboards.append(dashboard)
+        return dashboard
+
     def make_linting_resources(self) -> None:
         """Make resources to lint."""
-        notebook_job_1 = self._make_job(content="spark.read.parquet('dbfs://mnt/notebook/')")
-        notebook_job_2 = self._make_job(content="spark.table('old.stuff')")
-        file_job_1 = self._make_job(content="spark.read.parquet('dbfs://mnt/file/')", task_type=SparkPythonTask)
-        file_job_2 = self._make_job(content="spark.table('some.table')", task_type=SparkPythonTask)
+        self.make_job(content="spark.read.parquet('dbfs://mnt/notebook/')")
+        self.make_job(content="spark.table('old.stuff')")
+        self.make_job(content="spark.read.parquet('dbfs://mnt/file/')", task_type=SparkPythonTask)
+        self.make_job(content="spark.table('some.table')", task_type=SparkPythonTask)
         query_1 = self._make_query(sql_query='SELECT * from parquet.`dbfs://mnt/foo2/bar2`')
-        dashboard_1 = self._make_dashboard(query=query_1)
+        self._make_dashboard(query=query_1)
         query_2 = self._make_query(sql_query='SELECT * from my_schema.my_table')
-        dashboard_2 = self._make_dashboard(query=query_2)
-
-        self._jobs.extend([notebook_job_1, notebook_job_2, file_job_1, file_job_2])
-        self._dashboards.append(dashboard_1)
-        self._dashboards.append(dashboard_2)
+        self._make_dashboard(query=query_2)
 
     def add_table(self, table: TableInfo):
         self._tables.append(table)
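
The net effect of the two new helpers is that created resources are registered on the context for later steps; a hedged sketch of how a test can rely on this (hypothetical test body, `runtime_ctx` as in the integration tests):

```python
# Hypothetical test: make_job both creates the job and records it on the
# context (in self._jobs), so later steps such as linting can find it.
def test_tracked_job(runtime_ctx) -> None:
    job = runtime_ctx.make_job(content="spark.table('old.stuff')")
    assert job.job_id
```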
86 changes: 86 additions & 0 deletions tests/integration/progress/test_jobs.py
@@ -0,0 +1,86 @@
from databricks.labs.ucx.assessment.jobs import JobInfo
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom
from databricks.labs.ucx.source_code.jobs import JobProblem


def test_job_progress_encoder_failures(runtime_ctx, az_cli_ctx) -> None:
    az_cli_ctx.progress_tracking_installation.run()
    runtime_ctx = runtime_ctx.replace(
        parent_run_id=1,
        sql_backend=az_cli_ctx.sql_backend,
        ucx_catalog=az_cli_ctx.ucx_catalog,
    )

    job = runtime_ctx.make_job()
    assert job.job_id, "Expected job with id"
    assert job.settings and job.settings.tasks, "Expected job with tasks"

    job_problems = [
        JobProblem(
            job_id=job.job_id,
            job_name=job.settings.name,
            task_key=job.settings.tasks[0].task_key,
            path="parent/child.py",
            code="sql-parse-error",
            message="Could not parse SQL",
            start_line=12,
            start_col=0,
            end_line=12,
            end_col=20,
        )
    ]
    runtime_ctx.sql_backend.save_table(
        f'{runtime_ctx.inventory_database}.workflow_problems',
        job_problems,
        JobProblem,
        mode='overwrite',
    )

    dashboard = runtime_ctx.make_dashboard()

    direct_fs_access_for_path = DirectFsAccess(
        source_id="/path/to/write_dfsa.py",
        source_lineage=[
            LineageAtom(object_type="WORKFLOW", object_id=str(job.job_id), other={"name": job.settings.name}),
            LineageAtom(object_type="TASK", object_id=job.settings.tasks[0].task_key),
        ],
        path="dfsa:/path/to/data/",
        is_read=False,
        is_write=True,
    )
    runtime_ctx.directfs_access_crawler_for_paths.dump_all([direct_fs_access_for_path])

    direct_fs_access_for_query = DirectFsAccess(
        source_id="/path/to/write_dfsa.py",
        source_lineage=[
            LineageAtom(
                object_type="DASHBOARD",
                object_id=dashboard.id,
                other={"parent": dashboard.parent, "name": dashboard.name},
            ),
            LineageAtom(object_type="QUERY", object_id=f"{dashboard.id}/query", other={"name": "test"}),
        ],
        path="dfsa:/path/to/data/",
        is_read=False,
        is_write=True,
    )
    runtime_ctx.directfs_access_crawler_for_queries.dump_all([direct_fs_access_for_query])

    job_info = JobInfo(
        str(job.job_id),
        success=1,
        failures="[]",
        job_name=job.settings.name,
        creator=job.creator_user_name,
    )
    runtime_ctx.jobs_progress.append_inventory_snapshot([job_info])

    history_table_name = escape_sql_identifier(runtime_ctx.tables_progress.full_name)
    records = list(runtime_ctx.sql_backend.fetch(f"SELECT * FROM {history_table_name}"))

    assert len(records) == 1, "Expected one historical entry"
    assert records[0].failures == [
        f"sql-parse-error: {job.settings.tasks[0].task_key} task: parent/child.py: Could not parse SQL",
        f"direct-filesystem-access: {job.settings.tasks[0].task_key} task: /path/to/write_dfsa.py: The use of direct filesystem references is deprecated: dfsa:/path/to/data/",
    ]
32 changes: 31 additions & 1 deletion tests/unit/progress/test_jobs.py
@@ -3,9 +3,11 @@
 from databricks.labs.lsql import Row
 from databricks.labs.lsql.backends import MockBackend
 
+from databricks.labs.ucx import __version__
 from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
 from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
-from databricks.labs.ucx import __version__
+from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom
+from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
 
 
 def test_jobs_progress_encoder() -> None:
@@ -28,9 +30,35 @@ def test_jobs_progress_encoder() -> None:
     )
     job_ownership = create_autospec(JobOwnership)
     job_ownership.owner_of.return_value = "some_owner"
+    direct_fs_access_crawler = create_autospec(DirectFsAccessCrawler)
+    direct_fs_accesses = [
+        DirectFsAccess(
+            source_id="/path/to/write_dfsa.py",
+            source_lineage=[
+                LineageAtom(object_type="WORKFLOW", object_id="1", other={"name": "test"}),
+                LineageAtom(object_type="TASK", object_id="1/write-dfsa"),
+            ],
+            path="dfsa:/path/to/data/",
+            is_read=False,
+            is_write=True,
+        ),
+        DirectFsAccess(
+            source_id="/path/to/write_dfsa.py",
+            source_lineage=[
+                # A dashboard with the same id as the job is unlikely, but it is used here
+                # to test that non-WORKFLOW lineage is not included
+                LineageAtom(object_type="DASHBOARD", object_id="1", other={"parent": "parent", "name": "test"}),
+                LineageAtom(object_type="QUERY", object_id="1/query", other={"name": "test"}),
+            ],
+            path="dfsa:/path/to/data/",
+            is_read=False,
+            is_write=True,
+        ),
+    ]
+    direct_fs_access_crawler.snapshot.return_value = direct_fs_accesses
     jobs_progress_encoder = JobsProgressEncoder(
         sql_backend,
         job_ownership,
+        [direct_fs_access_crawler],
         "inventory",
         2,
         3,
@@ -59,8 +87,10 @@ def test_jobs_progress_encoder() -> None:
                 'some failure from config',
                 'cannot-autofix-table-reference: a task: /some/path: some failure',
                 'catalog-api-in-shared-clusters: b task: /some/other: some failure',
+                "direct-filesystem-access: 1/write-dfsa task: /path/to/write_dfsa.py: The use of direct filesystem references is deprecated: dfsa:/path/to/data/",
             ],
             owner='some_owner',
             ucx_version=__version__,
         )
     ]
+    direct_fs_access_crawler.snapshot.assert_called_once()
