Implement migration sequencing (phase 2) #3009
Open
ericvergnaud wants to merge 33 commits into main from migration-sequencing-phase-2
+423 −43
Commits (33, changes from all commits):
0b96a34 make simple_dependency_resolver available more broadly (ericvergnaud)
a34001b build migration steps for workflow task (ericvergnaud)
5272981 fix pylint warnings (ericvergnaud)
be30d4c fix pylint warnings (ericvergnaud)
30872fc add object name (ericvergnaud)
b53986a populate object owner (ericvergnaud)
c15e230 be more defensive (ericvergnaud)
f2ce384 move last_node_id to sequencer (ericvergnaud)
9c5d569 cherry-pick changes (ericvergnaud)
27beade use existing Ownership classes (ericvergnaud)
f642ea4 fix merge issues (ericvergnaud)
a1ae84a create steps for source files (ericvergnaud)
990223d fix merge issues (ericvergnaud)
56df506 register notebooks from dependency graph (ericvergnaud)
840834d fix merge issues (ericvergnaud)
7f30ae6 mock WorkspaceCache for testing (ericvergnaud)
6a0f873 populate ownership - leave the correct implementation to issue #3003 (ericvergnaud)
0d4d2b0 fix incorrect step sequence (ericvergnaud)
9603c17 fix incorrect step sequence (ericvergnaud)
5b4e0e6 basic support of cyclic dependencies (ericvergnaud)
4e2aedc rename local (ericvergnaud)
206cb36 formatting (ericvergnaud)
5eacacc formatting (ericvergnaud)
1e7e0e8 move package (ericvergnaud)
9ac4295 improve assert style (ericvergnaud)
17a33e3 formatting (ericvergnaud)
da0330d make 'incoming' transient and improve comments (ericvergnaud)
ae1c697 Merge branch 'migration-sequencing-phase-1' into migration-sequencing… (ericvergnaud)
2574b1d Merge branch 'main' into migration-sequencing-phase-1 (ericvergnaud)
4abda78 Merge branch 'migration-sequencing-phase-1' into migration-sequencing… (ericvergnaud)
1502ad8 use WorkspacePathOwnership (ericvergnaud)
8c666e5 Merge branch 'main' into migration-sequencing-phase-1 (ericvergnaud)
3c34640 Merge branch 'migration-sequencing-phase-1' into migration-sequencing… (ericvergnaud)
@@ -0,0 +1,250 @@
```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

from databricks.labs.blueprint.paths import WorkspacePath

from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership
from databricks.labs.ucx.source_code.graph import DependencyGraph
from databricks.labs.ucx.source_code.path_lookup import PathLookup


@dataclass
class MigrationStep:
    step_id: int
    step_number: int
    object_type: str
    object_id: str
    object_name: str
    object_owner: str
    required_step_ids: list[int]

    @property
    def key(self) -> tuple[str, str]:
        return self.object_type, self.object_id


@dataclass
class MigrationNode:
    node_id: int
    object_type: str
    object_id: str
    object_name: str
    object_owner: str

    @property
    def key(self) -> tuple[str, str]:
        return self.object_type, self.object_id

    def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationStep:
        return MigrationStep(
            step_id=self.node_id,
            step_number=step_number,
            object_type=self.object_type,
            object_id=self.object_id,
            object_name=self.object_name,
            object_owner=self.object_owner,
            required_step_ids=required_step_ids,
        )


class MigrationSequencer:

    def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, admin_locator: AdministratorLocator):
        self._ws = ws
        self._path_lookup = path_lookup
        self._admin_locator = admin_locator
        self._last_node_id = 0
        self._nodes: dict[tuple[str, str], MigrationNode] = {}
        self._outgoing: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set)

    def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: DependencyGraph) -> MigrationNode:
        task_id = f"{job.job_id}/{task.task_key}"
        task_node = self._nodes.get(("TASK", task_id), None)
        if task_node:
            return task_node
        job_node = self.register_workflow_job(job)
        self._last_node_id += 1
        task_node = MigrationNode(
            node_id=self._last_node_id,
            object_type="TASK",
            object_id=task_id,
            object_name=task.task_key,
            object_owner=job_node.object_owner,  # tasks have no owner of their own, so use the job's
        )
        self._nodes[task_node.key] = task_node
        self._outgoing[task_node.key].add(job_node.key)
        if task.existing_cluster_id:
            cluster_node = self.register_cluster(task.existing_cluster_id)
            if cluster_node:
                self._outgoing[task_node.key].add(cluster_node.key)
                # also make the cluster dependent on the job
                self._outgoing[job_node.key].add(cluster_node.key)
        graph.visit(self._visit_dependency, None)
        return task_node

    def _visit_dependency(self, graph: DependencyGraph) -> bool | None:
        lineage = graph.dependency.lineage[-1]
        parent_node = self._nodes[(lineage.object_type, lineage.object_id)]
        for dependency in graph.local_dependencies:
            lineage = dependency.lineage[-1]
            self.register_dependency(parent_node, lineage.object_type, lineage.object_id)
        # TODO tables and dfsas
        return False

    def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode:
        dependency_node = self._nodes.get((object_type, object_id), None)
        if not dependency_node:
            dependency_node = self._create_dependency_node(object_type, object_id)
        if parent_node:
            self._outgoing[dependency_node.key].add(parent_node.key)
        return dependency_node

    def _create_dependency_node(self, object_type: str, object_id: str) -> MigrationNode:
        object_name: str = "<ANONYMOUS>"
        _object_owner: str = "<UNKNOWN>"
        if object_type in {"NOTEBOOK", "FILE"}:
            path = Path(object_id)
            for library_root in self._path_lookup.library_roots:
                if not path.is_relative_to(library_root):
                    continue
                object_name = path.relative_to(library_root).as_posix()
                break
            ws_path = WorkspacePath(self._ws, object_id)
            object_owner = WorkspacePathOwnership(self._admin_locator, self._ws).owner_of(ws_path)
        else:
            raise ValueError(f"{object_type} not supported yet!")
        self._last_node_id += 1
        dependency_node = MigrationNode(
            node_id=self._last_node_id,
            object_type=object_type,
            object_id=object_id,
            object_name=object_name,
            object_owner=object_owner,
        )
        self._nodes[dependency_node.key] = dependency_node
        return dependency_node

    def register_workflow_job(self, job: jobs.Job) -> MigrationNode:
        job_node = self._nodes.get(("WORKFLOW", str(job.job_id)), None)
        if job_node:
            return job_node
        self._last_node_id += 1
        job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id)
        job_node = MigrationNode(
            node_id=self._last_node_id,
            object_type="WORKFLOW",
            object_id=str(job.job_id),
            object_name=job_name,
            object_owner=JobOwnership(self._admin_locator).owner_of(JobInfo.from_job(job)),
        )
        self._nodes[job_node.key] = job_node
        if job.settings and job.settings.job_clusters:
            for job_cluster in job.settings.job_clusters:
                cluster_node = self.register_job_cluster(job_cluster)
                if cluster_node:
                    self._outgoing[job_node.key].add(cluster_node.key)
        return job_node

    def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None:
        if cluster.new_cluster:
            return None
        return self.register_cluster(cluster.job_cluster_key)

    def register_cluster(self, cluster_id: str) -> MigrationNode:
        cluster_node = self._nodes.get(("CLUSTER", cluster_id), None)
        if cluster_node:
            return cluster_node
        details = self._ws.clusters.get(cluster_id)
        object_name = details.cluster_name if details and details.cluster_name else cluster_id
        self._last_node_id += 1
        cluster_node = MigrationNode(
            node_id=self._last_node_id,
            object_type="CLUSTER",
            object_id=cluster_id,
            object_name=object_name,
            object_owner=ClusterOwnership(self._admin_locator).owner_of(ClusterInfo.from_cluster_details(details)),
        )
        self._nodes[cluster_node.key] = cluster_node
        # TODO register warehouses and policies
        return cluster_node

    def generate_steps(self) -> Iterable[MigrationStep]:
        """The algorithm below is adapted from Kahn's topological sort, with two differences:
        1) We want the same step number for all nodes at the same dependency depth,
           so instead of pushing 'leaf' nodes onto a queue, we fetch them again once all current
           'leaf' nodes have been processed (these are transient 'leaves', i.e. they only become
           'leaves' during processing).
        2) Kahn's algorithm only supports DAGs, but Python code allows cyclic dependencies
           (i.e. A -> B -> C -> A is not a DAG), so when fetching 'leaf' nodes we relax the
           0-incoming-edge rule to avoid an infinite loop. We also avoid side effects
           (such as negative counts).
        This algorithm works correctly for simple cases, but is not tested on large graphs.
        """
        incoming_keys = self._collect_incoming_keys()
        incoming_counts = self._compute_incoming_counts(incoming_keys)
        step_number = 1
        sorted_steps: list[MigrationStep] = []
        while len(incoming_counts) > 0:
            leaf_keys = self._get_leaf_keys(incoming_counts)
            for leaf_key in leaf_keys:
                del incoming_counts[leaf_key]
                sorted_steps.append(
                    self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(incoming_keys[leaf_key])))
                )
                self._on_leaf_key_processed(leaf_key, incoming_counts)
            step_number += 1
        return sorted_steps

    def _on_leaf_key_processed(self, leaf_key: tuple[str, str], incoming_counts: dict[tuple[str, str], int]):
        for dependency_key in self._outgoing[leaf_key]:
            # prevent re-instantiation of already deleted keys
            if dependency_key not in incoming_counts:
                continue
            # prevent negative count with cyclic dependencies
            if incoming_counts[dependency_key] > 0:
                incoming_counts[dependency_key] -= 1

    def _collect_incoming_keys(self) -> dict[tuple[str, str], set[tuple[str, str]]]:
        result: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set)
        for source, outgoing in self._outgoing.items():
            for target in outgoing:
                result[target].add(source)
        return result

    def _required_step_ids(self, required_step_keys: set[tuple[str, str]]) -> Iterable[int]:
        for source_key in required_step_keys:
            yield self._nodes[source_key].node_id

    def _compute_incoming_counts(
        self, incoming: dict[tuple[str, str], set[tuple[str, str]]]
    ) -> dict[tuple[str, str], int]:
        result = defaultdict(int)
        for node_key in self._nodes:
            result[node_key] = len(incoming[node_key])
        return result

    @classmethod
    def _get_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]:
        max_count = 0
        leaf_keys = list(cls._yield_leaf_keys(incoming_counts, max_count))
        # if we're not finding nodes with 0 incoming counts, it's likely caused by cyclic dependencies
        # in which case it's safe to process nodes with a higher incoming count
        while not leaf_keys:
            max_count += 1
            leaf_keys = list(cls._yield_leaf_keys(incoming_counts, max_count))
        return leaf_keys

    @classmethod
    def _yield_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int], max_count: int) -> Iterable[tuple[str, str]]:
        for node_key, incoming_count in incoming_counts.items():
            if incoming_count > max_count:
                continue
            yield node_key
```
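As a side note, the sequencing logic can be illustrated in isolation. Below is a minimal, self-contained sketch of the same relaxed Kahn's sort over plain string keys; it is not part of the PR, and the `sequence` helper and its edge convention are hypothetical stand-ins for `generate_steps` and `self._outgoing`:

```python
# Minimal sketch of the relaxed Kahn's sort above (not part of the PR).
# An edge A -> B means "B depends on A", mirroring self._outgoing, so A is migrated first.
def sequence(outgoing: dict[str, set[str]]) -> list[tuple[int, str]]:
    nodes = set(outgoing) | {t for targets in outgoing.values() for t in targets}
    incoming_counts = {node: 0 for node in nodes}
    for targets in outgoing.values():
        for target in targets:
            incoming_counts[target] += 1
    steps: list[tuple[int, str]] = []
    step_number = 1
    while incoming_counts:
        # relax the 0-incoming rule until at least one node qualifies (handles cycles)
        max_count = 0
        leaves = [n for n, c in incoming_counts.items() if c <= max_count]
        while not leaves:
            max_count += 1
            leaves = [n for n, c in incoming_counts.items() if c <= max_count]
        for leaf in leaves:
            del incoming_counts[leaf]
            steps.append((step_number, leaf))
            for target in outgoing.get(leaf, set()):
                if target in incoming_counts and incoming_counts[target] > 0:
                    incoming_counts[target] -= 1
        step_number += 1
    return steps


print(sequence({"A": {"B"}, "B": {"C"}}))              # [(1, 'A'), (2, 'B'), (3, 'C')]
print(sequence({"A": {"B"}, "B": {"C"}, "C": {"A"}}))  # cycle: all at step 1, still terminates
```

Nodes at the same dependency depth share a step number, which is what lets the migration proceed in batches rather than one object at a time.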
```diff
@@ -78,8 +78,8 @@ def as_message(self) -> str:

 class WorkflowTask(Dependency):
-    def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job):
-        loader = WrappingLoader(WorkflowTaskContainer(ws, task, job))
+    def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache | None = None):
+        loader = WrappingLoader(WorkflowTaskContainer(ws, task, job, cache))
         super().__init__(loader, Path(f'/jobs/{task.task_key}'), inherits_context=False)
         self._task = task
         self._job = job
```

Review comment on the new `__init__` signature: don't add

```diff
@@ -99,11 +99,11 @@ def lineage(self) -> list[LineageAtom]:

 class WorkflowTaskContainer(SourceContainer):
-    def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job):
+    def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache | None = None):
         self._task = task
         self._job = job
         self._ws = ws
-        self._cache = WorkspaceCache(ws)
+        self._cache = cache or WorkspaceCache(ws)
         self._named_parameters: dict[str, str] | None = {}
         self._parameters: list[str] | None = []
         self._spark_conf: dict[str, str] | None = {}
```
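The point of the new `cache` parameter is dependency injection for tests (see commit 7f30ae6, "mock WorkspaceCache for testing"). A minimal test sketch, assuming `WorkflowTaskContainer` and `WorkspaceCache` are importable from the module shown above (the exact import path is assumed) and that autospec mocks suffice:

```python
from unittest.mock import create_autospec

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

# import path assumed for this sketch; both names come from the module in the diff above
from databricks.labs.ucx.source_code.jobs import WorkflowTaskContainer, WorkspaceCache


def test_container_uses_injected_cache():
    ws = create_autospec(WorkspaceClient)
    cache = create_autospec(WorkspaceCache)
    task = jobs.Task(task_key="my_task")
    job = jobs.Job(job_id=1234)
    container = WorkflowTaskContainer(ws, task, job, cache)
    # the injected cache is used; the container does not build its own
    assert container._cache is cache
```

Defaulting to `cache or WorkspaceCache(ws)` keeps existing call sites working while letting tests (or callers sharing a cache across tasks) pass their own instance.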
Review comment on the `raise ValueError` in `_create_dependency_node`: where is this exception caught? It'll crash the assessment workflow if unhandled.
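For illustration only, a caller could make that failure non-fatal along these lines; this is a hypothetical sketch (the PR does not add such handling), and `sequencer`, `parent_node`, `object_type`, and `object_id` are stand-ins for the real call site:

```python
import logging

logger = logging.getLogger(__name__)

# hypothetical guard around the call that raises ValueError for unsupported object types
try:
    node = sequencer.register_dependency(parent_node, object_type, object_id)
except ValueError as e:
    # log and skip instead of crashing the assessment workflow
    logger.warning(f"Cannot sequence {object_type} {object_id}: {e}")
```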