Add MigrationSequencer for jobs (#3008)
## Changes
Add a `MigrationSequencer` class to sequence the migration steps for
jobs.

The PR sequences the following resources:
- Jobs
- Job tasks
- Job task dependencies
- Job clusters
- Clusters

Other elements of the sequence will be added later; a rough sketch of the sequencing idea is shown below.
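
To make the sequencing idea concrete, here is a minimal, self-contained sketch of dependency-ordered sequencing. The names (`MigrationStep`, `sequence_steps`), the Kahn-style topological sort, and the example object ids are illustrative assumptions only; they are not the actual `MigrationSequencer` API added by this PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MigrationStep:
    object_type: str   # e.g. "CLUSTER", "TASK", "JOB"
    object_id: str
    step_number: int   # steps sharing a number have no mutual dependency


def sequence_steps(dependencies: dict[tuple[str, str], set[tuple[str, str]]]) -> list[MigrationStep]:
    """Order objects so every dependency is migrated before its dependents (Kahn-style topological sort)."""
    incoming = {node: set(parents) for node, parents in dependencies.items()}
    for parents in dependencies.values():
        for parent in parents:
            incoming.setdefault(parent, set())
    steps: list[MigrationStep] = []
    step_number = 0
    while incoming:
        ready = sorted(node for node, parents in incoming.items() if not parents)
        if not ready:
            raise ValueError("cyclic dependency between migration objects")
        for object_type, object_id in ready:
            steps.append(MigrationStep(object_type, object_id, step_number))
            del incoming[(object_type, object_id)]
        for parents in incoming.values():
            parents.difference_update(ready)
        step_number += 1
    return steps


# A job depends on its task, which depends on a cluster: the cluster comes first.
print(sequence_steps({
    ("JOB", "537"): {("TASK", "537/main")},
    ("TASK", "537/main"): {("CLUSTER", "0915-clstr")},
}))
```

Objects that end up with the same step number have no dependency on each other and could be migrated in parallel.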

### Linked issues
Progresses #1415 
Supersedes #2980 

### Tests
- [x] added unit tests
- [x] added integration tests

---------

Co-authored-by: Eric Vergnaud <[email protected]>
Co-authored-by: Cor Zuurmond <[email protected]>
3 people authored Nov 1, 2024
1 parent 6eab234 commit b97a8d5
Showing 8 changed files with 842 additions and 39 deletions.
24 changes: 14 additions & 10 deletions src/databricks/labs/ucx/assessment/clusters.py
@@ -49,6 +49,18 @@ class ClusterInfo:
 
     __id_attributes__: ClassVar[tuple[str, ...]] = ("cluster_id",)
 
+    @classmethod
+    def from_cluster_details(cls, details: ClusterDetails):
+        return ClusterInfo(
+            cluster_id=details.cluster_id if details.cluster_id else "",
+            cluster_name=details.cluster_name,
+            policy_id=details.policy_id,
+            spark_version=details.spark_version,
+            creator=details.creator_user_name or None,
+            success=1,
+            failures="[]",
+        )
+
 
 class CheckClusterMixin(CheckInitScriptMixin):
     _ws: WorkspaceClient
@@ -156,7 +168,7 @@ def _crawl(self) -> Iterable[ClusterInfo]:
         all_clusters = list(self._ws.clusters.list())
         return list(self._assess_clusters(all_clusters))
 
-    def _assess_clusters(self, all_clusters):
+    def _assess_clusters(self, all_clusters: Iterable[ClusterDetails]):
         for cluster in all_clusters:
             if cluster.cluster_source == ClusterSource.JOB:
                 continue
@@ -166,15 +178,7 @@ def _assess_clusters(self, all_clusters):
                     f"Cluster {cluster.cluster_id} have Unknown creator, it means that the original creator "
                     f"has been deleted and should be re-created"
                 )
-            cluster_info = ClusterInfo(
-                cluster_id=cluster.cluster_id if cluster.cluster_id else "",
-                cluster_name=cluster.cluster_name,
-                policy_id=cluster.policy_id,
-                spark_version=cluster.spark_version,
-                creator=creator,
-                success=1,
-                failures="[]",
-            )
+            cluster_info = ClusterInfo.from_cluster_details(cluster)
             failures = self._check_cluster_failures(cluster, "cluster")
             if len(failures) > 0:
                 cluster_info.success = 0
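
As a side note (not part of this diff), a quick usage sketch of the extracted constructor. The cluster id below is made up; `ClusterDetails` is the databricks-sdk dataclass already referenced by the new classmethod.

```python
from databricks.sdk.service.compute import ClusterDetails

from databricks.labs.ucx.assessment.clusters import ClusterInfo

details = ClusterDetails(cluster_id="0923-164208-meows279", spark_version="13.3.x-scala2.12")
info = ClusterInfo.from_cluster_details(details)
assert info.cluster_id == "0923-164208-meows279"
assert info.success == 1 and info.failures == "[]"
```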
24 changes: 13 additions & 11 deletions src/databricks/labs/ucx/assessment/jobs.py
@@ -21,6 +21,7 @@
     RunType,
     SparkJarTask,
     SqlTask,
+    Job,
 )
 
 from databricks.labs.ucx.assessment.clusters import CheckClusterMixin
@@ -43,6 +44,17 @@ class JobInfo:
 
     __id_attributes__: ClassVar[tuple[str, ...]] = ("job_id",)
 
+    @classmethod
+    def from_job(cls, job: Job):
+        job_name = job.settings.name if job.settings and job.settings.name else "Unknown"
+        return JobInfo(
+            job_id=str(job.job_id),
+            success=1,
+            failures="[]",
+            job_name=job_name,
+            creator=job.creator_user_name or None,
+        )
+
 
 class JobsMixin:
     @classmethod
@@ -127,17 +139,7 @@ def _prepare(all_jobs) -> tuple[dict[int, set[str]], dict[int, JobInfo]]:
             job_settings = job.settings
             if not job_settings:
                 continue
-            job_name = job_settings.name
-            if not job_name:
-                job_name = "Unknown"
-
-            job_details[job.job_id] = JobInfo(
-                job_id=str(job.job_id),
-                job_name=job_name,
-                creator=creator_user_name,
-                success=1,
-                failures="[]",
-            )
+            job_details[job.job_id] = JobInfo.from_job(job)
         return job_assessment, job_details
 
     def _try_fetch(self) -> Iterable[JobInfo]:
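
Similarly, a quick usage sketch (not part of this diff) of `JobInfo.from_job`, including the `"Unknown"` fallback when a job has no settings or no name. The job ids and name are made up.

```python
from databricks.sdk.service.jobs import Job, JobSettings

from databricks.labs.ucx.assessment.jobs import JobInfo

named = JobInfo.from_job(Job(job_id=1234, settings=JobSettings(name="nightly-etl")))
unnamed = JobInfo.from_job(Job(job_id=5678))
assert named.job_name == "nightly-etl"
assert named.job_id == "1234"  # job_id is stored as a string
assert unnamed.job_name == "Unknown"
```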
