From 0b96a349a9bb97ca4b11d2604423be261a31c25c Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 10:51:55 +0200 Subject: [PATCH 01/30] make simple_dependency_resolver available more broadly --- tests/unit/conftest.py | 15 ++++++++++++++- tests/unit/source_code/conftest.py | 15 --------------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 2c8cbfd3b2..92e86ec73e 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -10,13 +10,17 @@ from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler -from databricks.labs.ucx.source_code.graph import BaseNotebookResolver +from databricks.labs.ucx.source_code.graph import BaseNotebookResolver, DependencyResolver +from databricks.labs.ucx.source_code.known import KnownList +from databricks.labs.ucx.source_code.linters.files import ImportFileResolver, FileLoader +from databricks.labs.ucx.source_code.notebooks.loaders import NotebookResolver, NotebookLoader from databricks.labs.ucx.source_code.path_lookup import PathLookup from databricks.sdk import AccountClient from databricks.sdk.config import Config from databricks.labs.ucx.config import WorkspaceConfig from databricks.labs.ucx.contexts.workflow_task import RuntimeContext +from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver from . import mock_workspace_client @@ -201,3 +205,12 @@ def mock_backend() -> MockBackend: @pytest.fixture def ws(): return mock_workspace_client() + + +@pytest.fixture +def simple_dependency_resolver(mock_path_lookup: PathLookup) -> DependencyResolver: + allow_list = KnownList() + library_resolver = PythonLibraryResolver(allow_list) + notebook_resolver = NotebookResolver(NotebookLoader()) + import_resolver = ImportFileResolver(FileLoader(), allow_list) + return DependencyResolver(library_resolver, notebook_resolver, import_resolver, import_resolver, mock_path_lookup) diff --git a/tests/unit/source_code/conftest.py b/tests/unit/source_code/conftest.py index 6029ce4d82..9c999d92dc 100644 --- a/tests/unit/source_code/conftest.py +++ b/tests/unit/source_code/conftest.py @@ -1,12 +1,6 @@ import pytest from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus -from databricks.labs.ucx.source_code.graph import DependencyResolver -from databricks.labs.ucx.source_code.known import KnownList -from databricks.labs.ucx.source_code.linters.files import ImportFileResolver, FileLoader -from databricks.labs.ucx.source_code.notebooks.loaders import NotebookLoader, NotebookResolver -from databricks.labs.ucx.source_code.path_lookup import PathLookup -from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver @pytest.fixture @@ -51,12 +45,3 @@ def extended_test_index(): ), ] ) - - -@pytest.fixture -def simple_dependency_resolver(mock_path_lookup: PathLookup) -> DependencyResolver: - allow_list = KnownList() - library_resolver = PythonLibraryResolver(allow_list) - notebook_resolver = NotebookResolver(NotebookLoader()) - import_resolver = ImportFileResolver(FileLoader(), allow_list) - return DependencyResolver(library_resolver, notebook_resolver, import_resolver, import_resolver, mock_path_lookup) From a34001b43e649c47d3d8d65fbc64890b80598f93 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 10:54:19 +0200 Subject: [PATCH 02/30] build migration steps for workflow task --- .../labs/ucx/sequencing/__init__.py | 0 .../labs/ucx/sequencing/sequencing.py | 145 ++++++++++++++++++ tests/unit/sequencing/__init__.py | 0 tests/unit/sequencing/test_sequencing.py | 22 +++ 4 files changed, 167 insertions(+) create mode 100644 src/databricks/labs/ucx/sequencing/__init__.py create mode 100644 src/databricks/labs/ucx/sequencing/sequencing.py create mode 100644 tests/unit/sequencing/__init__.py create mode 100644 tests/unit/sequencing/test_sequencing.py diff --git a/src/databricks/labs/ucx/sequencing/__init__.py b/src/databricks/labs/ucx/sequencing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py new file mode 100644 index 0000000000..f28400c3cd --- /dev/null +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import itertools +from collections.abc import Iterable +from dataclasses import dataclass, field + +from databricks.sdk.service import jobs + +from databricks.labs.ucx.source_code.graph import DependencyGraph + + +@dataclass +class MigrationStep: + step_id: int + step_number: int + object_type: str + object_id: str + object_owner: str + required_step_ids: list[int] = field(default_factory=list) + + +@dataclass +class MigrationNode: + last_node_id = 0 + node_id: int + object_type: str + object_id: str + object_owner: str + required_steps: list[MigrationNode] = field(default_factory=list) + + def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]: + # traverse the nodes using a depth-first algorithm + # ultimate leaves have a step number of 1 + # use highest required step number + 1 for this step + highest_step_number = 0 + required_step_ids: list[int] = [] + all_generated_steps: list[Iterable[MigrationStep]] = [] + for required_step in self.required_steps: + step, generated_steps = required_step.generate_steps() + highest_step_number = max(highest_step_number, step.step_number) + required_step_ids.append(step.step_id) + all_generated_steps.append(generated_steps) + all_generated_steps.append([step]) + this_step = MigrationStep( + step_id=self.node_id, + step_number=highest_step_number + 1, + object_type=self.object_type, + object_id=self.object_id, + object_owner=self.object_owner, + required_step_ids=required_step_ids, + ) + return this_step, itertools.chain(*all_generated_steps) + + def find(self, object_type: str, object_id: str) -> MigrationNode | None: + if object_type == self.object_type and object_id == self.object_id: + return self + for step in self.required_steps: + found = step.find(object_type, object_id) + if found: + return found + return None + + +class MigrationSequencer: + + def __init__(self): + self._root = MigrationNode(node_id=0, object_type="ROOT", object_id="ROOT", object_owner="NONE") + + def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: + task_node = self._find_node(object_type="TASK", object_id=task.task_key) + if task_node: + return task_node + job_node = self.register_workflow_job(job) + MigrationNode.last_node_id += 1 + task_node = MigrationNode( + node_id=MigrationNode.last_node_id, object_type="TASK", object_id=task.task_key, object_owner="NONE" + ) # TODO object_owner + job_node.required_steps.append(task_node) + if task.existing_cluster_id: + cluster_node = self.register_cluster(task.existing_cluster_id) + cluster_node.required_steps.append(task_node) + if job_node not in cluster_node.required_steps: + cluster_node.required_steps.append(job_node) + # TODO register dependency graph + return task_node + + def register_workflow_job(self, job: jobs.Job) -> MigrationNode: + job_node = self._find_node(object_type="JOB", object_id=str(job.job_id)) + if job_node: + return job_node + MigrationNode.last_node_id += 1 + job_node = MigrationNode( + node_id=MigrationNode.last_node_id, object_type="JOB", object_id=str(job.job_id), object_owner="NONE" + ) # TODO object_owner + top_level = True + if job.settings and job.settings.job_clusters: + for job_cluster in job.settings.job_clusters: + cluster_node = self.register_job_cluster(job_cluster) + if cluster_node: + top_level = False + cluster_node.required_steps.append(job_node) + if top_level: + self._root.required_steps.append(job_node) + return job_node + + def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None: + if cluster.new_cluster: + return None + return self.register_cluster(cluster.job_cluster_key) + + def register_cluster(self, cluster_key: str) -> MigrationNode: + cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key) + if cluster_node: + return cluster_node + MigrationNode.last_node_id += 1 + cluster_node = MigrationNode( + node_id=MigrationNode.last_node_id, object_type="CLUSTER", object_id=cluster_key, object_owner="NONE" + ) # TODO object_owner + # TODO register warehouses and policies + self._root.required_steps.append(cluster_node) + return cluster_node + + def generate_steps(self) -> Iterable[MigrationStep]: + _root_step, generated_steps = self._root.generate_steps() + unique_steps = self._deduplicate_steps(generated_steps) + return self._sorted_steps(unique_steps) + + @staticmethod + def _sorted_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]: + # sort by step number, lowest first + return sorted(steps, key=lambda step: step.step_number) + + @staticmethod + def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]: + best_steps: dict[int, MigrationStep] = {} + for step in steps: + existing = best_steps.get(step.step_id, None) + # keep the step with the highest step number + if existing and existing.step_number >= step.step_number: + continue + best_steps[step.step_id] = step + return best_steps.values() + + def _find_node(self, object_type: str, object_id: str) -> MigrationNode | None: + return self._root.find(object_type, object_id) diff --git a/tests/unit/sequencing/__init__.py b/tests/unit/sequencing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py new file mode 100644 index 0000000000..094767fc3e --- /dev/null +++ b/tests/unit/sequencing/test_sequencing.py @@ -0,0 +1,22 @@ +from databricks.sdk.service import jobs + +from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer +from databricks.labs.ucx.source_code.base import CurrentSessionState +from databricks.labs.ucx.source_code.graph import DependencyGraph +from databricks.labs.ucx.source_code.jobs import WorkflowTask + + +def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup): + task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + ws.jobs.get.return_value = job + dependency = WorkflowTask(ws, task, job) + graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + sequencer = MigrationSequencer() + sequencer.register_workflow_task(task, job, graph) + steps = list(sequencer.generate_steps()) + step = steps[-1] + assert step.object_type == "CLUSTER" + assert step.object_id == "cluster-123" + assert step.step_number == 3 From 52729812a95e56f5f4f6ef3d05975c6a1ce09809 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 11:54:04 +0200 Subject: [PATCH 03/30] fix pylint warnings --- tests/unit/conftest.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 92e86ec73e..4eee45fca3 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -61,8 +61,10 @@ class CustomIterator: def __init__(self, values): self._values = iter(values) self._has_next = True + self._next_value = None - def hasNext(self): # pylint: disable=invalid-name + # pylint: disable=invalid-name + def hasNext(self): try: self._next_value = next(self._values) self._has_next = True @@ -154,9 +156,11 @@ def inner(cb, **replace) -> RuntimeContext: ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().listDatabases.return_value = ( mock_list_databases_iterator ) + # pylint: disable=protected-access ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().listTables.return_value = ( mock_list_tables_iterator ) + # pylint: disable=protected-access ctx.tables_crawler._spark._jsparkSession.sharedState().externalCatalog().getTable.return_value = ( get_table_mock ) @@ -169,8 +173,9 @@ def inner(cb, **replace) -> RuntimeContext: @pytest.fixture def acc_client(): - acc = create_autospec(AccountClient) # pylint: disable=mock-no-usage + acc = create_autospec(AccountClient) acc.config = Config(host="https://accounts.cloud.databricks.com", account_id="123", token="123") + acc.asset_not_called() return acc From be30d4c972691a37593b7a38393740df85a1e2e4 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 12:05:36 +0200 Subject: [PATCH 04/30] fix pylint warnings --- tests/unit/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 4eee45fca3..675f2012e6 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -175,7 +175,7 @@ def inner(cb, **replace) -> RuntimeContext: def acc_client(): acc = create_autospec(AccountClient) acc.config = Config(host="https://accounts.cloud.databricks.com", account_id="123", token="123") - acc.asset_not_called() + acc.assert_not_called() return acc From 30872fc1d172df41b1583acca938a146e4893da0 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 12:06:04 +0200 Subject: [PATCH 05/30] add object name --- .../labs/ucx/sequencing/sequencing.py | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index f28400c3cd..b12fbcd0b2 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -15,6 +15,7 @@ class MigrationStep: step_number: int object_type: str object_id: str + object_name: str object_owner: str required_step_ids: list[int] = field(default_factory=list) @@ -25,6 +26,7 @@ class MigrationNode: node_id: int object_type: str object_id: str + object_name: str object_owner: str required_steps: list[MigrationNode] = field(default_factory=list) @@ -46,6 +48,7 @@ def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]: step_number=highest_step_number + 1, object_type=self.object_type, object_id=self.object_id, + object_name=self.object_name, object_owner=self.object_owner, required_step_ids=required_step_ids, ) @@ -64,16 +67,23 @@ def find(self, object_type: str, object_id: str) -> MigrationNode | None: class MigrationSequencer: def __init__(self): - self._root = MigrationNode(node_id=0, object_type="ROOT", object_id="ROOT", object_owner="NONE") + self._root = MigrationNode( + node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE" + ) def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: - task_node = self._find_node(object_type="TASK", object_id=task.task_key) + task_id = f"{job.job_id}/{task.task_key}" + task_node = self._find_node(object_type="TASK", object_id=task_id) if task_node: return task_node job_node = self.register_workflow_job(job) MigrationNode.last_node_id += 1 task_node = MigrationNode( - node_id=MigrationNode.last_node_id, object_type="TASK", object_id=task.task_key, object_owner="NONE" + node_id=MigrationNode.last_node_id, + object_type="TASK", + object_id=task_id, + object_name=task.task_key, + object_owner="NONE", ) # TODO object_owner job_node.required_steps.append(task_node) if task.existing_cluster_id: @@ -89,8 +99,13 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: if job_node: return job_node MigrationNode.last_node_id += 1 + job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id) job_node = MigrationNode( - node_id=MigrationNode.last_node_id, object_type="JOB", object_id=str(job.job_id), object_owner="NONE" + node_id=MigrationNode.last_node_id, + object_type="JOB", + object_id=str(job.job_id), + object_name=job_name, + object_owner="NONE", ) # TODO object_owner top_level = True if job.settings and job.settings.job_clusters: @@ -114,7 +129,11 @@ def register_cluster(self, cluster_key: str) -> MigrationNode: return cluster_node MigrationNode.last_node_id += 1 cluster_node = MigrationNode( - node_id=MigrationNode.last_node_id, object_type="CLUSTER", object_id=cluster_key, object_owner="NONE" + node_id=MigrationNode.last_node_id, + object_type="CLUSTER", + object_id=cluster_key, + object_name=cluster_key, + object_owner="NONE", ) # TODO object_owner # TODO register warehouses and policies self._root.required_steps.append(cluster_node) From b53986aec1da660d3e8857021ecab7dd36e8298a Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 12:14:11 +0200 Subject: [PATCH 06/30] populate object owner --- src/databricks/labs/ucx/sequencing/sequencing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index b12fbcd0b2..f66563de80 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -83,8 +83,8 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende object_type="TASK", object_id=task_id, object_name=task.task_key, - object_owner="NONE", - ) # TODO object_owner + object_owner=job_node.object_owner, # no task owner so use job one + ) job_node.required_steps.append(task_node) if task.existing_cluster_id: cluster_node = self.register_cluster(task.existing_cluster_id) @@ -105,8 +105,8 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: object_type="JOB", object_id=str(job.job_id), object_name=job_name, - object_owner="NONE", - ) # TODO object_owner + object_owner=job.creator_user_name or "", + ) top_level = True if job.settings and job.settings.job_clusters: for job_cluster in job.settings.job_clusters: From c15e2305ad7b78dde05d984c79be3285fc468c52 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 12:48:00 +0200 Subject: [PATCH 07/30] be more defensive --- .../labs/ucx/sequencing/sequencing.py | 17 ++++++++++++----- tests/unit/sequencing/test_sequencing.py | 8 +++++++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index f66563de80..113b704d30 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -4,6 +4,7 @@ from collections.abc import Iterable from dataclasses import dataclass, field +from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs from databricks.labs.ucx.source_code.graph import DependencyGraph @@ -66,7 +67,8 @@ def find(self, object_type: str, object_id: str) -> MigrationNode | None: class MigrationSequencer: - def __init__(self): + def __init__(self, ws: WorkspaceClient): + self._ws = ws self._root = MigrationNode( node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE" ) @@ -83,7 +85,7 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende object_type="TASK", object_id=task_id, object_name=task.task_key, - object_owner=job_node.object_owner, # no task owner so use job one + object_owner=job_node.object_owner, # no task owner so use job one ) job_node.required_steps.append(task_node) if task.existing_cluster_id: @@ -127,14 +129,17 @@ def register_cluster(self, cluster_key: str) -> MigrationNode: cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key) if cluster_node: return cluster_node + details = self._ws.clusters.get(cluster_key) + object_name = details.cluster_name if details and details.cluster_name else cluster_key + object_owner = details.creator_user_name if details and details.creator_user_name else "" MigrationNode.last_node_id += 1 cluster_node = MigrationNode( node_id=MigrationNode.last_node_id, object_type="CLUSTER", object_id=cluster_key, - object_name=cluster_key, - object_owner="NONE", - ) # TODO object_owner + object_name=object_name, + object_owner=object_owner, + ) # TODO register warehouses and policies self._root.required_steps.append(cluster_node) return cluster_node @@ -155,6 +160,8 @@ def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep for step in steps: existing = best_steps.get(step.step_id, None) # keep the step with the highest step number + # TODO this possibly affects the step_number of steps that depend on this one + # but it's probably OK to not be 100% accurate initially if existing and existing.step_number >= step.step_number: continue best_steps[step.step_id] = step diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 094767fc3e..fa7271164e 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,4 +1,5 @@ from databricks.sdk.service import jobs +from databricks.sdk.service.compute import ClusterDetails from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState @@ -7,16 +8,21 @@ def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup): + ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe") task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") settings = jobs.JobSettings(name="test-job", tasks=[task]) job = jobs.Job(job_id=1234, settings=settings) ws.jobs.get.return_value = job dependency = WorkflowTask(ws, task, job) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - sequencer = MigrationSequencer() + sequencer = MigrationSequencer(ws) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) step = steps[-1] + assert step.step_id assert step.object_type == "CLUSTER" assert step.object_id == "cluster-123" + assert step.object_name == "my-cluster" + assert step.object_owner == "John Doe" assert step.step_number == 3 + assert len(step.required_step_ids) == 2 From f2ce384c05b44dcd47b99d5e0e14eeba398116e9 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 10:53:39 +0200 Subject: [PATCH 08/30] move last_node_id to sequencer --- src/databricks/labs/ucx/sequencing/sequencing.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 113b704d30..4037a29697 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -23,7 +23,6 @@ class MigrationStep: @dataclass class MigrationNode: - last_node_id = 0 node_id: int object_type: str object_id: str @@ -69,6 +68,7 @@ class MigrationSequencer: def __init__(self, ws: WorkspaceClient): self._ws = ws + self._last_node_id = 0 self._root = MigrationNode( node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE" ) @@ -79,9 +79,9 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende if task_node: return task_node job_node = self.register_workflow_job(job) - MigrationNode.last_node_id += 1 + self._last_node_id += 1 task_node = MigrationNode( - node_id=MigrationNode.last_node_id, + node_id=self._last_node_id, object_type="TASK", object_id=task_id, object_name=task.task_key, @@ -100,10 +100,10 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: job_node = self._find_node(object_type="JOB", object_id=str(job.job_id)) if job_node: return job_node - MigrationNode.last_node_id += 1 + self._last_node_id += 1 job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id) job_node = MigrationNode( - node_id=MigrationNode.last_node_id, + node_id=self._last_node_id, object_type="JOB", object_id=str(job.job_id), object_name=job_name, @@ -132,9 +132,9 @@ def register_cluster(self, cluster_key: str) -> MigrationNode: details = self._ws.clusters.get(cluster_key) object_name = details.cluster_name if details and details.cluster_name else cluster_key object_owner = details.creator_user_name if details and details.creator_user_name else "" - MigrationNode.last_node_id += 1 + self._last_node_id += 1 cluster_node = MigrationNode( - node_id=MigrationNode.last_node_id, + node_id=self._last_node_id, object_type="CLUSTER", object_id=cluster_key, object_name=object_name, From 9c5d569ecedf3cf670ca21bc7572ff87336b1402 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 15:15:38 +0200 Subject: [PATCH 09/30] cherry-pick changes --- .../labs/ucx/sequencing/sequencing.py | 136 +++++++++--------- 1 file changed, 64 insertions(+), 72 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 4037a29697..063d0ca854 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -1,8 +1,8 @@ from __future__ import annotations -import itertools +from collections import defaultdict from collections.abc import Iterable -from dataclasses import dataclass, field +from dataclasses import dataclass from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs @@ -18,7 +18,7 @@ class MigrationStep: object_id: str object_name: str object_owner: str - required_step_ids: list[int] = field(default_factory=list) + required_step_ids: list[int] @dataclass @@ -28,40 +28,21 @@ class MigrationNode: object_id: str object_name: str object_owner: str - required_steps: list[MigrationNode] = field(default_factory=list) - - def generate_steps(self) -> tuple[MigrationStep, Iterable[MigrationStep]]: - # traverse the nodes using a depth-first algorithm - # ultimate leaves have a step number of 1 - # use highest required step number + 1 for this step - highest_step_number = 0 - required_step_ids: list[int] = [] - all_generated_steps: list[Iterable[MigrationStep]] = [] - for required_step in self.required_steps: - step, generated_steps = required_step.generate_steps() - highest_step_number = max(highest_step_number, step.step_number) - required_step_ids.append(step.step_id) - all_generated_steps.append(generated_steps) - all_generated_steps.append([step]) - this_step = MigrationStep( + + @property + def key(self) -> tuple[str, str]: + return self.object_type, self.object_id + + def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationStep: + return MigrationStep( step_id=self.node_id, - step_number=highest_step_number + 1, + step_number=step_number, object_type=self.object_type, object_id=self.object_id, object_name=self.object_name, object_owner=self.object_owner, required_step_ids=required_step_ids, ) - return this_step, itertools.chain(*all_generated_steps) - - def find(self, object_type: str, object_id: str) -> MigrationNode | None: - if object_type == self.object_type and object_id == self.object_id: - return self - for step in self.required_steps: - found = step.find(object_type, object_id) - if found: - return found - return None class MigrationSequencer: @@ -69,13 +50,13 @@ class MigrationSequencer: def __init__(self, ws: WorkspaceClient): self._ws = ws self._last_node_id = 0 - self._root = MigrationNode( - node_id=0, object_type="ROOT", object_id="ROOT", object_name="ROOT", object_owner="NONE" - ) + self._nodes: dict[tuple[str, str], MigrationNode] = {} + self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) + self._outgoing: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: task_id = f"{job.job_id}/{task.task_key}" - task_node = self._find_node(object_type="TASK", object_id=task_id) + task_node = self._nodes.get(("TASK", task_id), None) if task_node: return task_node job_node = self.register_workflow_job(job) @@ -87,17 +68,22 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende object_name=task.task_key, object_owner=job_node.object_owner, # no task owner so use job one ) - job_node.required_steps.append(task_node) + self._nodes[task_node.key] = task_node + self._incoming[job_node.key].add(task_node.key) + self._outgoing[task_node.key].add(job_node.key) if task.existing_cluster_id: cluster_node = self.register_cluster(task.existing_cluster_id) - cluster_node.required_steps.append(task_node) - if job_node not in cluster_node.required_steps: - cluster_node.required_steps.append(job_node) + if cluster_node: + self._incoming[cluster_node.key].add(task_node.key) + self._outgoing[task_node.key].add(cluster_node.key) + # also make the cluster dependent on the job + self._incoming[cluster_node.key].add(job_node.key) + self._outgoing[job_node.key].add(cluster_node.key) # TODO register dependency graph return task_node def register_workflow_job(self, job: jobs.Job) -> MigrationNode: - job_node = self._find_node(object_type="JOB", object_id=str(job.job_id)) + job_node = self._nodes.get(("JOB", str(job.job_id)), None) if job_node: return job_node self._last_node_id += 1 @@ -109,15 +95,13 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: object_name=job_name, object_owner=job.creator_user_name or "", ) - top_level = True + self._nodes[job_node.key] = job_node if job.settings and job.settings.job_clusters: for job_cluster in job.settings.job_clusters: cluster_node = self.register_job_cluster(job_cluster) if cluster_node: - top_level = False - cluster_node.required_steps.append(job_node) - if top_level: - self._root.required_steps.append(job_node) + self._incoming[cluster_node.key].add(job_node.key) + self._outgoing[job_node.key].add(cluster_node.key) return job_node def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None: @@ -125,47 +109,55 @@ def register_job_cluster(self, cluster: jobs.JobCluster) -> MigrationNode | None return None return self.register_cluster(cluster.job_cluster_key) - def register_cluster(self, cluster_key: str) -> MigrationNode: - cluster_node = self._find_node(object_type="CLUSTER", object_id=cluster_key) + def register_cluster(self, cluster_id: str) -> MigrationNode: + cluster_node = self._nodes.get(("CLUSTER", cluster_id), None) if cluster_node: return cluster_node - details = self._ws.clusters.get(cluster_key) - object_name = details.cluster_name if details and details.cluster_name else cluster_key - object_owner = details.creator_user_name if details and details.creator_user_name else "" + details = self._ws.clusters.get(cluster_id) + object_name = details.cluster_name if details and details.cluster_name else cluster_id self._last_node_id += 1 cluster_node = MigrationNode( node_id=self._last_node_id, object_type="CLUSTER", - object_id=cluster_key, + object_id=cluster_id, object_name=object_name, object_owner=object_owner, ) + self._nodes[cluster_node.key] = cluster_node # TODO register warehouses and policies - self._root.required_steps.append(cluster_node) return cluster_node def generate_steps(self) -> Iterable[MigrationStep]: - _root_step, generated_steps = self._root.generate_steps() - unique_steps = self._deduplicate_steps(generated_steps) - return self._sorted_steps(unique_steps) + # algo adapted from Kahn topological sort. The main differences is that + # we want the same step number for all nodes with same dependency depth + # so instead of pushing to a queue, we rebuild it once all leaf nodes are processed + # (these are transient leaf nodes i.e. they only become leaf during processing) + incoming_counts = self._populate_incoming_counts() + step_number = 1 + sorted_steps: list[MigrationStep] = [] + while len(incoming_counts) > 0: + leaf_keys = list(self._get_leaf_keys(incoming_counts)) + for leaf_key in leaf_keys: + del incoming_counts[leaf_key] + sorted_steps.append(self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(leaf_key)))) + for dependency_key in self._outgoing[leaf_key]: + incoming_counts[dependency_key] -= 1 + step_number += 1 + return sorted_steps + + def _required_step_ids(self, node_key: tuple[str, str]) -> Iterable[int]: + for leaf_key in self._incoming[node_key]: + yield self._nodes[leaf_key].node_id + + def _populate_incoming_counts(self) -> dict[tuple[str, str], int]: + result = defaultdict(int) + for node_key in self._nodes: + result[node_key] = len(self._incoming[node_key]) + return result @staticmethod - def _sorted_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]: - # sort by step number, lowest first - return sorted(steps, key=lambda step: step.step_number) - - @staticmethod - def _deduplicate_steps(steps: Iterable[MigrationStep]) -> Iterable[MigrationStep]: - best_steps: dict[int, MigrationStep] = {} - for step in steps: - existing = best_steps.get(step.step_id, None) - # keep the step with the highest step number - # TODO this possibly affects the step_number of steps that depend on this one - # but it's probably OK to not be 100% accurate initially - if existing and existing.step_number >= step.step_number: + def _get_leaf_keys(incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: + for node_key, incoming_count in incoming_counts.items(): + if incoming_count > 0: continue - best_steps[step.step_id] = step - return best_steps.values() - - def _find_node(self, object_type: str, object_id: str) -> MigrationNode | None: - return self._root.find(object_type, object_id) + yield node_key From 27beade807257287ca79e66dc28de907376386ff Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 18:30:27 +0200 Subject: [PATCH 10/30] use existing Ownership classes --- .../labs/ucx/assessment/clusters.py | 24 +++++++++++-------- src/databricks/labs/ucx/assessment/jobs.py | 24 ++++++++++--------- .../labs/ucx/sequencing/sequencing.py | 7 ++++-- 3 files changed, 32 insertions(+), 23 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/clusters.py b/src/databricks/labs/ucx/assessment/clusters.py index 0e0624d3c2..95c825b04c 100644 --- a/src/databricks/labs/ucx/assessment/clusters.py +++ b/src/databricks/labs/ucx/assessment/clusters.py @@ -46,6 +46,18 @@ class ClusterInfo: creator: str | None = None """User-name of the creator of the cluster, if known.""" + @classmethod + def from_cluster_details(cls, details: ClusterDetails): + return ClusterInfo( + cluster_id=details.cluster_id if details.cluster_id else "", + cluster_name=details.cluster_name, + policy_id=details.policy_id, + spark_version=details.spark_version, + creator=details.creator_user_name or None, + success=1, + failures="[]", + ) + class CheckClusterMixin(CheckInitScriptMixin): _ws: WorkspaceClient @@ -152,7 +164,7 @@ def _crawl(self) -> Iterable[ClusterInfo]: all_clusters = list(self._ws.clusters.list()) return list(self._assess_clusters(all_clusters)) - def _assess_clusters(self, all_clusters): + def _assess_clusters(self, all_clusters: Iterable[ClusterDetails]): for cluster in all_clusters: if cluster.cluster_source == ClusterSource.JOB: continue @@ -162,15 +174,7 @@ def _assess_clusters(self, all_clusters): f"Cluster {cluster.cluster_id} have Unknown creator, it means that the original creator " f"has been deleted and should be re-created" ) - cluster_info = ClusterInfo( - cluster_id=cluster.cluster_id if cluster.cluster_id else "", - cluster_name=cluster.cluster_name, - policy_id=cluster.policy_id, - spark_version=cluster.spark_version, - creator=creator, - success=1, - failures="[]", - ) + cluster_info = ClusterInfo.from_cluster_details(cluster) failures = self._check_cluster_failures(cluster, "cluster") if len(failures) > 0: cluster_info.success = 0 diff --git a/src/databricks/labs/ucx/assessment/jobs.py b/src/databricks/labs/ucx/assessment/jobs.py index 667647d967..ed2fac89d3 100644 --- a/src/databricks/labs/ucx/assessment/jobs.py +++ b/src/databricks/labs/ucx/assessment/jobs.py @@ -20,6 +20,7 @@ RunType, SparkJarTask, SqlTask, + Job, ) from databricks.labs.ucx.assessment.clusters import CheckClusterMixin @@ -40,6 +41,17 @@ class JobInfo: creator: str | None = None """User-name of the creator of the pipeline, if known.""" + @classmethod + def from_job(cls, job: Job): + job_name = job.settings.name if job.settings and job.settings.name else "Unknown" + return JobInfo( + job_id=str(job.job_id), + success=1, + failures="[]", + job_name=job_name, + creator=job.creator_user_name or None, + ) + class JobsMixin: @classmethod @@ -124,17 +136,7 @@ def _prepare(all_jobs) -> tuple[dict[int, set[str]], dict[int, JobInfo]]: job_settings = job.settings if not job_settings: continue - job_name = job_settings.name - if not job_name: - job_name = "Unknown" - - job_details[job.job_id] = JobInfo( - job_id=str(job.job_id), - job_name=job_name, - creator=creator_user_name, - success=1, - failures="[]", - ) + job_details[job.job_id] = JobInfo.from_job(job) return job_assessment, job_details def _try_fetch(self) -> Iterable[JobInfo]: diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 063d0ca854..0b1e384297 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -7,6 +7,9 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs +from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo +from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo +from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.source_code.graph import DependencyGraph @@ -93,7 +96,7 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: object_type="JOB", object_id=str(job.job_id), object_name=job_name, - object_owner=job.creator_user_name or "", + object_owner=JobOwnership(self._admin_locator).owner_of(JobInfo.from_job(job)), ) self._nodes[job_node.key] = job_node if job.settings and job.settings.job_clusters: @@ -121,7 +124,7 @@ def register_cluster(self, cluster_id: str) -> MigrationNode: object_type="CLUSTER", object_id=cluster_id, object_name=object_name, - object_owner=object_owner, + object_owner=ClusterOwnership(self._admin_locator).owner_of(ClusterInfo.from_cluster_details(details)), ) self._nodes[cluster_node.key] = cluster_node # TODO register warehouses and policies From f642ea412916353eeee744b902de23c0f52f4b68 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 18:37:57 +0200 Subject: [PATCH 11/30] fix merge issues --- src/databricks/labs/ucx/sequencing/sequencing.py | 3 ++- tests/unit/sequencing/test_sequencing.py | 10 ++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 0b1e384297..a873ee1b7e 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -50,8 +50,9 @@ def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationSt class MigrationSequencer: - def __init__(self, ws: WorkspaceClient): + def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): self._ws = ws + self._admin_locator = admin_locator self._last_node_id = 0 self._nodes: dict[tuple[str, str], MigrationNode] = {} self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index fa7271164e..21d2a612d0 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,6 +1,9 @@ -from databricks.sdk.service import jobs +from unittest.mock import create_autospec + +from databricks.sdk.service import iam, jobs from databricks.sdk.service.compute import ClusterDetails +from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.graph import DependencyGraph @@ -15,7 +18,10 @@ def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_pat ws.jobs.get.return_value = job dependency = WorkflowTask(ws, task, job) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - sequencer = MigrationSequencer(ws) + admin_finder = create_autospec(AdministratorFinder) + admin_user = iam.User(user_name="John Doe", active=True, roles=[iam.ComplexValue(value="account_admin")]) + admin_finder.find_admin_users.return_value = (admin_user,) + sequencer = MigrationSequencer(ws, AdministratorLocator(ws, finders=[lambda _ws: admin_finder])) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) step = steps[-1] From a1ae84aef76b1f7ce45d5884d3ca86eef949de37 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 16 Oct 2024 15:35:23 +0200 Subject: [PATCH 12/30] create steps for source files --- .../labs/ucx/sequencing/sequencing.py | 45 +++++++++++++++++-- tests/unit/sequencing/test_sequencing.py | 32 ++++++++++++- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index a873ee1b7e..d9ffa6a092 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -2,7 +2,8 @@ from collections import defaultdict from collections.abc import Iterable -from dataclasses import dataclass +from dataclasses import dataclass, field +from pathlib import Path from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs @@ -11,6 +12,7 @@ from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.source_code.graph import DependencyGraph +from databricks.labs.ucx.source_code.path_lookup import PathLookup @dataclass @@ -51,6 +53,7 @@ def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationSt class MigrationSequencer: def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): + def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup): self._ws = ws self._admin_locator = admin_locator self._last_node_id = 0 @@ -58,7 +61,7 @@ def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) self._outgoing: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) - def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: + def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: DependencyGraph) -> MigrationNode: task_id = f"{job.job_id}/{task.task_key}" task_node = self._nodes.get(("TASK", task_id), None) if task_node: @@ -86,15 +89,49 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende # TODO register dependency graph return task_node + def _visit_dependency(self, graph: DependencyGraph) -> bool | None: + lineage = graph.dependency.lineage[-1] + parent_node = self._find_node(lineage.object_type, lineage.object_id) + for dependency in graph.local_dependencies: + lineage = dependency.lineage[-1] + child_node = self.register_dependency(lineage.object_type, lineage.object_id) + parent_node.required_steps.append(child_node) + # TODO tables and dfsas + return None + + def register_dependency(self, object_type: str, object_id: str) -> MigrationNode: + existing = self._find_node(object_type, object_id) + if existing: + return existing + object_name: str = "" + object_owner: str = "" + if object_type in { "NOTEBOOK", "FILE" }: + path = Path(object_id) + for library_root in self._path_lookup.library_roots: + if not path.is_relative_to(library_root): + continue + object_name = path.relative_to(library_root).as_posix() + break + else: + raise ValueError(f"{object_type} not supported yet!") + MigrationNode.last_node_id += 1 + return MigrationNode( + node_id=MigrationNode.last_node_id, + object_type=object_type, + object_id=object_id, + object_name=object_name, + object_owner=object_owner, + ) + def register_workflow_job(self, job: jobs.Job) -> MigrationNode: - job_node = self._nodes.get(("JOB", str(job.job_id)), None) + job_node = self._nodes.get(("WORKFLOW", str(job.job_id)), None) if job_node: return job_node self._last_node_id += 1 job_name = job.settings.name if job.settings and job.settings.name else str(job.job_id) job_node = MigrationNode( node_id=self._last_node_id, - object_type="JOB", + object_type="WORKFLOW", object_id=str(job.job_id), object_name=job_name, object_owner=JobOwnership(self._admin_locator).owner_of(JobInfo.from_job(job)), diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 21d2a612d0..83db9395ec 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,7 +1,11 @@ from unittest.mock import create_autospec from databricks.sdk.service import iam, jobs +from pathlib import Path + +from databricks.sdk.service import jobs from databricks.sdk.service.compute import ClusterDetails +from databricks.sdk.service.jobs import NotebookTask from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer @@ -10,7 +14,7 @@ from databricks.labs.ucx.source_code.jobs import WorkflowTask -def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_path_lookup): +def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_resolver, mock_path_lookup): ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe") task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") settings = jobs.JobSettings(name="test-job", tasks=[task]) @@ -32,3 +36,29 @@ def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_pat assert step.object_owner == "John Doe" assert step.step_number == 3 assert len(step.required_step_ids) == 2 + + +def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_resolver, mock_path_lookup): + functional = mock_path_lookup.resolve(Path("functional")) + mock_path_lookup.append_path(functional) + mock_path_lookup = mock_path_lookup.change_directory(functional) + notebook_path = Path("grand_parent_that_imports_parent_that_magic_runs_child.py") + notebook_task = NotebookTask(notebook_path=notebook_path.as_posix()) + task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123", notebook_task=notebook_task) + settings = jobs.JobSettings(name="test-job", tasks=[task]) + job = jobs.Job(job_id=1234, settings=settings) + ws.jobs.get.return_value = job + dependency = WorkflowTask(ws, task, job) + container = dependency.load(mock_path_lookup) + graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + problems = container.build_dependency_graph(graph) + assert not problems + sequencer = MigrationSequencer(ws, mock_path_lookup) + sequencer.register_workflow_task(task, job, graph) + steps = list(sequencer.generate_steps()) + names = {step.object_name for step in steps} + assert notebook_path.as_posix() in names + notebook_path = Path("parent_that_magic_runs_child_that_uses_value_from_parent.py") + assert notebook_path.as_posix() in names + notebook_path = Path("_child_that_uses_value_from_parent.py") + assert notebook_path.as_posix() in names From 990223d46f253fde83ce8e4f1a4c4863a6a8c263 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 17:13:07 +0200 Subject: [PATCH 13/30] fix merge issues --- .../labs/ucx/sequencing/sequencing.py | 27 ++++++++++--------- tests/unit/sequencing/test_sequencing.py | 14 ++++++---- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index d9ffa6a092..c3295aae6c 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -2,7 +2,7 @@ from collections import defaultdict from collections.abc import Iterable -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path from databricks.sdk import WorkspaceClient @@ -91,21 +91,20 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: Dependen def _visit_dependency(self, graph: DependencyGraph) -> bool | None: lineage = graph.dependency.lineage[-1] - parent_node = self._find_node(lineage.object_type, lineage.object_id) + parent_node = self._nodes[(lineage.object_type, lineage.object_id)] for dependency in graph.local_dependencies: lineage = dependency.lineage[-1] - child_node = self.register_dependency(lineage.object_type, lineage.object_id) - parent_node.required_steps.append(child_node) + self.register_dependency(parent_node, lineage.object_type, lineage.object_id) # TODO tables and dfsas return None - def register_dependency(self, object_type: str, object_id: str) -> MigrationNode: - existing = self._find_node(object_type, object_id) - if existing: - return existing + def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: + dependency_node = self._nodes.get((object_type, object_id), None) + if dependency_node: + return dependency_node object_name: str = "" object_owner: str = "" - if object_type in { "NOTEBOOK", "FILE" }: + if object_type in {"NOTEBOOK", "FILE"}: path = Path(object_id) for library_root in self._path_lookup.library_roots: if not path.is_relative_to(library_root): @@ -114,14 +113,18 @@ def register_dependency(self, object_type: str, object_id: str) -> MigrationNode break else: raise ValueError(f"{object_type} not supported yet!") - MigrationNode.last_node_id += 1 - return MigrationNode( - node_id=MigrationNode.last_node_id, + self._last_node_id += 1 + dependency_node = MigrationNode( + node_id=self._last_node_id, object_type=object_type, object_id=object_id, object_name=object_name, object_owner=object_owner, ) + self._nodes[dependency_node.key] = dependency_node + self._incoming[dependency_node.key].add(parent_node.key) + self._outgoing[parent_node.key].add(dependency_node.key) + return dependency_node def register_workflow_job(self, job: jobs.Job) -> MigrationNode: job_node = self._nodes.get(("WORKFLOW", str(job.job_id)), None) diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 83db9395ec..d74b95b8bc 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -14,6 +14,13 @@ from databricks.labs.ucx.source_code.jobs import WorkflowTask +def admin_locator(ws, user_name: str): + admin_finder = create_autospec(AdministratorFinder) + admin_user = iam.User(user_name=user_name, active=True, roles=[iam.ComplexValue(value="account_admin")]) + admin_finder.find_admin_users.return_value = (admin_user,) + return AdministratorLocator(ws, finders=[lambda _ws: admin_finder]) + + def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_resolver, mock_path_lookup): ws.clusters.get.return_value = ClusterDetails(cluster_name="my-cluster", creator_user_name="John Doe") task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123") @@ -22,10 +29,7 @@ def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_r ws.jobs.get.return_value = job dependency = WorkflowTask(ws, task, job) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - admin_finder = create_autospec(AdministratorFinder) - admin_user = iam.User(user_name="John Doe", active=True, roles=[iam.ComplexValue(value="account_admin")]) - admin_finder.find_admin_users.return_value = (admin_user,) - sequencer = MigrationSequencer(ws, AdministratorLocator(ws, finders=[lambda _ws: admin_finder])) + sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) step = steps[-1] @@ -53,7 +57,7 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) problems = container.build_dependency_graph(graph) assert not problems - sequencer = MigrationSequencer(ws, mock_path_lookup) + sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) names = {step.object_name for step in steps} From 56df5061e77901885067e74730f555a546d4b38b Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 18:10:34 +0200 Subject: [PATCH 14/30] register notebooks from dependency graph --- .../labs/ucx/sequencing/sequencing.py | 6 +++++- tests/unit/sequencing/test_sequencing.py | 17 +++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index c3295aae6c..76a337646b 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -25,6 +25,10 @@ class MigrationStep: object_owner: str required_step_ids: list[int] + @property + def key(self) -> tuple[str, str]: + return self.object_type, self.object_id + @dataclass class MigrationNode: @@ -86,7 +90,7 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, graph: Dependen # also make the cluster dependent on the job self._incoming[cluster_node.key].add(job_node.key) self._outgoing[job_node.key].add(cluster_node.key) - # TODO register dependency graph + graph.visit(self._visit_dependency, None) return task_node def _visit_dependency(self, graph: DependencyGraph) -> bool | None: diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index d74b95b8bc..ac4472a1f4 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -60,9 +60,14 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) - names = {step.object_name for step in steps} - assert notebook_path.as_posix() in names - notebook_path = Path("parent_that_magic_runs_child_that_uses_value_from_parent.py") - assert notebook_path.as_posix() in names - notebook_path = Path("_child_that_uses_value_from_parent.py") - assert notebook_path.as_posix() in names + step0 = next((step for step in steps if step.object_type == "TASK"), None) + assert step0 + step1 = next((step for step in steps if step.object_name == notebook_path.as_posix()), None) + assert step1 + assert step1.step_number > step0.step_number + step2 = next((step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), None) + assert step2 + assert step2.step_number > step1.step_number + step3 = next((step for step in steps if step.object_name == "_child_that_uses_value_from_parent.py"), None) + assert step3 + assert step3.step_number > step2.step_number From 840834df942aafb3bee1f0581b6151169d8ed6b7 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 18:57:05 +0200 Subject: [PATCH 15/30] fix merge issues --- src/databricks/labs/ucx/sequencing/sequencing.py | 6 +++--- src/databricks/labs/ucx/source_code/jobs.py | 1 - tests/unit/sequencing/test_sequencing.py | 9 ++++++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 76a337646b..612f339822 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -56,9 +56,9 @@ def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationSt class MigrationSequencer: - def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): - def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup): + def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, admin_locator: AdministratorLocator): self._ws = ws + self._path_lookup = path_lookup self._admin_locator = admin_locator self._last_node_id = 0 self._nodes: dict[tuple[str, str], MigrationNode] = {} @@ -100,7 +100,7 @@ def _visit_dependency(self, graph: DependencyGraph) -> bool | None: lineage = dependency.lineage[-1] self.register_dependency(parent_node, lineage.object_type, lineage.object_id) # TODO tables and dfsas - return None + return False def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: dependency_node = self._nodes.get((object_type, object_id), None) diff --git a/src/databricks/labs/ucx/source_code/jobs.py b/src/databricks/labs/ucx/source_code/jobs.py index 5565f1ef11..ab73045de1 100644 --- a/src/databricks/labs/ucx/source_code/jobs.py +++ b/src/databricks/labs/ucx/source_code/jobs.py @@ -34,7 +34,6 @@ SourceInfo, UsedTable, LineageAtom, - read_text, ) from databricks.labs.ucx.source_code.directfs_access import ( DirectFsAccessCrawler, diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index ac4472a1f4..28d8947c1b 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,9 +1,9 @@ from unittest.mock import create_autospec -from databricks.sdk.service import iam, jobs from pathlib import Path -from databricks.sdk.service import jobs +from databricks.sdk.service import iam, jobs + from databricks.sdk.service.compute import ClusterDetails from databricks.sdk.service.jobs import NotebookTask @@ -65,7 +65,10 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso step1 = next((step for step in steps if step.object_name == notebook_path.as_posix()), None) assert step1 assert step1.step_number > step0.step_number - step2 = next((step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), None) + step2 = next( + (step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), + None, + ) assert step2 assert step2.step_number > step1.step_number step3 = next((step for step in steps if step.object_name == "_child_that_uses_value_from_parent.py"), None) From 7f30ae6efdc2ba32867d74cb8561bef887d0b8cc Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 17 Oct 2024 19:36:27 +0200 Subject: [PATCH 16/30] mock WorkspaceCache for testing --- src/databricks/labs/ucx/source_code/jobs.py | 9 +++++---- tests/unit/sequencing/test_sequencing.py | 5 ++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/source_code/jobs.py b/src/databricks/labs/ucx/source_code/jobs.py index ab73045de1..44fcc5c492 100644 --- a/src/databricks/labs/ucx/source_code/jobs.py +++ b/src/databricks/labs/ucx/source_code/jobs.py @@ -34,6 +34,7 @@ SourceInfo, UsedTable, LineageAtom, + read_text, ) from databricks.labs.ucx.source_code.directfs_access import ( DirectFsAccessCrawler, @@ -77,8 +78,8 @@ def as_message(self) -> str: class WorkflowTask(Dependency): - def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job): - loader = WrappingLoader(WorkflowTaskContainer(ws, task, job)) + def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache | None = None): + loader = WrappingLoader(WorkflowTaskContainer(ws, task, job, cache)) super().__init__(loader, Path(f'/jobs/{task.task_key}'), inherits_context=False) self._task = task self._job = job @@ -98,11 +99,11 @@ def lineage(self) -> list[LineageAtom]: class WorkflowTaskContainer(SourceContainer): - def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job): + def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache | None = None): self._task = task self._job = job self._ws = ws - self._cache = WorkspaceCache(ws) + self._cache = cache or WorkspaceCache(ws) self._named_parameters: dict[str, str] | None = {} self._parameters: list[str] | None = [] self._spark_conf: dict[str, str] | None = {} diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 28d8947c1b..e41e92358c 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -8,6 +8,7 @@ from databricks.sdk.service.jobs import NotebookTask from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder +from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.graph import DependencyGraph @@ -52,7 +53,9 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso settings = jobs.JobSettings(name="test-job", tasks=[task]) job = jobs.Job(job_id=1234, settings=settings) ws.jobs.get.return_value = job - dependency = WorkflowTask(ws, task, job) + ws_cache = create_autospec(WorkspaceCache) + ws_cache.get_workspace_path.side_effect = lambda path: Path(path) + dependency = WorkflowTask(ws, task, job, ws_cache) container = dependency.load(mock_path_lookup) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) problems = container.build_dependency_graph(graph) From 6a0f873b657bc91320ebf3b92b9aa8e0bb1ad1cc Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 13:19:52 +0200 Subject: [PATCH 17/30] populate ownership - leave the correct implementation to issue #3003 --- src/databricks/labs/ucx/framework/owners.py | 7 +++++++ src/databricks/labs/ucx/sequencing/sequencing.py | 3 ++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/framework/owners.py b/src/databricks/labs/ucx/framework/owners.py index a8ca725ed7..9939544c09 100644 --- a/src/databricks/labs/ucx/framework/owners.py +++ b/src/databricks/labs/ucx/framework/owners.py @@ -190,3 +190,10 @@ def owner_of(self, record: Record) -> str: def _maybe_direct_owner(self, record: Record) -> str | None: """Obtain the record-specific user-name associated with the given record, if any.""" return None + + +class WorkspaceObjectOwnership(Ownership[tuple[str, str]]): + + def _maybe_direct_owner(self, record: tuple[str, str]) -> str | None: + # TODO: tuple[0] = object_type, tuple[1] = object_id + return None diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 612f339822..eed753d7cb 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -10,7 +10,7 @@ from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo -from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspaceObjectOwnership from databricks.labs.ucx.source_code.graph import DependencyGraph from databricks.labs.ucx.source_code.path_lookup import PathLookup @@ -115,6 +115,7 @@ def register_dependency(self, parent_node: MigrationNode, object_type: str, obje continue object_name = path.relative_to(library_root).as_posix() break + object_owner = WorkspaceObjectOwnership(self._admin_locator).owner_of((object_type, object_id)) else: raise ValueError(f"{object_type} not supported yet!") self._last_node_id += 1 From 0d4d2b0749b6a959033dc438310d551d3ba4a5e8 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 14:17:54 +0200 Subject: [PATCH 18/30] fix incorrect step sequence --- src/databricks/labs/ucx/sequencing/sequencing.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index eed753d7cb..2bca4cbe94 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -104,10 +104,16 @@ def _visit_dependency(self, graph: DependencyGraph) -> bool | None: def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: dependency_node = self._nodes.get((object_type, object_id), None) - if dependency_node: - return dependency_node + if not dependency_node: + dependency_node = self._create_dependency_node(object_type, object_id) + if parent_node: + self._incoming[parent_node.key].add(dependency_node.key) + self._outgoing[dependency_node.key].add(parent_node.key) + return dependency_node + + def _create_dependency_node(self, object_type: str, object_id: str) -> MigrationNode: object_name: str = "" - object_owner: str = "" + _object_owner: str = "" if object_type in {"NOTEBOOK", "FILE"}: path = Path(object_id) for library_root in self._path_lookup.library_roots: @@ -127,8 +133,6 @@ def register_dependency(self, parent_node: MigrationNode, object_type: str, obje object_owner=object_owner, ) self._nodes[dependency_node.key] = dependency_node - self._incoming[dependency_node.key].add(parent_node.key) - self._outgoing[parent_node.key].add(dependency_node.key) return dependency_node def register_workflow_job(self, job: jobs.Job) -> MigrationNode: From 9603c17706306c96d569b972d23c0cbf21a13c33 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 14:18:35 +0200 Subject: [PATCH 19/30] fix incorrect step sequence --- tests/unit/sequencing/test_sequencing.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index e41e92358c..fde276ccd2 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -11,8 +11,9 @@ from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState -from databricks.labs.ucx.source_code.graph import DependencyGraph +from databricks.labs.ucx.source_code.graph import DependencyGraph, Dependency from databricks.labs.ucx.source_code.jobs import WorkflowTask +from databricks.labs.ucx.source_code.linters.files import FileLoader def admin_locator(ws, user_name: str): @@ -67,13 +68,14 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso assert step0 step1 = next((step for step in steps if step.object_name == notebook_path.as_posix()), None) assert step1 - assert step1.step_number > step0.step_number + assert step1.step_number < step0.step_number step2 = next( (step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), None, ) assert step2 - assert step2.step_number > step1.step_number + assert step2.step_number < step1.step_number step3 = next((step for step in steps if step.object_name == "_child_that_uses_value_from_parent.py"), None) assert step3 - assert step3.step_number > step2.step_number + assert step3.step_number < step2.step_number + From 5b4e0e63394e286cf4ebf575782b58afdcc8878a Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 15:03:53 +0200 Subject: [PATCH 20/30] basic support of cyclic dependencies --- .../labs/ucx/sequencing/sequencing.py | 40 ++++++++++++++----- tests/unit/sequencing/test_sequencing.py | 34 ++++++++++++++++ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 2bca4cbe94..a87c2fa3d4 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -181,20 +181,31 @@ def register_cluster(self, cluster_id: str) -> MigrationNode: return cluster_node def generate_steps(self) -> Iterable[MigrationStep]: - # algo adapted from Kahn topological sort. The main differences is that - # we want the same step number for all nodes with same dependency depth - # so instead of pushing to a queue, we rebuild it once all leaf nodes are processed - # (these are transient leaf nodes i.e. they only become leaf during processing) + """ The below algo is adapted from Kahn's topological sort. + The main differences are as follows: + 1) we want the same step number for all nodes with same dependency depth + so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed + (these are transient 'leaf' nodes i.e. they only become 'leaf' during processing) + 2) Kahn only supports DAGs but python code allows cyclic dependencies i.e. A -> B -> C -> A is not a DAG + so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order + to avoid an infinite loop. We also avoid side effects (such as negative counts). + This algo works correctly for simple cases, but is not tested on large trees. + """ incoming_counts = self._populate_incoming_counts() step_number = 1 sorted_steps: list[MigrationStep] = [] while len(incoming_counts) > 0: - leaf_keys = list(self._get_leaf_keys(incoming_counts)) + leaf_keys = self._get_leaf_keys(incoming_counts) for leaf_key in leaf_keys: del incoming_counts[leaf_key] sorted_steps.append(self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(leaf_key)))) for dependency_key in self._outgoing[leaf_key]: - incoming_counts[dependency_key] -= 1 + # prevent re-instantiation of already deleted keys + if dependency_key not in incoming_counts: + continue + # prevent negative count with cyclic dependencies + if incoming_counts[dependency_key] > 0: + incoming_counts[dependency_key] -= 1 step_number += 1 return sorted_steps @@ -208,9 +219,20 @@ def _populate_incoming_counts(self) -> dict[tuple[str, str], int]: result[node_key] = len(self._incoming[node_key]) return result - @staticmethod - def _get_leaf_keys(incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: + @classmethod + def _get_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: + count = 0 + leaf_keys = list(cls._yield_leaf_keys(incoming_counts, count)) + # if we're not finding nodes with 0 incoming counts, it's likely caused by cyclic dependencies + # in which case it's safe to process nodes with a higher incoming count + while not leaf_keys: + count += 1 + leaf_keys = list(cls._yield_leaf_keys(incoming_counts, count)) + return leaf_keys + + @classmethod + def _yield_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int], level: int) -> Iterable[tuple[str, str]]: for node_key, incoming_count in incoming_counts.items(): - if incoming_count > 0: + if incoming_count > level: continue yield node_key diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index fde276ccd2..102da61d5a 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -79,3 +79,37 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso assert step3 assert step3.step_number < step2.step_number + +class _DependencyGraph(DependencyGraph): + + def add_dependency(self, graph: DependencyGraph): + self._dependencies[graph.dependency] = graph + + +class _MigrationSequencer(MigrationSequencer): + + def visit_graph(self, graph: DependencyGraph): + graph.visit(self._visit_dependency, None) + + +def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, mock_path_lookup): + root = Dependency(FileLoader(), Path("root.py")) + root_graph = _DependencyGraph(root, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_a = Dependency(FileLoader(), Path("a.py")) + child_graph_a = _DependencyGraph(child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_b = Dependency(FileLoader(), Path("b.py")) + child_graph_b = _DependencyGraph(child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + # root imports a and b + root_graph.add_dependency(child_graph_a) + root_graph.add_dependency(child_graph_b) + # a imports b + child_graph_a.add_dependency(child_graph_b) + # b imports a (using local import) + child_graph_b.add_dependency(child_graph_a) + sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) + sequencer.register_dependency(None, root.lineage[-1].object_type, root.lineage[-1].object_id) + sequencer.visit_graph(root_graph) + steps = list(sequencer.generate_steps()) + assert len(steps) == 3 + assert steps[2].object_id == "root.py" + From 4e2aedcf019ab9c664031fef33342fa14f35c5aa Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 15:08:01 +0200 Subject: [PATCH 21/30] rename local --- src/databricks/labs/ucx/sequencing/sequencing.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index a87c2fa3d4..915bae1113 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -221,18 +221,18 @@ def _populate_incoming_counts(self) -> dict[tuple[str, str], int]: @classmethod def _get_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int]) -> Iterable[tuple[str, str]]: - count = 0 - leaf_keys = list(cls._yield_leaf_keys(incoming_counts, count)) + max_count = 0 + leaf_keys = list(cls._yield_leaf_keys(incoming_counts, max_count)) # if we're not finding nodes with 0 incoming counts, it's likely caused by cyclic dependencies # in which case it's safe to process nodes with a higher incoming count while not leaf_keys: - count += 1 - leaf_keys = list(cls._yield_leaf_keys(incoming_counts, count)) + max_count += 1 + leaf_keys = list(cls._yield_leaf_keys(incoming_counts, max_count)) return leaf_keys @classmethod - def _yield_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int], level: int) -> Iterable[tuple[str, str]]: + def _yield_leaf_keys(cls, incoming_counts: dict[tuple[str, str], int], max_count: int) -> Iterable[tuple[str, str]]: for node_key, incoming_count in incoming_counts.items(): - if incoming_count > level: + if incoming_count > max_count: continue yield node_key From 206cb36dfa31641bb5902a264acc04d023d8cebe Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 15:10:36 +0200 Subject: [PATCH 22/30] formatting --- src/databricks/labs/ucx/sequencing/sequencing.py | 4 ++-- tests/unit/sequencing/test_sequencing.py | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 915bae1113..897a1c137a 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -181,7 +181,7 @@ def register_cluster(self, cluster_id: str) -> MigrationNode: return cluster_node def generate_steps(self) -> Iterable[MigrationStep]: - """ The below algo is adapted from Kahn's topological sort. + """The below algo is adapted from Kahn's topological sort. The main differences are as follows: 1) we want the same step number for all nodes with same dependency depth so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed @@ -190,7 +190,7 @@ def generate_steps(self) -> Iterable[MigrationStep]: so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order to avoid an infinite loop. We also avoid side effects (such as negative counts). This algo works correctly for simple cases, but is not tested on large trees. - """ + """ incoming_counts = self._populate_incoming_counts() step_number = 1 sorted_steps: list[MigrationStep] = [] diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index 102da61d5a..ed92378808 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -96,9 +96,13 @@ def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, root = Dependency(FileLoader(), Path("root.py")) root_graph = _DependencyGraph(root, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) child_a = Dependency(FileLoader(), Path("a.py")) - child_graph_a = _DependencyGraph(child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_graph_a = _DependencyGraph( + child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState() + ) child_b = Dependency(FileLoader(), Path("b.py")) - child_graph_b = _DependencyGraph(child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) + child_graph_b = _DependencyGraph( + child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState() + ) # root imports a and b root_graph.add_dependency(child_graph_a) root_graph.add_dependency(child_graph_b) @@ -112,4 +116,3 @@ def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, steps = list(sequencer.generate_steps()) assert len(steps) == 3 assert steps[2].object_id == "root.py" - From 5eacacc0ab11ebfe8ea37cc072ba7d344ff7693a Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 15:39:38 +0200 Subject: [PATCH 23/30] formatting --- .../labs/ucx/sequencing/sequencing.py | 17 ++++++----- tests/unit/sequencing/test_sequencing.py | 30 +++++++++---------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 897a1c137a..025c81bc5c 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -199,16 +199,19 @@ def generate_steps(self) -> Iterable[MigrationStep]: for leaf_key in leaf_keys: del incoming_counts[leaf_key] sorted_steps.append(self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(leaf_key)))) - for dependency_key in self._outgoing[leaf_key]: - # prevent re-instantiation of already deleted keys - if dependency_key not in incoming_counts: - continue - # prevent negative count with cyclic dependencies - if incoming_counts[dependency_key] > 0: - incoming_counts[dependency_key] -= 1 + self._on_leaf_key_processed(leaf_key, incoming_counts) step_number += 1 return sorted_steps + def _on_leaf_key_processed(self, leaf_key: tuple[str, str], incoming_counts: dict[tuple[str, str], int]): + for dependency_key in self._outgoing[leaf_key]: + # prevent re-instantiation of already deleted keys + if dependency_key not in incoming_counts: + continue + # prevent negative count with cyclic dependencies + if incoming_counts[dependency_key] > 0: + incoming_counts[dependency_key] -= 1 + def _required_step_ids(self, node_key: tuple[str, str]) -> Iterable[int]: for leaf_key in self._incoming[node_key]: yield self._nodes[leaf_key].node_id diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index ed92378808..d1c6f776a7 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -55,7 +55,7 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso job = jobs.Job(job_id=1234, settings=settings) ws.jobs.get.return_value = job ws_cache = create_autospec(WorkspaceCache) - ws_cache.get_workspace_path.side_effect = lambda path: Path(path) + ws_cache.get_workspace_path.side_effect = Path dependency = WorkflowTask(ws, task, job, ws_cache) container = dependency.load(mock_path_lookup) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) @@ -63,21 +63,19 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso assert not problems sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) sequencer.register_workflow_task(task, job, graph) - steps = list(sequencer.generate_steps()) - step0 = next((step for step in steps if step.object_type == "TASK"), None) - assert step0 - step1 = next((step for step in steps if step.object_name == notebook_path.as_posix()), None) - assert step1 - assert step1.step_number < step0.step_number - step2 = next( - (step for step in steps if step.object_name == "parent_that_magic_runs_child_that_uses_value_from_parent.py"), - None, - ) - assert step2 - assert step2.step_number < step1.step_number - step3 = next((step for step in steps if step.object_name == "_child_that_uses_value_from_parent.py"), None) - assert step3 - assert step3.step_number < step2.step_number + all_steps = list(sequencer.generate_steps()) + # ensure steps have a consistent step_number: TASK > grand-parent > parent > child + parent_name = "parent_that_magic_runs_child_that_uses_value_from_parent.py" + steps = [ + next((step for step in all_steps if step.object_name == "_child_that_uses_value_from_parent.py"), None), + next((step for step in all_steps if step.object_name == parent_name), None), + next((step for step in all_steps if step.object_name == notebook_path.as_posix()), None), + next((step for step in all_steps if step.object_type == "TASK"), None), + ] + # ensure steps have a consistent step_number + for i in range(0, len(steps) - 1): + assert steps[i] + assert steps[i].step_number < steps[i + 1].step_number class _DependencyGraph(DependencyGraph): From 0ad44d5aa678e316f011003b435fc3b7ec54b2e4 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 18 Oct 2024 18:24:20 +0200 Subject: [PATCH 24/30] create step for used table --- .../labs/ucx/hive_metastore/tables.py | 11 ++++ .../labs/ucx/sequencing/sequencing.py | 32 ++++++++++- src/databricks/labs/ucx/source_code/base.py | 4 ++ .../labs/ucx/source_code/used_table.py | 7 +++ tests/unit/sequencing/test_sequencing.py | 54 ++++++++++++++++--- 5 files changed, 100 insertions(+), 8 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/tables.py b/src/databricks/labs/ucx/hive_metastore/tables.py index de8457503a..0c8dff509a 100644 --- a/src/databricks/labs/ucx/hive_metastore/tables.py +++ b/src/databricks/labs/ucx/hive_metastore/tables.py @@ -15,6 +15,7 @@ from databricks.labs.lsql.backends import SqlBackend from databricks.sdk.errors import NotFound +from databricks.labs.ucx.source_code.base import UsedTable from databricks.labs.ucx.framework.crawlers import CrawlerBase from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier @@ -85,6 +86,16 @@ def __post_init__(self) -> None: if isinstance(self.table_format, str): # Should not happen according to type hint, still safer self.table_format = self.table_format.upper() + @staticmethod + def from_used_table(used_table: UsedTable): + return Table( + catalog=used_table.catalog_name, + database=used_table.table_name, + name=used_table.table_name, + object_type="UNKNOWN", + table_format="UNKNOWN", + ) + @property def is_delta(self) -> bool: if self.table_format is None: diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/sequencing/sequencing.py index 025c81bc5c..22f6bf1ba7 100644 --- a/src/databricks/labs/ucx/sequencing/sequencing.py +++ b/src/databricks/labs/ucx/sequencing/sequencing.py @@ -11,8 +11,10 @@ from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspaceObjectOwnership +from databricks.labs.ucx.hive_metastore.tables import TableOwnership, Table from databricks.labs.ucx.source_code.graph import DependencyGraph from databricks.labs.ucx.source_code.path_lookup import PathLookup +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler @dataclass @@ -56,10 +58,17 @@ def as_step(self, step_number: int, required_step_ids: list[int]) -> MigrationSt class MigrationSequencer: - def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, admin_locator: AdministratorLocator): + def __init__( + self, + ws: WorkspaceClient, + path_lookup: PathLookup, + admin_locator: AdministratorLocator, + used_tables_crawler: UsedTablesCrawler, + ): self._ws = ws self._path_lookup = path_lookup self._admin_locator = admin_locator + self._used_tables_crawler = used_tables_crawler self._last_node_id = 0 self._nodes: dict[tuple[str, str], MigrationNode] = {} self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) @@ -102,10 +111,11 @@ def _visit_dependency(self, graph: DependencyGraph) -> bool | None: # TODO tables and dfsas return False - def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode: + def register_dependency(self, parent_node: MigrationNode | None, object_type: str, object_id: str) -> MigrationNode: dependency_node = self._nodes.get((object_type, object_id), None) if not dependency_node: dependency_node = self._create_dependency_node(object_type, object_id) + list(self._register_used_tables_for(dependency_node)) if parent_node: self._incoming[parent_node.key].add(dependency_node.key) self._outgoing[dependency_node.key].add(parent_node.key) @@ -135,6 +145,24 @@ def _create_dependency_node(self, object_type: str, object_id: str) -> Migration self._nodes[dependency_node.key] = dependency_node return dependency_node + def _register_used_tables_for(self, parent_node: MigrationNode) -> Iterable[MigrationNode]: + if parent_node.object_type not in {"NOTEBOOK", "FILE"}: + return + used_tables = self._used_tables_crawler.for_lineage(parent_node.object_type, parent_node.object_id) + for used_table in used_tables: + self._last_node_id += 1 + table_node = MigrationNode( + node_id=self._last_node_id, + object_type="TABLE", + object_id=used_table.fullname, + object_name=used_table.fullname, + object_owner=TableOwnership(self._admin_locator).owner_of(Table.from_used_table(used_table)), + ) + self._nodes[table_node.key] = table_node + self._incoming[parent_node.key].add(table_node.key) + self._outgoing[table_node.key].add(parent_node.key) + yield table_node + def register_workflow_job(self, job: jobs.Job) -> MigrationNode: job_node = self._nodes.get(("WORKFLOW", str(job.job_id)), None) if job_node: diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py index 85f5b598f6..5be09789d9 100644 --- a/src/databricks/labs/ucx/source_code/base.py +++ b/src/databricks/labs/ucx/source_code/base.py @@ -249,6 +249,10 @@ def parse(cls, value: str, default_schema: str, is_read=True, is_write=False) -> is_read: bool = True is_write: bool = False + @property + def fullname(self) -> str: + return f"{self.catalog_name}.{self.schema_name}.{self.table_name}" + class TableCollector(ABC): diff --git a/src/databricks/labs/ucx/source_code/used_table.py b/src/databricks/labs/ucx/source_code/used_table.py index 5b45a96864..23647f55fb 100644 --- a/src/databricks/labs/ucx/source_code/used_table.py +++ b/src/databricks/labs/ucx/source_code/used_table.py @@ -52,3 +52,10 @@ def _try_fetch(self) -> Iterable[UsedTable]: def _crawl(self) -> Iterable[UsedTable]: return [] # TODO raise NotImplementedError() once CrawlerBase supports empty snapshots + + def for_lineage(self, object_type: str, object_id: str): + sql = f"SELECT * FROM ( \ + SELECT *, explode(source_lineage) as lineage FROM {escape_sql_identifier(self.full_name)} \ + ) where lineage.object_type = '{object_type}' and lineage.object_id = '{object_id}'" + for row in self._backend.fetch(sql): + yield self._klass.from_dict(row.as_dict()) diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/sequencing/test_sequencing.py index d1c6f776a7..cf9acbd957 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/sequencing/test_sequencing.py @@ -1,3 +1,4 @@ +from datetime import datetime from unittest.mock import create_autospec from pathlib import Path @@ -10,10 +11,11 @@ from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer -from databricks.labs.ucx.source_code.base import CurrentSessionState +from databricks.labs.ucx.source_code.base import CurrentSessionState, UsedTable, LineageAtom from databricks.labs.ucx.source_code.graph import DependencyGraph, Dependency from databricks.labs.ucx.source_code.jobs import WorkflowTask from databricks.labs.ucx.source_code.linters.files import FileLoader +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler def admin_locator(ws, user_name: str): @@ -31,7 +33,9 @@ def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_r ws.jobs.get.return_value = job dependency = WorkflowTask(ws, task, job) graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) - sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.assert_not_called() + sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"), used_tables_crawler) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) step = steps[-1] @@ -49,8 +53,11 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso mock_path_lookup.append_path(functional) mock_path_lookup = mock_path_lookup.change_directory(functional) notebook_path = Path("grand_parent_that_imports_parent_that_magic_runs_child.py") - notebook_task = NotebookTask(notebook_path=notebook_path.as_posix()) - task = jobs.Task(task_key="test-task", existing_cluster_id="cluster-123", notebook_task=notebook_task) + task = jobs.Task( + task_key="test-task", + existing_cluster_id="cluster-123", + notebook_task=NotebookTask(notebook_path=notebook_path.as_posix()), + ) settings = jobs.JobSettings(name="test-job", tasks=[task]) job = jobs.Job(job_id=1234, settings=settings) ws.jobs.get.return_value = job @@ -61,7 +68,9 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()) problems = container.build_dependency_graph(graph) assert not problems - sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.assert_not_called() + sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"), used_tables_crawler) sequencer.register_workflow_task(task, job, graph) all_steps = list(sequencer.generate_steps()) # ensure steps have a consistent step_number: TASK > grand-parent > parent > child @@ -108,9 +117,42 @@ def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, child_graph_a.add_dependency(child_graph_b) # b imports a (using local import) child_graph_b.add_dependency(child_graph_a) - sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe")) + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.assert_not_called() + sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"), used_tables_crawler) sequencer.register_dependency(None, root.lineage[-1].object_type, root.lineage[-1].object_id) sequencer.visit_graph(root_graph) steps = list(sequencer.generate_steps()) assert len(steps) == 3 assert steps[2].object_id == "root.py" + + +def test_sequencer_builds_steps_from_used_tables(ws, simple_dependency_resolver, mock_path_lookup): + used_tables_crawler = create_autospec(UsedTablesCrawler) + used_tables_crawler.for_lineage.side_effect = lambda object_type, object_id: ( + [] + if object_id != "/some-folder/some-notebook" + else [ + UsedTable( + source_id="/some-folder/some-notebook", + source_timestamp=datetime.now(), + source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="/some-folder/some-notebook")], + catalog_name="my-catalog", + schema_name="my-schema", + table_name="my-table", + is_read=False, + is_write=False, + ) + ] + ) + sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"), used_tables_crawler) + sequencer.register_dependency(None, object_type="FILE", object_id="/some-folder/some-file") + all_steps = list(sequencer.generate_steps()) + assert len(all_steps) == 1 + sequencer.register_dependency(None, object_type="NOTEBOOK", object_id="/some-folder/some-notebook") + all_steps = list(sequencer.generate_steps()) + assert len(all_steps) == 3 + step = next((step for step in all_steps if step.object_type == "TABLE"), None) + assert step + assert step.step_number == 1 + assert step.object_id == "my-catalog.my-schema.my-table" From 1e7e0e88d3dc2a72fd1ff1c1ee004ff033ef79b1 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Mon, 21 Oct 2024 14:01:20 +0200 Subject: [PATCH 25/30] move package --- .../labs/ucx/{sequencing => assessment}/sequencing.py | 0 src/databricks/labs/ucx/sequencing/__init__.py | 0 tests/unit/{sequencing => assessment}/test_sequencing.py | 3 ++- tests/unit/sequencing/__init__.py | 0 4 files changed, 2 insertions(+), 1 deletion(-) rename src/databricks/labs/ucx/{sequencing => assessment}/sequencing.py (100%) delete mode 100644 src/databricks/labs/ucx/sequencing/__init__.py rename tests/unit/{sequencing => assessment}/test_sequencing.py (94%) delete mode 100644 tests/unit/sequencing/__init__.py diff --git a/src/databricks/labs/ucx/sequencing/sequencing.py b/src/databricks/labs/ucx/assessment/sequencing.py similarity index 100% rename from src/databricks/labs/ucx/sequencing/sequencing.py rename to src/databricks/labs/ucx/assessment/sequencing.py diff --git a/src/databricks/labs/ucx/sequencing/__init__.py b/src/databricks/labs/ucx/sequencing/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/unit/sequencing/test_sequencing.py b/tests/unit/assessment/test_sequencing.py similarity index 94% rename from tests/unit/sequencing/test_sequencing.py rename to tests/unit/assessment/test_sequencing.py index 21d2a612d0..9410706788 100644 --- a/tests/unit/sequencing/test_sequencing.py +++ b/tests/unit/assessment/test_sequencing.py @@ -1,10 +1,11 @@ +import dataclasses from unittest.mock import create_autospec from databricks.sdk.service import iam, jobs from databricks.sdk.service.compute import ClusterDetails +from databricks.labs.ucx.assessment.sequencing import MigrationSequencer, MigrationStep from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder -from databricks.labs.ucx.sequencing.sequencing import MigrationSequencer from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.graph import DependencyGraph from databricks.labs.ucx.source_code.jobs import WorkflowTask diff --git a/tests/unit/sequencing/__init__.py b/tests/unit/sequencing/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 From 9ac42957e8cc2e66b859e2f24078e0bcf1284e0c Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Mon, 21 Oct 2024 14:01:38 +0200 Subject: [PATCH 26/30] improve assert style --- tests/unit/assessment/test_sequencing.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tests/unit/assessment/test_sequencing.py b/tests/unit/assessment/test_sequencing.py index 9410706788..e4da351392 100644 --- a/tests/unit/assessment/test_sequencing.py +++ b/tests/unit/assessment/test_sequencing.py @@ -25,11 +25,14 @@ def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_pat sequencer = MigrationSequencer(ws, AdministratorLocator(ws, finders=[lambda _ws: admin_finder])) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) - step = steps[-1] - assert step.step_id - assert step.object_type == "CLUSTER" - assert step.object_id == "cluster-123" - assert step.object_name == "my-cluster" - assert step.object_owner == "John Doe" - assert step.step_number == 3 - assert len(step.required_step_ids) == 2 + step = dataclasses.replace(steps[-1], step_id=0) + # we don't know the exact ids of the required steps, se let's zero them + step.required_step_ids = [0 for id in step.required_step_ids] + assert step == MigrationStep( + step_id=0, + step_number = 3, + object_type = "CLUSTER", + object_id = "cluster-123", + object_name = "my-cluster", + object_owner = "John Doe", + required_step_ids = [0, 0]) From 17a33e336f069f7bcac8e1053f95acf8ea1d1180 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Mon, 21 Oct 2024 14:11:34 +0200 Subject: [PATCH 27/30] formatting --- tests/unit/assessment/test_sequencing.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/unit/assessment/test_sequencing.py b/tests/unit/assessment/test_sequencing.py index e4da351392..3d2ff3a3e8 100644 --- a/tests/unit/assessment/test_sequencing.py +++ b/tests/unit/assessment/test_sequencing.py @@ -25,14 +25,15 @@ def test_cluster_from_task_has_children(ws, simple_dependency_resolver, mock_pat sequencer = MigrationSequencer(ws, AdministratorLocator(ws, finders=[lambda _ws: admin_finder])) sequencer.register_workflow_task(task, job, graph) steps = list(sequencer.generate_steps()) - step = dataclasses.replace(steps[-1], step_id=0) - # we don't know the exact ids of the required steps, se let's zero them - step.required_step_ids = [0 for id in step.required_step_ids] + step = steps[-1] + # we don't know the ids of the steps, se let's zero them + step = dataclasses.replace(step, step_id=0, required_step_ids=[0] * len(step.required_step_ids)) assert step == MigrationStep( step_id=0, - step_number = 3, - object_type = "CLUSTER", - object_id = "cluster-123", - object_name = "my-cluster", - object_owner = "John Doe", - required_step_ids = [0, 0]) + step_number=3, + object_type="CLUSTER", + object_id="cluster-123", + object_name="my-cluster", + object_owner="John Doe", + required_step_ids=[0, 0], + ) From da0330d1115afd4164b3a1a04be36b9bfe0d33c6 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Mon, 21 Oct 2024 16:24:49 +0200 Subject: [PATCH 28/30] make 'incoming' transient and improve comments --- .../labs/ucx/assessment/sequencing.py | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/sequencing.py b/src/databricks/labs/ucx/assessment/sequencing.py index a873ee1b7e..59fbfe2832 100644 --- a/src/databricks/labs/ucx/assessment/sequencing.py +++ b/src/databricks/labs/ucx/assessment/sequencing.py @@ -55,7 +55,6 @@ def __init__(self, ws: WorkspaceClient, admin_locator: AdministratorLocator): self._admin_locator = admin_locator self._last_node_id = 0 self._nodes: dict[tuple[str, str], MigrationNode] = {} - self._incoming: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) self._outgoing: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: DependencyGraph) -> MigrationNode: @@ -73,15 +72,12 @@ def register_workflow_task(self, task: jobs.Task, job: jobs.Job, _graph: Depende object_owner=job_node.object_owner, # no task owner so use job one ) self._nodes[task_node.key] = task_node - self._incoming[job_node.key].add(task_node.key) self._outgoing[task_node.key].add(job_node.key) if task.existing_cluster_id: cluster_node = self.register_cluster(task.existing_cluster_id) if cluster_node: - self._incoming[cluster_node.key].add(task_node.key) self._outgoing[task_node.key].add(cluster_node.key) # also make the cluster dependent on the job - self._incoming[cluster_node.key].add(job_node.key) self._outgoing[job_node.key].add(cluster_node.key) # TODO register dependency graph return task_node @@ -104,7 +100,6 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode: for job_cluster in job.settings.job_clusters: cluster_node = self.register_job_cluster(job_cluster) if cluster_node: - self._incoming[cluster_node.key].add(job_node.key) self._outgoing[job_node.key].add(cluster_node.key) return job_node @@ -132,31 +127,47 @@ def register_cluster(self, cluster_id: str) -> MigrationNode: return cluster_node def generate_steps(self) -> Iterable[MigrationStep]: - # algo adapted from Kahn topological sort. The main differences is that - # we want the same step number for all nodes with same dependency depth - # so instead of pushing to a queue, we rebuild it once all leaf nodes are processed - # (these are transient leaf nodes i.e. they only become leaf during processing) - incoming_counts = self._populate_incoming_counts() + """algo adapted from Kahn topological sort. The differences are as follows: + - we want the same step number for all nodes with same dependency depth + so instead of pushing to a queue, we rebuild it once all leaf nodes are processed + (these are transient leaf nodes i.e. they only become leaf during processing) + - the inputs do not form a DAG so we need specialized handling of edge cases + (implemented in PR #3009) + """ + # pre-compute incoming keys for best performance of self._required_step_ids + incoming_keys = self._collect_incoming_keys() + incoming_counts = self.compute_incoming_counts(incoming_keys) step_number = 1 sorted_steps: list[MigrationStep] = [] while len(incoming_counts) > 0: leaf_keys = list(self._get_leaf_keys(incoming_counts)) for leaf_key in leaf_keys: del incoming_counts[leaf_key] - sorted_steps.append(self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(leaf_key)))) + sorted_steps.append( + self._nodes[leaf_key].as_step(step_number, list(self._required_step_ids(incoming_keys[leaf_key]))) + ) for dependency_key in self._outgoing[leaf_key]: incoming_counts[dependency_key] -= 1 step_number += 1 return sorted_steps - def _required_step_ids(self, node_key: tuple[str, str]) -> Iterable[int]: - for leaf_key in self._incoming[node_key]: - yield self._nodes[leaf_key].node_id + def _collect_incoming_keys(self) -> dict[tuple[str, str], set[tuple[str, str]]]: + result: dict[tuple[str, str], set[tuple[str, str]]] = defaultdict(set) + for source, outgoing in self._outgoing.items(): + for target in outgoing: + result[target].add(source) + return result + + def _required_step_ids(self, required_step_keys: set[tuple[str, str]]) -> Iterable[int]: + for source_key in required_step_keys: + yield self._nodes[source_key].node_id - def _populate_incoming_counts(self) -> dict[tuple[str, str], int]: + def compute_incoming_counts( + self, incoming: dict[tuple[str, str], set[tuple[str, str]]] + ) -> dict[tuple[str, str], int]: result = defaultdict(int) for node_key in self._nodes: - result[node_key] = len(self._incoming[node_key]) + result[node_key] = len(incoming[node_key]) return result @staticmethod From 1502ad8b5e64074f6ae2936079307e9c62abd7ca Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Wed, 23 Oct 2024 17:39:46 +0200 Subject: [PATCH 29/30] use WorkspacePathOwnership --- src/databricks/labs/ucx/assessment/sequencing.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/sequencing.py b/src/databricks/labs/ucx/assessment/sequencing.py index 6ba5f966dd..d6fc8774e7 100644 --- a/src/databricks/labs/ucx/assessment/sequencing.py +++ b/src/databricks/labs/ucx/assessment/sequencing.py @@ -8,9 +8,11 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs +from databricks.labs.blueprint.paths import WorkspacePath + from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo -from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspaceObjectOwnership +from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership from databricks.labs.ucx.source_code.graph import DependencyGraph from databricks.labs.ucx.source_code.path_lookup import PathLookup @@ -116,7 +118,8 @@ def _create_dependency_node(self, object_type: str, object_id: str) -> Migration continue object_name = path.relative_to(library_root).as_posix() break - object_owner = WorkspaceObjectOwnership(self._admin_locator).owner_of((object_type, object_id)) + ws_path = WorkspacePath(self._ws, object_id) + object_owner = WorkspacePathOwnership(self._admin_locator, self._ws).owner_of(ws_path) else: raise ValueError(f"{object_type} not supported yet!") self._last_node_id += 1 From 56ef893e47f49b5a1579af06d98b5e4588797c39 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Fri, 25 Oct 2024 10:53:36 +0200 Subject: [PATCH 30/30] fix merge issues --- src/databricks/labs/ucx/assessment/sequencing.py | 3 +-- src/databricks/labs/ucx/source_code/used_table.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/sequencing.py b/src/databricks/labs/ucx/assessment/sequencing.py index 5dae445105..137752d84a 100644 --- a/src/databricks/labs/ucx/assessment/sequencing.py +++ b/src/databricks/labs/ucx/assessment/sequencing.py @@ -13,7 +13,6 @@ from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership -from databricks.labs.ucx.hive_metastore.tables import TableOwnership, Table from databricks.labs.ucx.source_code.graph import DependencyGraph from databricks.labs.ucx.source_code.path_lookup import PathLookup from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler @@ -154,7 +153,7 @@ def _register_used_tables_for(self, parent_node: MigrationNode) -> Iterable[Migr object_type="TABLE", object_id=used_table.fullname, object_name=used_table.fullname, - object_owner=TableOwnership(self._admin_locator).owner_of(Table.from_used_table(used_table)), + object_owner="", # TODO ) self._nodes[table_node.key] = table_node self._outgoing[table_node.key].add(parent_node.key) diff --git a/src/databricks/labs/ucx/source_code/used_table.py b/src/databricks/labs/ucx/source_code/used_table.py index 6b3aa702ff..5fd38f9fde 100644 --- a/src/databricks/labs/ucx/source_code/used_table.py +++ b/src/databricks/labs/ucx/source_code/used_table.py @@ -57,5 +57,5 @@ def for_lineage(self, object_type: str, object_id: str): sql = f"SELECT * FROM ( \ SELECT *, explode(source_lineage) as lineage FROM {escape_sql_identifier(self.full_name)} \ ) where lineage.object_type = '{object_type}' and lineage.object_id = '{object_id}'" - for row in self._backend.fetch(sql): + for row in self._sql_backend.fetch(sql): yield self._klass.from_dict(row.as_dict())