# Let WorkflowLinter.refresh_report lint jobs from JobsCrawler (#3732)
## Changes

Let `WorkflowLinter.refresh_report` lint the jobs discovered by `JobsCrawler`, so that we only lint jobs that are within scope.
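
In essence, a simplified before/after of `refresh_report` (condensed from the diff to `src/databricks/labs/ucx/source_code/linters/jobs.py` below; `JobsCrawler.snapshot()` is assumed to yield records carrying a `job_id`):

```python
# Before: the linter listed every job in the workspace and filtered inline.
for job in self._ws.jobs.list():
    if self._include_job_ids is not None and job.job_id not in self._include_job_ids:
        continue  # job is outside the configured scope
    tasks.append(functools.partial(self.lint_job, job.job_id))

# After: the crawler owns job discovery and scoping; the linter consumes its snapshot.
for job in self._jobs_crawler.snapshot():
    tasks.append(functools.partial(self.lint_job, job.job_id))
```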

### Linked issues

Resolves #3662
Progresses #3722

### Functionality

- [x] modified workflow linting code
- [x] modified existing workflow: `assessment`

### Tests

- [x] modified unit tests
- [x] modified integration tests
JCZuurmond authored Feb 25, 2025
1 parent 05c2d6a commit 05593cd
Showing 6 changed files with 41 additions and 40 deletions.
#### src/databricks/labs/ucx/assessment/workflows.py (1 addition, 1 deletion)

```diff
@@ -207,7 +207,7 @@ def assess_dashboards(self, ctx: RuntimeContext):
         """
         ctx.query_linter.refresh_report()
 
-    @job_task
+    @job_task(depends_on=[assess_jobs])
     def assess_workflows(self, ctx: RuntimeContext):
         """Scans all jobs for migration issues in notebooks jobs.
```
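
Note that `assess_workflows` now depends on `assess_jobs`: since the linter reads jobs from the `JobsCrawler` snapshot (see the linter diff below), the crawler presumably must have run before the linting task starts.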
#### src/databricks/labs/ucx/contexts/application.py (1 addition, 2 deletions)

```diff
@@ -589,13 +589,12 @@ def dependency_resolver(self) -> DependencyResolver:
     def workflow_linter(self) -> WorkflowLinter:
         return WorkflowLinter(
             self.workspace_client,
+            self.jobs_crawler,
             self.dependency_resolver,
             self.path_lookup,
             TableMigrationIndex([]),  # TODO: bring back self.tables_migrator.index()
             self.directfs_access_crawler_for_paths,
             self.used_tables_crawler_for_paths,
-            self.config.include_job_ids,
-            self.config.debug_listing_upper_limit,
         )
 
     @cached_property
```
#### src/databricks/labs/ucx/source_code/linters/jobs.py (4 additions, 15 deletions)

```diff
@@ -12,6 +12,7 @@
 from databricks.sdk.errors import NotFound
 from databricks.sdk.service import jobs
 
+from databricks.labs.ucx.assessment.jobs import JobsCrawler
 from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
 from databricks.labs.ucx.source_code.base import (
     DirectFsAccess,
@@ -40,37 +41,25 @@ class WorkflowLinter:
     def __init__(
         self,
         ws: WorkspaceClient,
+        jobs_crawler: JobsCrawler,
         resolver: DependencyResolver,
         path_lookup: PathLookup,
         migration_index: TableMigrationIndex,
         directfs_crawler: DirectFsAccessCrawler,
         used_tables_crawler: UsedTablesCrawler,
-        include_job_ids: list[int] | None = None,
-        debug_listing_upper_limit: int | None = None,
     ):
         self._ws = ws
+        self._jobs_crawler = jobs_crawler
         self._resolver = resolver
         self._path_lookup = path_lookup
         self._migration_index = migration_index
         self._directfs_crawler = directfs_crawler
         self._used_tables_crawler = used_tables_crawler
-        self._include_job_ids = include_job_ids
-        self._debug_listing_upper_limit = debug_listing_upper_limit
 
     def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None:
         tasks = []
-        items_listed = 0
-        for job in self._ws.jobs.list():
-            if self._include_job_ids is not None and job.job_id not in self._include_job_ids:
-                logger.info(f"Skipping job_id={job.job_id}")
-                continue
-            if self._debug_listing_upper_limit is not None and items_listed >= self._debug_listing_upper_limit:
-                logger.warning(f"Debug listing limit reached: {self._debug_listing_upper_limit}")
-                break
-            if job.settings is not None and job.settings.name is not None:
-                logger.info(f"Found job_id={job.job_id}: {job.settings.name}")
+        for job in self._jobs_crawler.snapshot():
             tasks.append(functools.partial(self.lint_job, job.job_id))
-            items_listed += 1
         logger.info(f"Running {len(tasks)} linting tasks in parallel...")
         job_results, errors = Threads.gather('linting workflows', tasks)
         job_problems: list[JobProblem] = []
```
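
Callers now pass a crawler instead of scoping parameters. A minimal sketch of the new call shape (argument order taken from the constructor above; the surrounding objects are stand-ins):

```python
linter = WorkflowLinter(
    ws,                    # WorkspaceClient
    jobs_crawler,          # JobsCrawler: decides which jobs are in scope
    dependency_resolver,
    path_lookup,
    TableMigrationIndex([]),
    directfs_crawler,
    used_tables_crawler,
)
linter.refresh_report(sql_backend, "inventory")  # lints only the crawled jobs
```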
#### tests/integration/assessment/test_workflows.py (2 additions, 2 deletions)

```diff
@@ -47,5 +47,5 @@ def test_running_real_assessment_job(
     assert actual_tables == expected_tables
 
     query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems"
-    for row in sql_backend.fetch(query):
-        assert row['path'] != 'UNKNOWN'
+    workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"]
+    assert not workflow_problems_without_path
```
#### tests/integration/source_code/test_directfs_access.py (3 additions, 5 deletions)

```diff
@@ -78,8 +78,6 @@ def test_lakeview_query_dfsa_ownership(runtime_ctx) -> None:
 
 def test_path_dfsa_ownership(
     runtime_ctx,
-    make_notebook,
-    make_job,
     make_directory,
     inventory_schema,
     sql_backend,
@@ -88,18 +86,18 @@
 
     # A job with a notebook task that contains direct filesystem access.
     notebook_source = b"display(spark.read.csv('/mnt/things/e/f/g'))"
-    notebook = make_notebook(path=f"{make_directory()}/notebook.py", content=notebook_source)
-    job = make_job(notebook_path=notebook)
+    notebook = runtime_ctx.make_notebook(path=f"{make_directory()}/notebook.py", content=notebook_source)
+    runtime_ctx.make_job(notebook_path=notebook)
 
     # Produce a DFSA record for the job.
     linter = WorkflowLinter(
         runtime_ctx.workspace_client,
+        runtime_ctx.jobs_crawler,
         runtime_ctx.dependency_resolver,
         runtime_ctx.path_lookup,
         TableMigrationIndex([]),
         runtime_ctx.directfs_access_crawler_for_paths,
         runtime_ctx.used_tables_crawler_for_paths,
-        include_job_ids=[job.job_id],
     )
     linter.refresh_report(sql_backend, inventory_schema)
```
#### tests/unit/source_code/test_jobs.py (30 additions, 15 deletions)

```diff
@@ -5,20 +5,24 @@
 from unittest.mock import create_autospec
 
 import pytest
+from databricks.labs.blueprint.paths import DBFSPath, WorkspacePath
 from databricks.labs.lsql.backends import MockBackend
-from databricks.sdk.service.compute import LibraryInstallStatus
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.errors import NotFound
+from databricks.sdk.service import compute, jobs
 from databricks.sdk.service.jobs import Job, SparkPythonTask
-from databricks.sdk.service.pipelines import NotebookLibrary, GetPipelineResponse, PipelineLibrary, FileLibrary
+from databricks.sdk.service.pipelines import (
+    GetPipelineResponse,
+    FileLibrary,
+    NotebookLibrary,
+    PipelineLibrary,
+    PipelineSpec,
+)
+from databricks.sdk.service.workspace import ExportFormat, Language, ObjectInfo
 
-from databricks.labs.blueprint.paths import DBFSPath, WorkspacePath
+from databricks.labs.ucx.assessment.jobs import JobsCrawler
 from databricks.labs.ucx.source_code.base import CurrentSessionState
 from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
-from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
-from databricks.sdk import WorkspaceClient
-from databricks.sdk.errors import NotFound
-from databricks.sdk.service import compute, jobs, pipelines
-from databricks.sdk.service.workspace import ExportFormat, ObjectInfo, Language
-
 from databricks.labs.ucx.source_code.files import FileLoader, ImportFileResolver
 from databricks.labs.ucx.source_code.graph import (
     Dependency,
@@ -27,7 +31,8 @@
 )
 from databricks.labs.ucx.source_code.jobs import JobProblem, WorkflowTaskContainer
 from databricks.labs.ucx.source_code.linters.jobs import WorkflowLinter
-from databricks.labs.ucx.source_code.notebooks.loaders import NotebookResolver, NotebookLoader
+from databricks.labs.ucx.source_code.notebooks.loaders import NotebookLoader, NotebookResolver
+from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
 from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
@@ -228,10 +233,17 @@ def test_workflow_linter_lint_job_logs_problems(dependency_resolver, mock_path_l
     expected_message = "Found job problems:\nUNKNOWN:-1 [library-install-failed] 'pip --disable-pip-version-check install unknown-library"
 
     ws = create_autospec(WorkspaceClient)
+    jobs_crawler = create_autospec(JobsCrawler)
     directfs_crawler = create_autospec(DirectFsAccessCrawler)
     used_tables_crawler = create_autospec(UsedTablesCrawler)
     linter = WorkflowLinter(
-        ws, dependency_resolver, mock_path_lookup, empty_index, directfs_crawler, used_tables_crawler
+        ws,
+        jobs_crawler,
+        dependency_resolver,
+        mock_path_lookup,
+        empty_index,
+        directfs_crawler,
+        used_tables_crawler,
     )
 
     libraries = [compute.Library(pypi=compute.PythonPyPiLibrary(package="unknown-library-name"))]
@@ -243,6 +255,7 @@
     with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.source_code.jobs"):
         linter.lint_job(1234)
 
+    jobs_crawler.assert_not_called()  # Only called through refresh_report
     directfs_crawler.assert_not_called()
     used_tables_crawler.assert_not_called()
     assert any(message.startswith(expected_message) for message in caplog.messages), caplog.messages
@@ -326,7 +339,7 @@ def test_workflow_task_container_with_existing_cluster_builds_dependency_graph_p
                 whl=None,
             ),
             messages=None,
-            status=LibraryInstallStatus.PENDING,
+            status=compute.LibraryInstallStatus.PENDING,
         )
     ]
@@ -446,7 +459,7 @@ def test_workflow_linter_dlt_pipeline_task(graph) -> None:
     ws.pipelines.get.return_value = GetPipelineResponse(
         pipeline_id=pipeline.pipeline_id,
         name="test-pipeline",
-        spec=pipelines.PipelineSpec(continuous=False),
+        spec=PipelineSpec(continuous=False),
     )
 
     workflow_task_container = WorkflowTaskContainer(ws, task, Job())
@@ -456,7 +469,7 @@
     ws.pipelines.get.return_value = GetPipelineResponse(
         pipeline_id=pipeline.pipeline_id,
         name="test-pipeline",
-        spec=pipelines.PipelineSpec(
+        spec=PipelineSpec(
             libraries=[
                 PipelineLibrary(
                     jar="some.jar",
@@ -549,19 +562,21 @@ def test_workflow_linter_refresh_report(dependency_resolver, mock_path_lookup, m
     ws.jobs.get.return_value = Job(job_id=2, settings=settings)
 
     sql_backend = MockBackend()
+    jobs_crawler = create_autospec(JobsCrawler)
     directfs_crawler = DirectFsAccessCrawler.for_paths(sql_backend, "test")
     used_tables_crawler = UsedTablesCrawler.for_paths(sql_backend, "test")
     linter = WorkflowLinter(
         ws,
+        jobs_crawler,
         dependency_resolver,
         mock_path_lookup,
         migration_index,
         directfs_crawler,
         used_tables_crawler,
-        [1],
     )
     linter.refresh_report(sql_backend, 'test')
 
+    jobs_crawler.snapshot.assert_called_once()
     sql_backend.has_rows_written_for('test.workflow_problems')
     sql_backend.has_rows_written_for('hive_metastore.test.used_tables_in_paths')
     sql_backend.has_rows_written_for('hive_metastore.test.directfs_in_paths')
```