@property
def key(self) -> tuple[str, str]:
    """Return the `(object_type, object_id)` pair of this step."""
    identifier = (self.object_type, self.object_id)
    return identifier
""" - def __init__(self, ws: WorkspaceClient, administrator_locator: AdministratorLocator): + def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, administrator_locator: AdministratorLocator): self._ws = ws + self._path_lookup = path_lookup self._admin_locator = administrator_locator self._counter = itertools.count() self._nodes: dict[MigrationNodeKey, MigrationNode] = {} @@ -220,7 +226,7 @@ def _register_job(self, job: Job) -> MaybeMigrationNode: problems.append(problem) return MaybeMigrationNode(job_node, problems) - def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMigrationNode: + def _register_workflow_task(self, task: Task, parent: MigrationNode, graph: DependencyGraph) -> MaybeMigrationNode: """Register a workflow task. TODO: @@ -262,8 +268,51 @@ def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMig else: problem = DependencyProblem('cluster-not-found', f"Could not find cluster: {task.job_cluster_key}") problems.append(problem) + graph.visit(self._visit_dependency, None) return MaybeMigrationNode(task_node, problems) + def _visit_dependency(self, graph: DependencyGraph) -> bool | None: + lineage = graph.dependency.lineage[-1] + parent_node = self._nodes[(lineage.object_type, lineage.object_id)] + for dependency in graph.local_dependencies: + lineage = dependency.lineage[-1] + self.register_dependency(parent_node, lineage.object_type, lineage.object_id) + # TODO tables and dfsas + return False + + def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: + dependency_node = self._nodes.get((object_type, object_id), None) + if not dependency_node: + dependency_node = self._create_dependency_node(object_type, object_id) + if parent_node: + self._incoming[parent_node.key].add(dependency_node.key) + self._outgoing[dependency_node.key].add(parent_node.key) + return dependency_node + + def _create_dependency_node(self, object_type: str, object_id: 
str) -> MigrationNode: + object_name: str = "" + _object_owner: str = "" + if object_type in {"NOTEBOOK", "FILE"}: + path = Path(object_id) + for library_root in self._path_lookup.library_roots: + if not path.is_relative_to(library_root): + continue + object_name = path.relative_to(library_root).as_posix() + break + object_owner = WorkspaceObjectOwnership(self._admin_locator).owner_of((object_type, object_id)) + else: + raise ValueError(f"{object_type} not supported yet!") + self._last_node_id += 1 + dependency_node = MigrationNode( + node_id=self._last_node_id, + object_type=object_type, + object_id=object_id, + object_name=object_name, + object_owner=object_owner, + ) + self._nodes[dependency_node.key] = dependency_node + return dependency_node + def _register_job_cluster(self, cluster: JobCluster, parent: MigrationNode) -> MaybeMigrationNode: """Register a job cluster. @@ -322,6 +371,16 @@ def generate_steps(self) -> Iterable[MigrationStep]: leaf during processing) - We handle cyclic dependencies (implemented in PR #3009) """ + """The below algo is adapted from Kahn's topological sort. + The main differences are as follows: + 1) we want the same step number for all nodes with same dependency depth + so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed + (these are transient 'leaf' nodes i.e. they only become 'leaf' during processing) + 2) Kahn only supports DAGs but python code allows cyclic dependencies i.e. A -> B -> C -> A is not a DAG + so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order + to avoid an infinite loop. We also avoid side effects (such as negative counts). + This algo works correctly for simple cases, but is not tested on large trees. 
class WorkspaceObjectOwnership(Ownership[tuple[str, str]]):
    """Ownership for workspace objects identified by an `(object_type, object_id)` tuple.

    Resolving a direct owner is not implemented yet: no direct owner is ever reported.
    """

    def _maybe_direct_owner(self, record: tuple[str, str]) -> str | None:
        """Return the direct owner of the workspace object, which is currently always `None`."""
        # TODO: tuple[0] = object_type, tuple[1] = object_id
        direct_owner: str | None = None
        return direct_owner
class _MigrationSequencer(MigrationSequencer):
    """Test subclass exposing the sequencer's private dependency visitor."""

    def visit_graph(self, graph: DependencyGraph) -> None:
        """Visit `graph` with the sequencer's `_visit_dependency` callback."""
        graph.visit(self._visit_dependency, None)


def test_sequencer_supports_cyclic_dependencies(
    ws,
    simple_dependency_resolver,
    mock_path_lookup,
    admin_locator,
) -> None:
    """Sequence a root file importing two files that import each other."""
    root = Dependency(FileLoader(), Path("root.py"))
    root_graph = _DependencyGraph(root, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState())
    child_a = Dependency(FileLoader(), Path("a.py"))
    child_graph_a = _DependencyGraph(
        child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()
    )
    child_b = Dependency(FileLoader(), Path("b.py"))
    child_graph_b = _DependencyGraph(
        child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()
    )
    # root imports a and b
    root_graph.add_dependency(child_graph_a)
    root_graph.add_dependency(child_graph_b)
    # a imports b
    child_graph_a.add_dependency(child_graph_b)
    # b imports a (using local import)
    child_graph_b.add_dependency(child_graph_a)
    # `admin_locator` is requested as a fixture, like the sibling tests do; pytest refuses direct calls to
    # fixture functions.
    # NOTE(review): assumes the fixture yields an `AdministratorLocator` instance — confirm.
    sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator)
    # The root has no parent, so it is registered without linking it to another node.
    root_atom = root.lineage[-1]
    sequencer.register_dependency(None, root_atom.object_type, root_atom.object_id)
    sequencer.visit_graph(root_graph)
    steps = list(sequencer.generate_steps())
    assert len(steps) == 3
    # The root depends on both children, so it must be migrated last.
    assert steps[2].object_id == "root.py"