Skip to content

Commit

Permalink
Refactor register_job to register_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
JCZuurmond committed Nov 1, 2024
1 parent d26916b commit ba8b4c6
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 33 deletions.
31 changes: 20 additions & 11 deletions src/databricks/labs/ucx/assessment/sequencing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import Job, JobCluster, Task

from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
Expand Down Expand Up @@ -156,20 +156,29 @@ def __init__(self, ws: WorkspaceClient, administrator_locator: AdministratorLoca

# Outgoing references contains edges in the graph pointing from a node to a set of nodes that the node
# references. These references follow the API references, e.g. a job contains tasks in the
# `jobs.Job.settings.tasks`, thus a job has an outgoing reference to each of those tasks.
# `Job.settings.tasks`, thus a job has an outgoing reference to each of those tasks.
self._outgoing_references: dict[MigrationNodeKey, set[MigrationNode]] = defaultdict(set)

def register_job(self, job: jobs.Job) -> MaybeMigrationNode:
def register_jobs(self, *jobs: Job) -> list[MaybeMigrationNode]:
"""Register jobs.
Args:
job (jobs.Job) : The job to register.
jobs (Job) : The jobs to register.
Returns:
MaybeMigrationNode : A maybe migration node, which has the migration node if no problems occurred during
registering. Otherwise, the maybe migration node contains the dependency problems occurring during
registering the job.
list[MaybeMigrationNode] : Each element contains a maybe migration node for each job respectively. If no
problems occurred during registering the job, the maybe migration node contains the migration node.
Otherwise, the maybe migration node contains the dependency problems occurring during registering the
job.
"""
nodes: list[MaybeMigrationNode] = []
for job in jobs:
node = self._register_job(job)
nodes.append(node)
return nodes

def _register_job(self, job: Job) -> MaybeMigrationNode:
"""Register a single job."""
problems: list[DependencyProblem] = []
job_node = self._nodes.get(("JOB", str(job.job_id)), None)
if job_node:
Expand Down Expand Up @@ -211,11 +220,11 @@ def register_job(self, job: jobs.Job) -> MaybeMigrationNode:
problems.append(problem)
return MaybeMigrationNode(job_node, problems)

def _register_workflow_task(self, task: jobs.Task, parent: MigrationNode) -> MaybeMigrationNode:
def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMigrationNode:
"""Register a workflow task.
TODO:
Handle following jobs.Task attributes:
Handle following Task attributes:
- for_each_task
- libraries
- notebook_task
Expand Down Expand Up @@ -255,7 +264,7 @@ def _register_workflow_task(self, task: jobs.Task, parent: MigrationNode) -> May
problems.append(problem)
return MaybeMigrationNode(task_node, problems)

def _register_job_cluster(self, cluster: jobs.JobCluster, parent: MigrationNode) -> MaybeMigrationNode:
def _register_job_cluster(self, cluster: JobCluster, parent: MigrationNode) -> MaybeMigrationNode:
"""Register a job cluster.
A job cluster is defined within a job, so by definition it is always found when it is defined on that job.
Expand All @@ -276,7 +285,7 @@ def _register_cluster(self, cluster_id: str) -> MaybeMigrationNode:
"""Register a cluster.
TODO
Handle following jobs.Task attributes:
Handle following Task attributes:
- init_scripts
- instance_pool_id (maybe_not)
- policy_id
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/assessment/test_sequencing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def test_migration_sequencing_simple_job(make_job, runtime_ctx) -> None:
"""Sequence a simple job"""
job = make_job()

maybe_job_node = runtime_ctx.migration_sequencer.register_job(job)
maybe_job_node = runtime_ctx.migration_sequencer.register_jobs(job)[0]
assert not maybe_job_node.failed

steps = runtime_ctx.migration_sequencer.generate_steps()
Expand All @@ -30,7 +30,7 @@ def test_migration_sequencing_job_with_task_referencing_cluster(
)
job = make_job(tasks=[task])

maybe_job_node = runtime_ctx.migration_sequencer.register_job(job)
maybe_job_node = runtime_ctx.migration_sequencer.register_jobs(job)[0]
assert not maybe_job_node.failed

steps = runtime_ctx.migration_sequencer.generate_steps()
Expand All @@ -45,7 +45,7 @@ def test_migration_sequencing_job_with_task_referencing_non_existing_cluster(run
settings = jobs.JobSettings(name="test-job", tasks=[task])
job = jobs.Job(job_id=1234, settings=settings)

maybe_node = runtime_ctx.migration_sequencer.register_job(job)
maybe_node = runtime_ctx.migration_sequencer.register_jobs(job)[0]
assert maybe_node.failed
assert maybe_node.problems == [
DependencyProblem(
Expand Down
38 changes: 19 additions & 19 deletions tests/unit/assessment/test_sequencing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def admin_locator(ws):
return AdministratorLocator(ws, finders=[lambda _ws: admin_finder])


def test_register_job_with_existing_cluster(ws, admin_locator) -> None:
def test_register_jobs_with_existing_cluster(ws, admin_locator) -> None:
"""Register a job with a task referencing an existing cluster."""
task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123")
settings = jobs.JobSettings(name="test-job", tasks=[task])
Expand All @@ -33,12 +33,12 @@ def get_cluster(cluster_id: str) -> ClusterDetails:
ws.clusters.get.side_effect = get_cluster
sequencer = MigrationSequencer(ws, admin_locator)

maybe_node = sequencer.register_job(job)
maybe_node = sequencer.register_jobs(job)[0]

assert not maybe_node.failed


def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None:
def test_register_jobs_with_non_existing_cluster(ws, admin_locator) -> None:
"""Register a job with a task referencing a non-existing cluster."""
task = jobs.Task(task_key="test-task", existing_cluster_id="non-existing-id")
settings = jobs.JobSettings(name="test-job", tasks=[task])
Expand All @@ -47,7 +47,7 @@ def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None:
ws.clusters.get.side_effect = ResourceDoesNotExist("Unknown cluster")
sequencer = MigrationSequencer(ws, admin_locator)

maybe_node = sequencer.register_job(job)
maybe_node = sequencer.register_jobs(job)[0]

assert maybe_node.failed
assert maybe_node.problems == [
Expand All @@ -58,28 +58,28 @@ def test_register_job_with_non_existing_cluster(ws, admin_locator) -> None:
]


def test_register_job_with_existing_job_cluster_key(ws, admin_locator) -> None:
def test_register_jobs_with_existing_job_cluster_key(ws, admin_locator) -> None:
"""Register a job with a task referencing an existing job cluster."""
job_cluster = jobs.JobCluster("existing-id", ClusterSpec())
task = jobs.Task(task_key="test-task", job_cluster_key="existing-id")
settings = jobs.JobSettings(name="test-job", tasks=[task], job_clusters=[job_cluster])
job = jobs.Job(job_id=1234, settings=settings)
sequencer = MigrationSequencer(ws, admin_locator)

maybe_node = sequencer.register_job(job)
maybe_node = sequencer.register_jobs(job)[0]

assert not maybe_node.failed


def test_register_job_with_non_existing_job_cluster_key(ws, admin_locator) -> None:
def test_register_jobs_with_non_existing_job_cluster_key(ws, admin_locator) -> None:
"""Register a job with a task referencing a non-existing job cluster."""
task = jobs.Task(task_key="test-task", job_cluster_key="non-existing-id")
settings = jobs.JobSettings(name="test-job", tasks=[task])
job = jobs.Job(job_id=1234, settings=settings)

sequencer = MigrationSequencer(ws, admin_locator)

maybe_node = sequencer.register_job(job)
maybe_node = sequencer.register_jobs(job)[0]

assert maybe_node.failed
assert maybe_node.problems == [
Expand All @@ -90,20 +90,20 @@ def test_register_job_with_non_existing_job_cluster_key(ws, admin_locator) -> No
]


def test_register_job_with_new_cluster(ws, admin_locator) -> None:
def test_register_jobs_with_new_cluster(ws, admin_locator) -> None:
"""Register a job with a task with a new cluster definition."""
task = jobs.Task(task_key="test-task", new_cluster=ClusterSpec())
settings = jobs.JobSettings(name="test-job", tasks=[task])
job = jobs.Job(job_id=1234, settings=settings)
ws.jobs.get.return_value = job
sequencer = MigrationSequencer(ws, admin_locator)

maybe_node = sequencer.register_job(job)
maybe_node = sequencer.register_jobs(job)[0]

assert not maybe_node.failed


def test_register_job_with_task_dependency(ws, admin_locator) -> None:
def test_register_jobs_with_task_dependency(ws, admin_locator) -> None:
"""Register a job with two tasks having a dependency."""
task1 = jobs.Task(task_key="task1")
task_dependency = jobs.TaskDependency(task1.task_key)
Expand All @@ -113,20 +113,20 @@ def test_register_job_with_task_dependency(ws, admin_locator) -> None:
job = jobs.Job(job_id=1234, settings=settings)
sequencer = MigrationSequencer(ws, admin_locator)

maybe_node = sequencer.register_job(job)
maybe_node = sequencer.register_jobs(job)[0]

assert not maybe_node.failed


def test_register_job_with_non_existing_task_dependency(ws, admin_locator) -> None:
def test_register_jobs_with_non_existing_task_dependency(ws, admin_locator) -> None:
"""Register a job with a non-existing task dependency."""
task_dependency = jobs.TaskDependency("non-existing-id")
task = jobs.Task(task_key="task2", depends_on=[task_dependency])
settings = jobs.JobSettings(name="job", tasks=[task])
job = jobs.Job(job_id=1234, settings=settings)
sequencer = MigrationSequencer(ws, admin_locator)

maybe_node = sequencer.register_job(job)
maybe_node = sequencer.register_jobs(job)[0]

assert maybe_node.failed
assert maybe_node.problems == [
Expand Down Expand Up @@ -160,7 +160,7 @@ def get_cluster(cluster_id: str) -> ClusterDetails:
ws.clusters.get.side_effect = get_cluster

sequencer = MigrationSequencer(ws, admin_locator)
sequencer.register_job(job)
sequencer.register_jobs(job)

steps = list(sequencer.generate_steps())

Expand Down Expand Up @@ -208,7 +208,7 @@ def test_sequence_steps_from_job_task_with_existing_job_cluster_key(ws, admin_lo
settings = jobs.JobSettings(name="test-job", tasks=[task], job_clusters=[job_cluster])
job = jobs.Job(job_id=1234, settings=settings)
sequencer = MigrationSequencer(ws, admin_locator)
sequencer.register_job(job)
sequencer.register_jobs(job)

steps = list(sequencer.generate_steps())

Expand Down Expand Up @@ -254,7 +254,7 @@ def test_sequence_steps_from_job_task_with_new_cluster(ws, admin_locator) -> Non
settings = jobs.JobSettings(name="test-job", tasks=[task])
job = jobs.Job(job_id=1234, settings=settings)
sequencer = MigrationSequencer(ws, admin_locator)
sequencer.register_job(job)
sequencer.register_jobs(job)

steps = list(sequencer.generate_steps())

Expand Down Expand Up @@ -292,7 +292,7 @@ def test_sequence_steps_from_job_task_with_non_existing_cluster(ws, admin_locato
settings = jobs.JobSettings(name="test-job", tasks=[task])
job = jobs.Job(job_id=1234, settings=settings)
sequencer = MigrationSequencer(ws, admin_locator)
sequencer.register_job(job)
sequencer.register_jobs(job)

steps = list(sequencer.generate_steps())

Expand Down Expand Up @@ -334,7 +334,7 @@ def test_sequence_steps_from_job_task_referencing_other_task(ws, admin_locator)
job = jobs.Job(job_id=1234, settings=settings)
sequencer = MigrationSequencer(ws, admin_locator)

maybe_job_node = sequencer.register_job(job)
maybe_job_node = sequencer.register_jobs(job)[0]
assert not maybe_job_node.failed

steps = list(sequencer.generate_steps())
Expand Down

0 comments on commit ba8b4c6

Please sign in to comment.