From b97a8d54c42cbf335c7d60f68c0ed0713a414c99 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 1 Nov 2024 17:59:36 +0100 Subject: [PATCH] Add `MigrationSequencer` for jobs (#3008) ## Changes Add a `MigrationSequencer` class to sequence the migration steps for jobs. The PR includes the following resources in its sequence: - Jobs - Job tasks - Job tasks dependencies - Job clusters - Cluster Other elements part of the sequence are added later ### Linked issues Progresses #1415 Supersedes #2980 ### Tests - [x] added unit tests - [x] added integration tests --------- Co-authored-by: Eric Vergnaud Co-authored-by: Cor Zuurmond --- .../labs/ucx/assessment/clusters.py | 24 +- src/databricks/labs/ucx/assessment/jobs.py | 24 +- .../labs/ucx/assessment/sequencing.py | 361 +++++++++++++++++ .../labs/ucx/contexts/workflow_task.py | 5 + .../integration/assessment/test_sequencing.py | 59 +++ tests/unit/assessment/test_sequencing.py | 369 ++++++++++++++++++ tests/unit/conftest.py | 24 +- tests/unit/source_code/conftest.py | 15 - 8 files changed, 842 insertions(+), 39 deletions(-) create mode 100644 src/databricks/labs/ucx/assessment/sequencing.py create mode 100644 tests/integration/assessment/test_sequencing.py create mode 100644 tests/unit/assessment/test_sequencing.py diff --git a/src/databricks/labs/ucx/assessment/clusters.py b/src/databricks/labs/ucx/assessment/clusters.py index cdb238a8ba..da6b184404 100644 --- a/src/databricks/labs/ucx/assessment/clusters.py +++ b/src/databricks/labs/ucx/assessment/clusters.py @@ -49,6 +49,18 @@ class ClusterInfo: __id_attributes__: ClassVar[tuple[str, ...]] = ("cluster_id",) + @classmethod + def from_cluster_details(cls, details: ClusterDetails): + return ClusterInfo( + cluster_id=details.cluster_id if details.cluster_id else "", + cluster_name=details.cluster_name, + policy_id=details.policy_id, + spark_version=details.spark_version, + creator=details.creator_user_name or None, + success=1, + failures="[]", + ) + class CheckClusterMixin(CheckInitScriptMixin): _ws: WorkspaceClient @@ -156,7 +168,7 @@ def _crawl(self) -> Iterable[ClusterInfo]: all_clusters = list(self._ws.clusters.list()) return list(self._assess_clusters(all_clusters)) - def _assess_clusters(self, all_clusters): + def _assess_clusters(self, all_clusters: Iterable[ClusterDetails]): for cluster in all_clusters: if cluster.cluster_source == ClusterSource.JOB: continue @@ -166,15 +178,7 @@ def _assess_clusters(self, all_clusters): f"Cluster {cluster.cluster_id} have Unknown creator, it means that the original creator " f"has been deleted and should be re-created" ) - cluster_info = ClusterInfo( - cluster_id=cluster.cluster_id if cluster.cluster_id else "", - cluster_name=cluster.cluster_name, - policy_id=cluster.policy_id, - spark_version=cluster.spark_version, - creator=creator, - success=1, - failures="[]", - ) + cluster_info = ClusterInfo.from_cluster_details(cluster) failures = self._check_cluster_failures(cluster, "cluster") if len(failures) > 0: cluster_info.success = 0 diff --git a/src/databricks/labs/ucx/assessment/jobs.py b/src/databricks/labs/ucx/assessment/jobs.py index 1f87b26770..fe23e42fa0 100644 --- a/src/databricks/labs/ucx/assessment/jobs.py +++ b/src/databricks/labs/ucx/assessment/jobs.py @@ -21,6 +21,7 @@ RunType, SparkJarTask, SqlTask, + Job, ) from databricks.labs.ucx.assessment.clusters import CheckClusterMixin @@ -43,6 +44,17 @@ class JobInfo: __id_attributes__: ClassVar[tuple[str, ...]] = ("job_id",) + @classmethod + def from_job(cls, job: Job): + job_name = job.settings.name if job.settings and job.settings.name else "Unknown" + return JobInfo( + job_id=str(job.job_id), + success=1, + failures="[]", + job_name=job_name, + creator=job.creator_user_name or None, + ) + class JobsMixin: @classmethod @@ -127,17 +139,7 @@ def _prepare(all_jobs) -> tuple[dict[int, set[str]], dict[int, JobInfo]]: job_settings = job.settings if not job_settings: continue - job_name = job_settings.name - if not job_name: - job_name = "Unknown" - - job_details[job.job_id] = JobInfo( - job_id=str(job.job_id), - job_name=job_name, - creator=creator_user_name, - success=1, - failures="[]", - ) + job_details[job.job_id] = JobInfo.from_job(job) return job_assessment, job_details def _try_fetch(self) -> Iterable[JobInfo]: diff --git a/src/databricks/labs/ucx/assessment/sequencing.py b/src/databricks/labs/ucx/assessment/sequencing.py new file mode 100644 index 0000000000..4fa660f558 --- /dev/null +++ b/src/databricks/labs/ucx/assessment/sequencing.py @@ -0,0 +1,361 @@ +from __future__ import annotations + +import heapq +import itertools +from collections import defaultdict +from collections.abc import Iterable +from dataclasses import dataclass, field + +from databricks.sdk import WorkspaceClient +from databricks.sdk.errors import DatabricksError +from databricks.sdk.service.jobs import Job, JobCluster, Task + +from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo +from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo +from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.source_code.graph import DependencyProblem + + +@dataclass +class MigrationStep: + step_id: int + """Globally unique id.""" + + step_number: int + """The position in the migration sequence.""" + + object_type: str + """Object type. Together with `attr:object_id` a unique identifier.""" + + object_id: str + """Object id. Together with `attr:object_id` a unique identifier.""" + + object_name: str + """Object name, more human friendly than `attr:object_id`.""" + + object_owner: str + """Object owner.""" + + required_step_ids: list[int] + """The step ids that should be completed before this step is started.""" + + +MigrationNodeKey = tuple[str, str] + + +@dataclass(frozen=True) +class MigrationNode: + node_id: int = field(compare=False) + """Globally unique id.""" + + object_type: str + """Object type. Together with `attr:object_id` a unique identifier.""" + + object_id: str + """Object id. Together with `attr:object_id` a unique identifier.""" + + object_name: str = field(compare=False) + """Object name, more human friendly than `attr:object_id`.""" + + object_owner: str = field(compare=False) + """Object owner.""" + + @property + def key(self) -> MigrationNodeKey: + """Unique identifier of the node.""" + return self.object_type, self.object_id + + def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationStep: + """Convert to class:MigrationStep.""" + return MigrationStep( + step_id=self.node_id, + step_number=step_number, + object_type=self.object_type, + object_id=self.object_id, + object_name=self.object_name, + object_owner=self.object_owner, + required_step_ids=required_step_ids, + ) + + +@dataclass +class MaybeMigrationNode: + node: MigrationNode | None + problems: list[DependencyProblem] + + @property + def failed(self) -> bool: + return len(self.problems) > 0 + + +# We expect `tuple[int, int, MigrationNode | str]` +# for `[priority, counter, MigrationNode | PriorityQueue._REMOVED | PriorityQueue_UPDATED]` +# but we use list for the required mutability +QueueEntry = list[int | MigrationNode | str] + + +class PriorityQueue: + """A migration node priority queue. + + Note: + This implementation does not support threading safety as that is not required. + + Source: + See https://docs.python.org/3/library/heapq.html#priority-queue-implementation-notes on the changes below + to handle priority changes in the task. Also, the _UPDATED marker is introduced to avoid updating removed nodes. + """ + + _REMOVED = "" # Mark removed items + + def __init__(self): + self._entries: list[QueueEntry] = [] + self._entry_finder: dict[MigrationNode, QueueEntry] = {} + self._counter = itertools.count() # Tiebreaker with equal priorities, then "first in, first out" + + def put(self, priority: int, task: MigrationNode) -> None: + """Put or update task in the queue. + + The lowest priority is retrieved from the queue first. + """ + if task in self._entry_finder: + self._remove(task) + count = next(self._counter) + entry = [priority, count, task] + self._entry_finder[task] = entry + heapq.heappush(self._entries, entry) + + def get(self) -> MigrationNode | None: + """Gets the tasks with the lowest priority.""" + while self._entries: + _, _, task = heapq.heappop(self._entries) + if task == self._REMOVED: + continue + assert isinstance(task, MigrationNode) + self._remove(task) + return task + return None + + def _remove(self, task: MigrationNode) -> None: + """Remove a task from the queue.""" + entry = self._entry_finder.pop(task) + entry[2] = self._REMOVED + + +class MigrationSequencer: + """Sequence the migration dependencies in order to execute the migration. + + Similar to the other graph logic, we first build the graph by registering dependencies, then we analyse the graph. + Analysing the graph in this case means: computing the migration sequence in `meth:generate_steps`. + """ + + def __init__(self, ws: WorkspaceClient, administrator_locator: AdministratorLocator): + self._ws = ws + self._admin_locator = administrator_locator + self._counter = itertools.count() + self._nodes: dict[MigrationNodeKey, MigrationNode] = {} + + # Outgoing references contains edges in the graph pointing from a node to a set of nodes that the node + # references. These references follow the API references, e.g. a job contains tasks in the + # `Job.settings.tasks`, thus a job has an outgoing reference to each of those tasks. + self._outgoing_references: dict[MigrationNodeKey, set[MigrationNode]] = defaultdict(set) + + def register_jobs(self, *jobs: Job) -> list[MaybeMigrationNode]: + """Register a job. + + Args: + jobs (Job) : The jobs to register. + + Returns: + list[MaybeMigrationNode] : Each element contains a maybe migration node for each job respectively. If no + problems occurred during registering the job, the maybe migration node contains the migration node. + Otherwise, the maybe migration node contains the dependency problems occurring during registering the + job. + """ + nodes: list[MaybeMigrationNode] = [] + for job in jobs: + node = self._register_job(job) + nodes.append(node) + return nodes + + def _register_job(self, job: Job) -> MaybeMigrationNode: + """Register a single job.""" + problems: list[DependencyProblem] = [] + job_node = self._nodes.get(("JOB", str(job.job_id)), None) + if job_node: + return MaybeMigrationNode(job_node, problems) + job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id) + job_node = MigrationNode( + node_id=next(self._counter), + object_type="JOB", + object_id=str(job.job_id), + object_name=job_name, + object_owner=JobOwnership(self._admin_locator).owner_of(JobInfo.from_job(job)), + ) + self._nodes[job_node.key] = job_node + if not job.settings: + return MaybeMigrationNode(job_node, problems) + for job_cluster in job.settings.job_clusters or []: + maybe_cluster_node = self._register_job_cluster(job_cluster, job_node) + if maybe_cluster_node.node: + self._outgoing_references[job_node.key].add(maybe_cluster_node.node) + for task in job.settings.tasks or []: + maybe_task_node = self._register_workflow_task(task, job_node) + problems.extend(maybe_task_node.problems) + if maybe_task_node.node: + self._outgoing_references[job_node.key].add(maybe_task_node.node) + # Only after registering all tasks, we can resolve the task dependencies + for task in job.settings.tasks or []: + task_key = ("TASK", f"{job.job_id}/{task.task_key}") + for task_dependency in task.depends_on or []: + task_dependency_key = ("TASK", f"{job.job_id}/{task_dependency.task_key}") + maybe_task_dependency = self._nodes.get(task_dependency_key) + if maybe_task_dependency: + self._outgoing_references[task_key].add(maybe_task_dependency) + else: + # Verified that a job with a task having a depends on referring a non-existing task cannot be + # created. However, this code is just in case. + problem = DependencyProblem( + 'task-dependency-not-found', f"Could not find task: {task_dependency_key[1]}" + ) + problems.append(problem) + return MaybeMigrationNode(job_node, problems) + + def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMigrationNode: + """Register a workflow task. + + TODO: + Handle following Task attributes: + - for_each_task + - libraries + - notebook_task + - pipeline_task + - python_wheel_task + - run_job_task + - spark_jar_task + - spark_python_task + - spark_submit_task + - sql_task + """ + problems: list[DependencyProblem] = [] + task_id = f"{parent.object_id}/{task.task_key}" + task_node = self._nodes.get(("TASK", task_id), None) + if task_node: + return MaybeMigrationNode(task_node, problems) + task_node = MigrationNode( + node_id=next(self._counter), + object_type="TASK", + object_id=task_id, + object_name=task.task_key, + object_owner=parent.object_owner, # No task owner so use parent job owner + ) + self._nodes[task_node.key] = task_node + # `task.new_cluster` is not handled because it is part of the task and not a separate node + if task.existing_cluster_id: + maybe_cluster_node = self._register_cluster(task.existing_cluster_id) + problems.extend(maybe_cluster_node.problems) + if maybe_cluster_node.node: + self._outgoing_references[task_node.key].add(maybe_cluster_node.node) + if task.job_cluster_key: + job_cluster_node = self._nodes.get(("CLUSTER", f"{parent.object_id}/{task.job_cluster_key}")) + if job_cluster_node: + self._outgoing_references[task_node.key].add(job_cluster_node) + else: + problem = DependencyProblem('cluster-not-found', f"Could not find cluster: {task.job_cluster_key}") + problems.append(problem) + return MaybeMigrationNode(task_node, problems) + + def _register_job_cluster(self, cluster: JobCluster, parent: MigrationNode) -> MaybeMigrationNode: + """Register a job cluster. + + A job cluster is defined within a job and therefore is found when defined on the job by definition. + """ + # Different jobs can have job clusters with the same key, therefore job id is prepended to ensure uniqueness + cluster_id = f"{parent.object_id}/{cluster.job_cluster_key}" + cluster_node = MigrationNode( + node_id=next(self._counter), + object_type="CLUSTER", + object_id=cluster_id, + object_name=cluster.job_cluster_key, + object_owner=parent.object_owner, + ) + self._nodes[cluster_node.key] = cluster_node + return MaybeMigrationNode(cluster_node, []) + + def _register_cluster(self, cluster_id: str) -> MaybeMigrationNode: + """Register a cluster. + + TODO + Handle following Task attributes: + - init_scripts + - instance_pool_id (maybe_not) + - policy_id + - spec.init_scripts + - spec.instance_pool_id (maybe not) + - spec.policy_id + """ + node_seen = self._nodes.get(("CLUSTER", cluster_id), None) + if node_seen: + return MaybeMigrationNode(node_seen, []) + try: + details = self._ws.clusters.get(cluster_id) + except DatabricksError: + message = f"Could not find cluster: {cluster_id}" + return MaybeMigrationNode(None, [DependencyProblem('cluster-not-found', message)]) + object_name = details.cluster_name if details and details.cluster_name else cluster_id + cluster_node = MigrationNode( + node_id=next(self._counter), + object_type="CLUSTER", + object_id=cluster_id, + object_name=object_name, + object_owner=ClusterOwnership(self._admin_locator).owner_of(ClusterInfo.from_cluster_details(details)), + ) + self._nodes[cluster_node.key] = cluster_node + # TODO register warehouses and policies + return MaybeMigrationNode(cluster_node, []) + + def generate_steps(self) -> Iterable[MigrationStep]: + """Generate the migration steps. + + An adapted version of the Kahn topological sort is implemented. The differences are as follows: + - We want the same step number for all nodes with same dependency depth. Therefore, instead of pushing to a + queue, we rebuild it once all leaf nodes are processed (these are transient leaf nodes i.e. they only become + leaf during processing) + - We handle cyclic dependencies (implemented in PR #3009) + """ + ordered_steps: list[MigrationStep] = [] + # For updating the priority of steps that depend on other steps + incoming_references = self._invert_outgoing_to_incoming_references() + seen = set[MigrationNode]() + queue = self._create_node_queue(self._outgoing_references) + node = queue.get() + while node is not None: + step = node.as_step(len(ordered_steps), sorted(n.node_id for n in self._outgoing_references[node.key])) + ordered_steps.append(step) + seen.add(node) + # Update the queue priority as if the migration step was completed + for dependency in incoming_references[node.key]: + if dependency in seen: + continue + priority = len(self._outgoing_references[dependency.key] - seen) + queue.put(priority, dependency) + node = queue.get() + return ordered_steps + + def _invert_outgoing_to_incoming_references(self) -> dict[MigrationNodeKey, set[MigrationNode]]: + """Invert the outgoing references to incoming references.""" + result: dict[MigrationNodeKey, set[MigrationNode]] = defaultdict(set) + for node_key, outgoing_nodes in self._outgoing_references.items(): + for target in outgoing_nodes: + result[target.key].add(self._nodes[node_key]) + return result + + def _create_node_queue(self, incoming: dict[MigrationNodeKey, set[MigrationNode]]) -> PriorityQueue: + """Create a priority queue for their nodes using the incoming count as priority. + + A lower number means it is pulled from the queue first, i.e. the key with the lowest number of keys is retrieved + first. + """ + priority_queue = PriorityQueue() + for node_key, node in self._nodes.items(): + priority_queue.put(len(incoming[node_key]), node) + return priority_queue diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index d840a6b087..a3656b290b 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -17,6 +17,7 @@ from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptCrawler from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo, JobsCrawler, SubmitRunsCrawler from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler, PipelineInfo, PipelineOwnership +from databricks.labs.ucx.assessment.sequencing import MigrationSequencer from databricks.labs.ucx.config import WorkspaceConfig from databricks.labs.ucx.contexts.application import GlobalContext from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler @@ -240,3 +241,7 @@ def udfs_progress(self) -> ProgressEncoder[Udf]: self.workspace_id, self.config.ucx_catalog, ) + + @cached_property + def migration_sequencer(self) -> MigrationSequencer: + return MigrationSequencer(self.workspace_client, self.administrator_locator) diff --git a/tests/integration/assessment/test_sequencing.py b/tests/integration/assessment/test_sequencing.py new file mode 100644 index 0000000000..3b93682dce --- /dev/null +++ b/tests/integration/assessment/test_sequencing.py @@ -0,0 +1,59 @@ +from databricks.sdk.service import jobs + +from databricks.labs.ucx.source_code.graph import DependencyProblem + + +def test_migration_sequencing_simple_job(make_job, runtime_ctx) -> None: + """Sequence a simple job""" + job = make_job() + + maybe_job_node = runtime_ctx.migration_sequencer.register_jobs(job)[0] + assert not maybe_job_node.failed + + steps = runtime_ctx.migration_sequencer.generate_steps() + step_object_types = [step.object_type for step in steps] + assert step_object_types == ["TASK", "JOB"] + + +def test_migration_sequencing_job_with_task_referencing_cluster( + make_job, + make_notebook, + runtime_ctx, + env_or_skip, +) -> None: + """Sequence a job with a task""" + cluster_id = env_or_skip("TEST_DEFAULT_CLUSTER_ID") + task = jobs.Task( + task_key="test-task", + existing_cluster_id=cluster_id, + notebook_task=jobs.NotebookTask(notebook_path=str(make_notebook())), + ) + job = make_job(tasks=[task]) + + maybe_job_node = runtime_ctx.migration_sequencer.register_jobs(job)[0] + assert not maybe_job_node.failed + + steps = runtime_ctx.migration_sequencer.generate_steps() + step_object_types = [step.object_type for step in steps] + assert step_object_types == ["CLUSTER", "TASK", "JOB"] + + +def test_migration_sequencing_job_with_task_referencing_non_existing_cluster(runtime_ctx) -> None: + """Sequence a job with a task referencing existing cluster""" + # Cannot make an actual job referencing a non-existing cluster + task = jobs.Task(task_key="test-task", existing_cluster_id="non-existing-id") + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + + maybe_node = runtime_ctx.migration_sequencer.register_jobs(job)[0] + assert maybe_node.failed + assert maybe_node.problems == [ + DependencyProblem( + code="cluster-not-found", + message="Could not find cluster: non-existing-id", + ) + ] + + steps = runtime_ctx.migration_sequencer.generate_steps() + step_object_types = [step.object_type for step in steps] + assert step_object_types == ["TASK", "JOB"] diff --git a/tests/unit/assessment/test_sequencing.py b/tests/unit/assessment/test_sequencing.py new file mode 100644 index 0000000000..1708c9a537 --- /dev/null +++ b/tests/unit/assessment/test_sequencing.py @@ -0,0 +1,369 @@ +from unittest.mock import create_autospec + +import pytest +from databricks.sdk.errors import ResourceDoesNotExist +from databricks.sdk.service import iam, jobs +from databricks.sdk.service.compute import ClusterDetails, ClusterSpec + +from databricks.labs.ucx.assessment.sequencing import MigrationSequencer, MigrationStep +from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder +from databricks.labs.ucx.source_code.graph import DependencyProblem + + +@pytest.fixture +def admin_locator(ws): + """Create a mock for an `class:AdminLocator`""" + admin_finder = create_autospec(AdministratorFinder) + admin_user = iam.User(user_name="John Doe", active=True, roles=[iam.ComplexValue(value="account_admin")]) + admin_finder.find_admin_users.return_value = (admin_user,) + return AdministratorLocator(ws, finders=[lambda _ws: admin_finder]) + + +def test_register_jobs_with_existing_cluster(ws, admin_locator) -> None: + """Register a job with a task referencing an existing cluster.""" + task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + + def get_cluster(cluster_id: str) -> ClusterDetails: + if cluster_id == "cluster-123": + return ClusterDetails(cluster_id="cluster-123", cluster_name="my-cluster") + raise ResourceDoesNotExist(f"Unknown cluster: {cluster_id}") + + ws.clusters.get.side_effect = get_cluster + sequencer = MigrationSequencer(ws, admin_locator) + + maybe_node = sequencer.register_jobs(job)[0] + + assert not maybe_node.failed + + +def test_register_jobs_with_non_existing_cluster(ws, admin_locator) -> None: + """Register a job with a task referencing a non-existing cluster.""" + task = jobs.Task(task_key="test-task", existing_cluster_id="non-existing-id") + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + + ws.clusters.get.side_effect = ResourceDoesNotExist("Unknown cluster") + sequencer = MigrationSequencer(ws, admin_locator) + + maybe_node = sequencer.register_jobs(job)[0] + + assert maybe_node.failed + assert maybe_node.problems == [ + DependencyProblem( + code="cluster-not-found", + message="Could not find cluster: non-existing-id", + ) + ] + + +def test_register_jobs_with_existing_job_cluster_key(ws, admin_locator) -> None: + """Register a job with a task referencing a existing job cluster.""" + job_cluster = jobs.JobCluster("existing-id", ClusterSpec()) + task = jobs.Task(task_key="test-task", job_cluster_key="existing-id") + settings = jobs.JobSettings(name="test-job", tasks=[task], job_clusters=[job_cluster]) + job = jobs.Job(job_id=1234, settings=settings) + sequencer = MigrationSequencer(ws, admin_locator) + + maybe_node = sequencer.register_jobs(job)[0] + + assert not maybe_node.failed + + +def test_register_jobs_with_non_existing_job_cluster_key(ws, admin_locator) -> None: + """Register a job with a task referencing a non-existing job cluster.""" + task = jobs.Task(task_key="test-task", job_cluster_key="non-existing-id") + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + + sequencer = MigrationSequencer(ws, admin_locator) + + maybe_node = sequencer.register_jobs(job)[0] + + assert maybe_node.failed + assert maybe_node.problems == [ + DependencyProblem( + code="cluster-not-found", + message="Could not find cluster: non-existing-id", + ) + ] + + +def test_register_jobs_with_new_cluster(ws, admin_locator) -> None: + """Register a job with a task with a new cluster definition.""" + task = jobs.Task(task_key="test-task", new_cluster=ClusterSpec()) + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + ws.jobs.get.return_value = job + sequencer = MigrationSequencer(ws, admin_locator) + + maybe_node = sequencer.register_jobs(job)[0] + + assert not maybe_node.failed + + +def test_register_jobs_with_task_dependency(ws, admin_locator) -> None: + """Register a job with two tasks having a dependency.""" + task1 = jobs.Task(task_key="task1") + task_dependency = jobs.TaskDependency(task1.task_key) + task2 = jobs.Task(task_key="task2", depends_on=[task_dependency]) + tasks = [task2, task1] # Reverse order on purpose to test if this is handled + settings = jobs.JobSettings(name="job", tasks=tasks) + job = jobs.Job(job_id=1234, settings=settings) + sequencer = MigrationSequencer(ws, admin_locator) + + maybe_node = sequencer.register_jobs(job)[0] + + assert not maybe_node.failed + + +def test_register_jobs_with_non_existing_task_dependency(ws, admin_locator) -> None: + """Register a job with a non-existing task dependency.""" + task_dependency = jobs.TaskDependency("non-existing-id") + task = jobs.Task(task_key="task2", depends_on=[task_dependency]) + settings = jobs.JobSettings(name="job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + sequencer = MigrationSequencer(ws, admin_locator) + + maybe_node = sequencer.register_jobs(job)[0] + + assert maybe_node.failed + assert maybe_node.problems == [ + DependencyProblem( + code="task-dependency-not-found", + message="Could not find task: 1234/non-existing-id", + ) + ] + + +def test_sequence_steps_from_job_task_with_existing_cluster_id(ws, admin_locator) -> None: + """Sequence a job with a task referencing an existing cluster. + + Sequence: + 1. Cluster + 2. Task + 3. Job + """ + task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + + # Match task cluster above on cluster id + admin_user = admin_locator.get_workspace_administrator() + + def get_cluster(cluster_id: str) -> ClusterDetails: + if cluster_id == "cluster-123": + return ClusterDetails(cluster_id="cluster-123", cluster_name="my-cluster", creator_user_name=admin_user) + raise ResourceDoesNotExist(f"Unknown cluster: {cluster_id}") + + ws.clusters.get.side_effect = get_cluster + + sequencer = MigrationSequencer(ws, admin_locator) + sequencer.register_jobs(job) + + steps = list(sequencer.generate_steps()) + + assert steps == [ + MigrationStep( + step_id=2, + step_number=0, + object_type="CLUSTER", + object_id="cluster-123", + object_name="my-cluster", + object_owner="John Doe", + required_step_ids=[], + ), + MigrationStep( + step_id=1, + step_number=1, + object_type="TASK", + object_id="1234/test-task", + object_name="test-task", + object_owner="John Doe", + required_step_ids=[2], + ), + MigrationStep( + step_id=0, + step_number=2, + object_type="JOB", + object_id="1234", + object_name="test-job", + object_owner="John Doe", + required_step_ids=[1], + ), + ] + + +def test_sequence_steps_from_job_task_with_existing_job_cluster_key(ws, admin_locator) -> None: + """Sequence a job with a task referencing an existing job cluster. + + Sequence: + 1. Job cluster + 2. Task + 3. Job + """ + job_cluster = jobs.JobCluster("existing-id", ClusterSpec()) + task = jobs.Task(task_key="test-task", job_cluster_key="existing-id") + settings = jobs.JobSettings(name="test-job", tasks=[task], job_clusters=[job_cluster]) + job = jobs.Job(job_id=1234, settings=settings) + sequencer = MigrationSequencer(ws, admin_locator) + sequencer.register_jobs(job) + + steps = list(sequencer.generate_steps()) + + assert steps == [ + MigrationStep( + step_id=1, + step_number=0, + object_type="CLUSTER", + object_id="1234/existing-id", + object_name="existing-id", + object_owner="John Doe", + required_step_ids=[], + ), + MigrationStep( + step_id=2, + step_number=1, + object_type="TASK", + object_id="1234/test-task", + object_name="test-task", + object_owner="John Doe", + required_step_ids=[1], + ), + MigrationStep( + step_id=0, + step_number=2, + object_type="JOB", + object_id="1234", + object_name="test-job", + object_owner="John Doe", + required_step_ids=[1, 2], + ), + ] + + +def test_sequence_steps_from_job_task_with_new_cluster(ws, admin_locator) -> None: + """Sequence a job with a task that has a new cluster definition. + + Sequence: + 1. Task # The cluster is part of the task, not a separate step in the sequence + 2. Job + """ + task = jobs.Task(task_key="test-task", new_cluster=ClusterSpec()) + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + sequencer = MigrationSequencer(ws, admin_locator) + sequencer.register_jobs(job) + + steps = list(sequencer.generate_steps()) + + assert steps == [ + MigrationStep( + step_id=1, + step_number=0, + object_type="TASK", + object_id="1234/test-task", + object_name="test-task", + object_owner="John Doe", + required_step_ids=[], + ), + MigrationStep( + step_id=0, + step_number=1, + object_type="JOB", + object_id="1234", + object_name="test-job", + object_owner="John Doe", + required_step_ids=[1], + ), + ] + + +def test_sequence_steps_from_job_task_with_non_existing_cluster(ws, admin_locator) -> None: + """Sequence a job with a task that references a non-existing cluster. + + Sequence: + 1. Task + 2. Job + """ + ws.clusters.get.side_effect = ResourceDoesNotExist("Unknown cluster") + task = jobs.Task(task_key="test-task", existing_cluster_id="non-existing-id") + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + sequencer = MigrationSequencer(ws, admin_locator) + sequencer.register_jobs(job) + + steps = list(sequencer.generate_steps()) + + assert steps == [ + MigrationStep( + step_id=1, + step_number=0, + object_type="TASK", + object_id="1234/test-task", + object_name="test-task", + object_owner="John Doe", + required_step_ids=[], + ), + MigrationStep( + step_id=0, + step_number=1, + object_type="JOB", + object_id="1234", + object_name="test-job", + object_owner="John Doe", + required_step_ids=[1], + ), + ] + + +def test_sequence_steps_from_job_task_referencing_other_task(ws, admin_locator) -> None: + """Sequence a job with a task that has a new cluster definition. + + Sequence: + 1. Task1 + 2. Task2 + 3. Job + """ + task1 = jobs.Task(task_key="task1") + task_dependency = jobs.TaskDependency(task1.task_key) + task2 = jobs.Task(task_key="task2", depends_on=[task_dependency]) + tasks = [task2, task1] # Reverse order on purpose to test if this is handled + settings = jobs.JobSettings(name="job", tasks=tasks) + job = jobs.Job(job_id=1234, settings=settings) + sequencer = MigrationSequencer(ws, admin_locator) + + maybe_job_node = sequencer.register_jobs(job)[0] + assert not maybe_job_node.failed + + steps = list(sequencer.generate_steps()) + assert steps == [ + MigrationStep( + step_id=2, + step_number=0, + object_type="TASK", + object_id="1234/task1", + object_name="task1", + object_owner="John Doe", + required_step_ids=[], + ), + MigrationStep( + step_id=1, + step_number=1, + object_type="TASK", + object_id="1234/task2", + object_name="task2", + object_owner="John Doe", + required_step_ids=[2], + ), + MigrationStep( + step_id=0, + step_number=2, + object_type="JOB", + object_id="1234", + object_name="job", + object_owner="John Doe", + required_step_ids=[1, 2], + ), + ] diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 2c8cbfd3b2..675f2012e6 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -10,13 +10,17 @@ from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler -from databricks.labs.ucx.source_code.graph import BaseNotebookResolver +from databricks.labs.ucx.source_code.graph import BaseNotebookResolver, DependencyResolver +from databricks.labs.ucx.source_code.known import KnownList +from databricks.labs.ucx.source_code.linters.files import ImportFileResolver, FileLoader +from databricks.labs.ucx.source_code.notebooks.loaders import NotebookResolver, NotebookLoader from databricks.labs.ucx.source_code.path_lookup import PathLookup from databricks.sdk import AccountClient from databricks.sdk.config import Config from databricks.labs.ucx.config import WorkspaceConfig from databricks.labs.ucx.contexts.workflow_task import RuntimeContext +from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver from . import mock_workspace_client @@ -57,8 +61,10 @@ class CustomIterator: def __init__(self, values): self._values = iter(values) self._has_next = True + self._next_value = None - def hasNext(self): # pylint: disable=invalid-name + # pylint: disable=invalid-name + def hasNext(self): try: self._next_value = next(self._values) self._has_next = True @@ -150,9 +156,11 @@ def inner(cb, **replace) -> RuntimeContext: ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().listDatabases.return_value = ( mock_list_databases_iterator ) + # pylint: disable=protected-access ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().listTables.return_value = ( mock_list_tables_iterator ) + # pylint: disable=protected-access ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().getTable.return_value = ( get_table_mock ) @@ -165,8 +173,9 @@ def inner(cb, **replace) -> RuntimeContext: @pytest.fixture def acc_client(): - acc = create_autospec(AccountClient) # pylint: disable=mock-no-usage + acc = create_autospec(AccountClient) acc.config = Config(host="https://accounts.cloud.databricks.com", account_id="123", token="123") + acc.assert_not_called() return acc @@ -201,3 +210,12 @@ def mock_backend() -> MockBackend: @pytest.fixture def ws(): return mock_workspace_client() + + +@pytest.fixture +def simple_dependency_resolver(mock_path_lookup: PathLookup) -> DependencyResolver: + allow_list = KnownList() + library_resolver = PythonLibraryResolver(allow_list) + notebook_resolver = NotebookResolver(NotebookLoader()) + import_resolver = ImportFileResolver(FileLoader(), allow_list) + return DependencyResolver(library_resolver, notebook_resolver, import_resolver, import_resolver, mock_path_lookup) diff --git a/tests/unit/source_code/conftest.py b/tests/unit/source_code/conftest.py index 6029ce4d82..9c999d92dc 100644 --- a/tests/unit/source_code/conftest.py +++ b/tests/unit/source_code/conftest.py @@ -1,12 +1,6 @@ import pytest from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus -from databricks.labs.ucx.source_code.graph import DependencyResolver -from databricks.labs.ucx.source_code.known import KnownList -from databricks.labs.ucx.source_code.linters.files import ImportFileResolver, FileLoader -from databricks.labs.ucx.source_code.notebooks.loaders import NotebookLoader, NotebookResolver -from databricks.labs.ucx.source_code.path_lookup import PathLookup -from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver @pytest.fixture @@ -51,12 +45,3 @@ def extended_test_index(): ), ] ) - - -@pytest.fixture -def simple_dependency_resolver(mock_path_lookup: PathLookup) -> DependencyResolver: - allow_list = KnownList() - library_resolver = PythonLibraryResolver(allow_list) - notebook_resolver = NotebookResolver(NotebookLoader()) - import_resolver = ImportFileResolver(FileLoader(), allow_list) - return DependencyResolver(library_resolver, notebook_resolver, import_resolver, import_resolver, mock_path_lookup)