diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 8b6313ad0b..d41730bed5 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -203,6 +203,7 @@ def jobs_progress(self) -> ProgressEncoder[JobInfo]: return JobsProgressEncoder( self.sql_backend, self.job_ownership, + [self.directfs_access_crawler_for_paths, self.directfs_access_crawler_for_queries], self.inventory_database, self.parent_run_id, self.workspace_id, diff --git a/src/databricks/labs/ucx/progress/jobs.py b/src/databricks/labs/ucx/progress/jobs.py index 198139543c..d598aef94f 100644 --- a/src/databricks/labs/ucx/progress/jobs.py +++ b/src/databricks/labs/ucx/progress/jobs.py @@ -7,21 +7,22 @@ from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical +from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler from databricks.labs.ucx.source_code.jobs import JobProblem class JobsProgressEncoder(ProgressEncoder[JobInfo]): + """Encoder class:Job to class:History.""" def __init__( self, sql_backend: SqlBackend, ownership: JobOwnership, + direct_fs_access_crawlers: list[DirectFsAccessCrawler], inventory_database: str, run_id: int, workspace_id: int, catalog: str, - schema: str = "multiworkspace", - table: str = "historical", ) -> None: super().__init__( sql_backend, @@ -30,9 +31,10 @@ def __init__( run_id, workspace_id, catalog, - schema, - table, + "multiworkspace", + "historical", ) + self._direct_fs_access_crawlers = direct_fs_access_crawlers self._inventory_database = inventory_database @cached_property @@ -48,7 +50,36 @@ def _job_problems(self) -> dict[int, list[str]]: index[job_problem.job_id].append(failure) return index + @cached_property + def _direct_fs_accesses(self) -> dict[str, list[str]]: + index = collections.defaultdict(list) + for crawler in self._direct_fs_access_crawlers: + for direct_fs_access in crawler.snapshot(): + # The workflow and task source lineage are added by the WorkflowLinter + if len(direct_fs_access.source_lineage) < 2: + continue + if direct_fs_access.source_lineage[0].object_type != "WORKFLOW": + continue + if direct_fs_access.source_lineage[1].object_type != "TASK": + continue + job_id = direct_fs_access.source_lineage[0].object_id + task_key = direct_fs_access.source_lineage[1].object_id # / + # Follow same failure message structure as the JobProblems above and DirectFsAccessPyLinter deprecation + code = "direct-filesystem-access" + message = f"The use of direct filesystem references is deprecated: {direct_fs_access.path}" + failure = f"{code}: {task_key} task: {direct_fs_access.source_id}: {message}" + index[job_id].append(failure) + return index + def _encode_record_as_historical(self, record: JobInfo) -> Historical: + """Encode a job as a historical records. + + Failures are detected by the WorkflowLinter: + - Job problems + - Direct filesystem access by code used in job + """ historical = super()._encode_record_as_historical(record) - failures = self._job_problems.get(int(record.job_id), []) + failures = [] + failures.extend(self._job_problems.get(int(record.job_id), [])) + failures.extend(self._direct_fs_accesses.get(record.job_id, [])) return replace(historical, failures=historical.failures + failures) diff --git a/src/databricks/labs/ucx/source_code/directfs_access.py b/src/databricks/labs/ucx/source_code/directfs_access.py index b0b449dd1a..1e6f9b823c 100644 --- a/src/databricks/labs/ucx/source_code/directfs_access.py +++ b/src/databricks/labs/ucx/source_code/directfs_access.py @@ -50,7 +50,8 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None: Providing a multi-entity crawler is out-of-scope of this PR """ try: - # TODO until we historize data, we append all DFSAs + # TODO: Until we historize data, we append all DFSAs + # UPDATE: We historize DFSA from WorkflowLinter, not from QueryLinter yet self._update_snapshot(dfsas, mode="append") except DatabricksError as e: logger.error("Failed to store DFSAs", exc_info=e) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0fa8a6d163..f50ce1bceb 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -570,20 +570,26 @@ def make_cluster_policy(self, **kwargs) -> CreatePolicyResponse: def make_cluster_policy_permissions(self, **kwargs): return self._make_cluster_policy_permissions(**kwargs) + def make_job(self, **kwargs) -> Job: + job = self._make_job(**kwargs) + self._jobs.append(job) + return job + + def make_dashboard(self, **kwargs) -> Dashboard: + dashboard = self._make_dashboard(**kwargs) + self._dashboards.append(dashboard) + return dashboard + def make_linting_resources(self) -> None: """Make resources to lint.""" - notebook_job_1 = self._make_job(content="spark.read.parquet('dbfs://mnt/notebook/')") - notebook_job_2 = self._make_job(content="spark.table('old.stuff')") - file_job_1 = self._make_job(content="spark.read.parquet('dbfs://mnt/file/')", task_type=SparkPythonTask) - file_job_2 = self._make_job(content="spark.table('some.table')", task_type=SparkPythonTask) + self.make_job(content="spark.read.parquet('dbfs://mnt/notebook/')") + self.make_job(content="spark.table('old.stuff')") + self.make_job(content="spark.read.parquet('dbfs://mnt/file/')", task_type=SparkPythonTask) + self.make_job(content="spark.table('some.table')", task_type=SparkPythonTask) query_1 = self._make_query(sql_query='SELECT * from parquet.`dbfs://mnt/foo2/bar2`') - dashboard_1 = self._make_dashboard(query=query_1) + self._make_dashboard(query=query_1) query_2 = self._make_query(sql_query='SELECT * from my_schema.my_table') - dashboard_2 = self._make_dashboard(query=query_2) - - self._jobs.extend([notebook_job_1, notebook_job_2, file_job_1, file_job_2]) - self._dashboards.append(dashboard_1) - self._dashboards.append(dashboard_2) + self._make_dashboard(query=query_2) def add_table(self, table: TableInfo): self._tables.append(table) diff --git a/tests/integration/progress/test_jobs.py b/tests/integration/progress/test_jobs.py new file mode 100644 index 0000000000..324a89e04a --- /dev/null +++ b/tests/integration/progress/test_jobs.py @@ -0,0 +1,86 @@ +from databricks.labs.ucx.assessment.jobs import JobInfo +from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom +from databricks.labs.ucx.source_code.jobs import JobProblem + + +def test_job_progress_encoder_failures(runtime_ctx, az_cli_ctx) -> None: + az_cli_ctx.progress_tracking_installation.run() + runtime_ctx = runtime_ctx.replace( + parent_run_id=1, + sql_backend=az_cli_ctx.sql_backend, + ucx_catalog=az_cli_ctx.ucx_catalog, + ) + + job = runtime_ctx.make_job() + assert job.job_id, "Expected job with id" + assert job.settings and job.settings.tasks, "Expected job with tasks" + + job_problems = [ + JobProblem( + job_id=job.job_id, + job_name=job.settings.name, + task_key=job.settings.tasks[0].task_key, + path="parent/child.py", + code="sql-parse-error", + message="Could not parse SQL", + start_line=12, + start_col=0, + end_line=12, + end_col=20, + ) + ] + runtime_ctx.sql_backend.save_table( + f'{runtime_ctx.inventory_database}.workflow_problems', + job_problems, + JobProblem, + mode='overwrite', + ) + + dashboard = runtime_ctx.make_dashboard() + + direct_fs_access_for_path = DirectFsAccess( + source_id="/path/to/write_dfsa.py", + source_lineage=[ + LineageAtom(object_type="WORKFLOW", object_id=str(job.job_id), other={"name": job.settings.name}), + LineageAtom(object_type="TASK", object_id=job.settings.tasks[0].task_key), + ], + path="dfsa:/path/to/data/", + is_read=False, + is_write=True, + ) + runtime_ctx.directfs_access_crawler_for_paths.dump_all([direct_fs_access_for_path]) + + direct_fs_access_for_query = DirectFsAccess( + source_id="/path/to/write_dfsa.py", + source_lineage=[ + LineageAtom( + object_type="DASHBOARD", + object_id=dashboard.id, + other={"parent": dashboard.parent, "name": dashboard.name}, + ), + LineageAtom(object_type="QUERY", object_id=f"{dashboard.id}/query", other={"name": "test"}), + ], + path="dfsa:/path/to/data/", + is_read=False, + is_write=True, + ) + runtime_ctx.directfs_access_crawler_for_queries.dump_all([direct_fs_access_for_query]) + + job_info = JobInfo( + str(job.job_id), + success=1, + failures="[]", + job_name=job.settings.name, + creator=job.creator_user_name, + ) + runtime_ctx.jobs_progress.append_inventory_snapshot([job_info]) + + history_table_name = escape_sql_identifier(runtime_ctx.tables_progress.full_name) + records = list(runtime_ctx.sql_backend.fetch(f"SELECT * FROM {history_table_name}")) + + assert len(records) == 1, "Expected one historical entry" + assert records[0].failures == [ + f"sql-parse-error: {job.settings.tasks[0].task_key} task: parent/child.py: Could not parse SQL", + f"direct-filesystem-access: {job.settings.tasks[0].task_key} task: /path/to/write_dfsa.py: The use of direct filesystem references is deprecated: dfsa:/path/to/data/", + ] diff --git a/tests/unit/progress/test_jobs.py b/tests/unit/progress/test_jobs.py index 23c2aa1435..606590e658 100644 --- a/tests/unit/progress/test_jobs.py +++ b/tests/unit/progress/test_jobs.py @@ -3,9 +3,11 @@ from databricks.labs.lsql import Row from databricks.labs.lsql.backends import MockBackend +from databricks.labs.ucx import __version__ from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo from databricks.labs.ucx.progress.jobs import JobsProgressEncoder -from databricks.labs.ucx import __version__ +from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom +from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler def test_jobs_progress_encoder() -> None: @@ -28,9 +30,35 @@ def test_jobs_progress_encoder() -> None: ) job_ownership = create_autospec(JobOwnership) job_ownership.owner_of.return_value = "some_owner" + direct_fs_access_crawler = create_autospec(DirectFsAccessCrawler) + direct_fs_accesses = [ + DirectFsAccess( + source_id="/path/to/write_dfsa.py", + source_lineage=[ + LineageAtom(object_type="WORKFLOW", object_id="1", other={"name": "test"}), + LineageAtom(object_type="TASK", object_id="1/write-dfsa"), + ], + path="dfsa:/path/to/data/", + is_read=False, + is_write=True, + ), + DirectFsAccess( + source_id="/path/to/write_dfsa.py", + source_lineage=[ + # Dashboard with same id as job is unlikely, but here to test it is not included + LineageAtom(object_type="DASHBOARD", object_id="1", other={"parent": "parent", "name": "test"}), + LineageAtom(object_type="QUERY", object_id="1/query", other={"name": "test"}), + ], + path="dfsa:/path/to/data/", + is_read=False, + is_write=True, + ), + ] + direct_fs_access_crawler.snapshot.return_value = direct_fs_accesses jobs_progress_encoder = JobsProgressEncoder( sql_backend, job_ownership, + [direct_fs_access_crawler], "inventory", 2, 3, @@ -59,8 +87,10 @@ def test_jobs_progress_encoder() -> None: 'some failure from config', 'cannot-autofix-table-reference: a task: /some/path: some failure', 'catalog-api-in-shared-clusters: b task: /some/other: some failure', + "direct-filesystem-access: 1/write-dfsa task: /path/to/write_dfsa.py: The use of direct filesystem references is deprecated: dfsa:/path/to/data/", ], owner='some_owner', ucx_version=__version__, ) ] + direct_fs_access_crawler.snapshot.assert_called_once()