diff --git a/client/verta/docs/python.rst b/client/verta/docs/python.rst index d5e2383e6a..5f4a02d3f9 100644 --- a/client/verta/docs/python.rst +++ b/client/verta/docs/python.rst @@ -29,6 +29,7 @@ Verta endpoint environment integrations + pipeline registry runtime tracking diff --git a/client/verta/tests/unit_tests/conftest.py b/client/verta/tests/unit_tests/conftest.py index c7806ee4ac..7a95d8ea6d 100644 --- a/client/verta/tests/unit_tests/conftest.py +++ b/client/verta/tests/unit_tests/conftest.py @@ -2,7 +2,10 @@ """Pytest fixtures for use in client unit tests.""" +import json import os +import random +from typing import Any, Callable, Dict, Optional from unittest.mock import patch import pytest @@ -13,7 +16,8 @@ from verta.client import Client from verta.credentials import EmailCredentials from verta.endpoint import Endpoint -from verta.registry.entities import RegisteredModelVersion +from verta.pipeline import PipelineGraph, PipelineStep +from verta.registry.entities import RegisteredModel, RegisteredModelVersion @pytest.fixture(scope="session") @@ -64,15 +68,159 @@ def __repr__(self): # avoid network calls when displaying test results @pytest.fixture(scope="session") -def mock_registered_model_version(mock_conn, mock_config): - """Return a mocked object of the RegisteredModelVersion class for use in tests""" +def make_mock_simple_pipeline_definition() -> Callable: + """Return a callable function for creating a simple mocked pipeline + definition. + + For use in tests, including a parameter for the pipeline + id to ensure consistency in tests that mock creation of a pipeline + object from a pipeline definition. 
+ """ + + def simple_pipeline_definition(id: int) -> Dict[str, Any]: + return { + "graph": [ + {"predecessors": [], "name": "step1"}, + {"predecessors": ["step1"], "name": "step2"}, + ], + "pipeline_version_id": id, + "steps": [ + { + "model_version_id": 1, + "name": "step1", + }, + { + "model_version_id": 2, + "name": "step2", + }, + ], + } + + return simple_pipeline_definition + + +@pytest.fixture(scope="session") +def make_mock_registered_model(mock_conn, mock_config) -> Callable: + """Return a callable function for creating mocked objects of the + RegisteredModel class. + """ + + class MockRegisteredModel(RegisteredModel): + def __repr__(self): # avoid network calls when displaying test results + return object.__repr__(self) + + def _make_mock_registered_model(id: int, name: str): + """Return a mocked RegisteredModel object.""" + + return MockRegisteredModel( + mock_conn, + mock_config, + _RegistryService.RegisteredModel( + id=id, + name=name, + ), + ) + + return _make_mock_registered_model + + +@pytest.fixture(scope="session") +def make_mock_registered_model_version( + mock_conn, mock_config, make_mock_simple_pipeline_definition +) -> Callable: + """Return a callable function for creating mocked objects of the + RegisteredModelVersion class. + """ + unique_ids = set() class MockRegisteredModelVersion(RegisteredModelVersion): def __repr__(self): # avoid network calls when displaying test results return object.__repr__(self) - return MockRegisteredModelVersion( - mock_conn, - mock_config, - _RegistryService.ModelVersion(id=555, registered_model_id=123), - ) + def _get_artifact(self, key=None, artifact_type=None): + if key == "pipeline.json": + return json.dumps( + make_mock_simple_pipeline_definition(id=self.id) + ).encode("utf-8") + + def _make_mock_registered_model_version(): + """Return a mocked ``RegisteredModelVersion``. + + ``id`` and ``registered_model_id`` will be random and unique for the + test session. 
+ + """ + model_ver_id = random.randint(1, 1000000) + while model_ver_id in unique_ids: + model_ver_id = random.randint(1, 1000000) + unique_ids.add(model_ver_id) + + reg_model_id = random.randint(1, 1000000) + while reg_model_id in unique_ids: + reg_model_id = random.randint(1, 1000000) + unique_ids.add(reg_model_id) + + return MockRegisteredModelVersion( + mock_conn, + mock_config, + _RegistryService.ModelVersion( + id=model_ver_id, + registered_model_id=reg_model_id, + version="test_model_version_name", + ), + ) + + return _make_mock_registered_model_version + + +@pytest.fixture(scope="session") +def make_mock_pipeline_step(make_mock_registered_model_version) -> Callable: + """Return a callable function for creating mocked objects of the PipelineStep + class. + + The optional `name` parameter is for use in tests where names must be + known for assertions. + """ + unique_names = set() + + class MockPipelineStep(PipelineStep): + def __repr__(self): # avoid network calls when displaying test results + return object.__repr__(self) + + def _make_unique_name(): + name = f"step{random.randint(1, 1000000)}" + while name in unique_names: + name = f"step{random.randint(1, 1000000)}" + unique_names.add(name) + return name + + def _make_mock_pipeline_step(name: Optional[str] = None): + return MockPipelineStep( + registered_model_version=make_mock_registered_model_version(), + name=name if name else _make_unique_name(), + predecessors=set(), + ) + + return _make_mock_pipeline_step + + +@pytest.fixture(scope="session") +def make_mock_pipeline_graph(make_mock_pipeline_step) -> Callable: + """Return a callable function for creating mocked objects of the PipelineGraph + class. 
+ """ + + class MockPipelineGraph(PipelineGraph): + def __repr__(self): # avoid network calls when displaying test results + return object.__repr__(self) + + def _make_mock_pipeline_graph(): + step1 = make_mock_pipeline_step() + step1.set_name("step1") + step2 = make_mock_pipeline_step() + step2.set_name("step2") + step3 = make_mock_pipeline_step() + step3.set_name("step3") + return MockPipelineGraph(steps={step1, step2, step3}) + + return _make_mock_pipeline_graph diff --git a/client/verta/tests/unit_tests/deployment/test_build.py b/client/verta/tests/unit_tests/deployment/test_build.py index f6d3eea30b..5fadbaaad1 100644 --- a/client/verta/tests/unit_tests/deployment/test_build.py +++ b/client/verta/tests/unit_tests/deployment/test_build.py @@ -60,14 +60,15 @@ def test_endpoint_get_current_build( @settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) @given(build_dicts=st.lists(build_dict(), unique_by=lambda d: d["id"])) def test_model_version_list_builds( - mock_registered_model_version, + make_mock_registered_model_version, mock_conn, mocked_responses, build_dicts, ): """Verify we can construct Build objects from list_builds().""" + rmv = make_mock_registered_model_version() registry_url = f"{mock_conn.scheme}://{mock_conn.socket}/api/v1/registry" - model_version_url = f"{registry_url}/registered_models/{mock_registered_model_version.registered_model_id}" + model_version_url = f"{registry_url}/registered_models/{rmv.registered_model_id}" deployment_url = f"{mock_conn.scheme}://{mock_conn.socket}/api/v1/deployment" list_builds_url = f"{deployment_url}/builds" @@ -77,7 +78,7 @@ def test_model_version_list_builds( status=200, match=[ query_param_matcher( - {"model_version_id": mock_registered_model_version.id}, + {"model_version_id": rmv.id}, ), ], json={"builds": build_dicts}, @@ -88,7 +89,7 @@ def test_model_version_list_builds( json={"workspace_id": "123"}, ) - builds = mock_registered_model_version.list_builds() + builds = rmv.list_builds() # 
verify builds are ordered by creation date assert [b.id for b in builds] == [ diff --git a/client/verta/tests/unit_tests/deployment/test_endpoint.py b/client/verta/tests/unit_tests/deployment/test_endpoint.py index ccca025f15..f2a5ea950f 100644 --- a/client/verta/tests/unit_tests/deployment/test_endpoint.py +++ b/client/verta/tests/unit_tests/deployment/test_endpoint.py @@ -145,7 +145,7 @@ def test_kafka_cluster_config_id_default( mock_endpoint, mock_conn, mocked_responses, - mock_registered_model_version, + make_mock_registered_model_version, ) -> None: """Verify that, while updating an endpoint, not including a `cluster_config_id` in the KafkaSettings results in the correct sequence of HTTP requests, including @@ -181,7 +181,7 @@ def test_kafka_cluster_config_id_default( ) mock_endpoint.update( - mock_registered_model_version, kafka_settings=kafka_settings + make_mock_registered_model_version(), kafka_settings=kafka_settings ) _responses.assert_call_count(get_configs_url, 1) @@ -193,7 +193,7 @@ def test_kafka_cluster_config_id_value( mock_endpoint, mock_conn, mocked_responses, - mock_registered_model_version, + make_mock_registered_model_version, ) -> None: """Verify that, while updating an endpoint, the provided value for `cluster_config_id` is used, resulting in the correct sequence of HTTP @@ -226,7 +226,7 @@ def test_kafka_cluster_config_id_value( url=stages_url + f"/{STAGE_ID}", status=200, json={"id": STAGE_ID} ) mock_endpoint.update( - mock_registered_model_version, kafka_settings=kafka_settings + make_mock_registered_model_version(), kafka_settings=kafka_settings ) @settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) @@ -241,7 +241,7 @@ def test_kafka_config_missing_config_id_exception( mock_endpoint, mock_conn, mocked_responses, - mock_registered_model_version, + make_mock_registered_model_version, ) -> None: """In the unlikely evert the ID of a found Kafka config is missing from the backend response, the expected exception is raised. 
@@ -259,7 +259,7 @@ def test_kafka_config_missing_config_id_exception( _responses.get(url=get_configs_url, status=200, json=kafka_configs_response) with pytest.raises(RuntimeError) as err: mock_endpoint.update( - mock_registered_model_version, kafka_settings=kafka_settings + make_mock_registered_model_version(), kafka_settings=kafka_settings ) assert ( str(err.value) @@ -275,7 +275,7 @@ def test_no_kafka_configs_found_exception( mock_endpoint, mock_conn, mocked_responses, - mock_registered_model_version, + make_mock_registered_model_version, ) -> None: """If no valid Kafka configurations are found, the expected exception is raised.""" deployment_url = f"{mock_conn.scheme}://{mock_conn.socket}/api/v1/deployment" @@ -290,7 +290,7 @@ def test_no_kafka_configs_found_exception( _responses.get(url=get_configs_url, status=200, json={"configurations": []}) with pytest.raises(RuntimeError) as err: mock_endpoint.update( - mock_registered_model_version, kafka_settings=kafka_settings + make_mock_registered_model_version(), kafka_settings=kafka_settings ) assert ( str(err.value) diff --git a/client/verta/tests/unit_tests/pipeline/test_pipeline_graph.py b/client/verta/tests/unit_tests/pipeline/test_pipeline_graph.py new file mode 100644 index 0000000000..82e9eb24f8 --- /dev/null +++ b/client/verta/tests/unit_tests/pipeline/test_pipeline_graph.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +""" +Unit tests for the PipelineGraph class +""" + +from unittest.mock import patch + +import pytest +from hypothesis import given, HealthCheck, settings, strategies as st + +import verta +from tests.unit_tests.strategies import pipeline_definition +from verta.pipeline import PipelineGraph, PipelineStep + + +def test_set_steps(make_mock_pipeline_step, make_mock_registered_model) -> None: + """Test that the steps of a PipelineGraph can be set.""" + mocked_rm = make_mock_registered_model(id=123, name="test_rmv") + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", 
return_value=mocked_rm + ): + step_1 = make_mock_pipeline_step() + step_2 = make_mock_pipeline_step() + graph = PipelineGraph(steps=set()) + graph.set_steps({step_1, step_2}) + assert set(graph.steps) == {step_1, step_2} + graph.set_steps(set()) + assert not graph.steps + + +@given( + pipeline_definition=pipeline_definition(), + # max value limit avoids protobuf "Value out of range" error + registered_model_id=st.integers(min_value=1, max_value=2**63), + model_version_name=st.text(min_size=1), + model_name=st.text(min_size=1), +) +@settings( + suppress_health_check=[HealthCheck.function_scoped_fixture], + deadline=None, +) +def test_from_definition( + pipeline_definition, + registered_model_id, + model_version_name, + model_name, + mock_conn, + mock_config, + mocked_responses, +) -> None: + """Test that a PipelineGraph object can be constructed from a pipeline + definition. + + The model version is fetched for each step, so a response + is mocked for each. In depth testing of each step is handled in + test_pipeline_step.test_steps_from_pipeline_definition. 
+ """ + for step in pipeline_definition["steps"]: + mocked_responses.get( + f"{mock_conn.scheme}://{mock_conn.socket}/api/v1/registry/model_versions/" + f"{step['model_version_id']}", + json={ + "model_version": { + "id": step["model_version_id"], + "registered_model_id": registered_model_id, + "version": model_version_name, + } + }, + status=200, + ) + mocked_responses.get( + f"{mock_conn.scheme}://{mock_conn.socket}/api/v1/registry/registered_models/" + f"{registered_model_id}", + json={ + "registered_model": { + "id": registered_model_id, + "name": model_name, + } + }, + status=200, + ) + graph = PipelineGraph._from_definition( + pipeline_definition=pipeline_definition, conn=mock_conn, conf=mock_config + ) + # the object produced is a PipelineGraph + assert isinstance(graph, PipelineGraph) + # we have the same number of steps as in the pipeline definition + assert len(graph.steps) == len(pipeline_definition["steps"]) + # sort each group of steps for comparison + pipeline_steps_sorted = sorted( + pipeline_definition["steps"], key=lambda x: x["name"] + ) + graph_steps_sorted = sorted(graph.steps, key=lambda x: x.name) + + for graph_step, pipeline_step in zip(graph_steps_sorted, pipeline_steps_sorted): + assert graph_step.name == pipeline_step["name"] + assert ( + graph_step.registered_model_version.id == pipeline_step["model_version_id"] + ) + assert graph_step._registered_model.name == model_name + assert graph_step._registered_model.id == registered_model_id + + +def test_to_graph_definition( + make_mock_pipeline_step, make_mock_registered_model +) -> None: + """Test that a pipeline `graph` specification can be constructed from a + PipelineGraph object. 
+ """ + mocked_rm = make_mock_registered_model(id=123, name="test_rmv") + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + step_1 = make_mock_pipeline_step("step_1") + step_2 = make_mock_pipeline_step("step_2") + step_3 = make_mock_pipeline_step("step_3") + step_2.set_predecessors({step_1}) + step_3.set_predecessors({step_2}) + graph = PipelineGraph(steps={step_1, step_2, step_3}) + graph_spec = graph._to_graph_definition() + assert sorted(graph_spec, key=lambda x: x["name"]) == [ + { + "name": step_1.name, + "predecessors": [], + }, + { + "name": step_2.name, + "predecessors": [step_1.name], + }, + { + "name": step_3.name, + "predecessors": [step_2.name], + }, + ] + + +def test_to_steps_definition( + make_mock_pipeline_step, make_mock_registered_model +) -> None: + """Test that a pipeline `steps` specification can be constructed from a + PipelineGraph object. + + Definitions are type list to remain json serializable. + """ + mocked_rm = make_mock_registered_model(id=123, name="test_rm") + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + step_1 = make_mock_pipeline_step(name="step_1") + step_2 = make_mock_pipeline_step(name="step_2") + graph = PipelineGraph(steps={step_1, step_2}) + step_specs = graph._to_steps_definition() + expected_definition = [ + { + "name": step_1.name, + "model_version_id": step_1.registered_model_version.id, + }, + { + "name": step_2.name, + "model_version_id": step_2.registered_model_version.id, + }, + ] + assert sorted(step_specs, key=lambda x: x["name"]) == sorted( + expected_definition, key=lambda x: x["name"] + ) + + +def test_bad_mutation_of_step_predecessors_exception( + make_mock_registered_model_version, + make_mock_registered_model, + make_mock_pipeline_step, +): + """Test that we throw the correct exception when a user tries to mutate + the predecessors of a step in an inappropriate way. 
+ """ + mocked_rmv = make_mock_registered_model_version() + mocked_rm = make_mock_registered_model( + id=mocked_rmv.registered_model_id, name="test_rm" + ) + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + step = PipelineStep( + registered_model_version=mocked_rmv, + name="test_name", + predecessors=set(), + ) + step.predecessors.add("not_a_step") + with pytest.raises(TypeError) as err: + PipelineGraph(steps={step}) + assert ( + str(err.value) == f"individual predecessors of a PipelineStep must be type" + f" PipelineStep, not <class 'str'>." + ) + + +def test_step_name_uniqueness_exception( + make_mock_registered_model, make_mock_pipeline_step +): + mocked_rm = make_mock_registered_model(id=123, name="test_rm") + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + step_1 = make_mock_pipeline_step(name="step_1") + step_2 = make_mock_pipeline_step(name="step_2") + step_3 = make_mock_pipeline_step(name="step_1") + + with pytest.raises(ValueError) as err: + PipelineGraph(steps={step_1, step_2, step_3}) + assert str(err.value) == "step names must be unique within a PipelineGraph" diff --git a/client/verta/tests/unit_tests/pipeline/test_pipeline_step.py b/client/verta/tests/unit_tests/pipeline/test_pipeline_step.py new file mode 100644 index 0000000000..60ea91ca8f --- /dev/null +++ b/client/verta/tests/unit_tests/pipeline/test_pipeline_step.py @@ -0,0 +1,246 @@ +# -*- coding: utf-8 -*- +""" +Unit tests for the PipelineStep class +""" + +import random +from unittest.mock import patch + +from hypothesis import given, HealthCheck, settings, strategies as st + +import verta +from tests.unit_tests.strategies import pipeline_definition +from verta.pipeline import PipelineStep + + +@given( + pipeline_definition=pipeline_definition(), + # max value limit avoids protobuf "Value out of range" error + registered_model_id=st.integers(min_value=1, max_value=2**63), +
model_version_name=st.text(min_size=1), + model_name=st.text(min_size=1), +) +@settings( + suppress_health_check=[HealthCheck.function_scoped_fixture], + deadline=None, +) +def test_steps_from_pipeline_definition( + pipeline_definition, + registered_model_id, + model_version_name, + model_name, + mock_conn, + mock_config, + mocked_responses, +) -> None: + """Test that a list of PipelineStep objects can be constructed and + returned from a pipeline definition. + + The registered model, and registered model version is fetched for + each step, so a call is mocked for each. + """ + graph = pipeline_definition["graph"] + for step in pipeline_definition["steps"]: + mocked_responses.get( + f"{mock_conn.scheme}://{mock_conn.socket}/api/v1/registry/model_versions/{step['model_version_id']}", + json={ + "model_version": { + "id": step["model_version_id"], + "registered_model_id": registered_model_id, + "version": model_version_name, + } + }, + status=200, + ) + mocked_responses.get( + f"{mock_conn.scheme}://{mock_conn.socket}/api/v1/registry/registered_models/{registered_model_id}", + json={ + "registered_model": { + "id": registered_model_id, + "name": model_name, + } + }, + status=200, + ) + generated_steps = PipelineStep._steps_from_pipeline_definition( + pipeline_definition=pipeline_definition, + conn=mock_conn, + conf=mock_config, + ) + # we have the same number of steps as in the pipeline definition + assert len(generated_steps) == len(pipeline_definition["steps"]) + # sort both group of steps for side-by-side comparison + generated_steps_sorted = sorted(generated_steps, key=lambda x: x.name) + definition_steps_sorted = sorted( + pipeline_definition["steps"], key=lambda x: x["name"] + ) + + for def_step, gen_step in zip(definition_steps_sorted, generated_steps_sorted): + # the names are the same for the steps and their definitions + assert gen_step.name == def_step["name"] + # model version ids are the same for the steps and their definitions + assert 
gen_step.registered_model_version.id == def_step["model_version_id"] + # the registered model id for each step was fetched and added from the mocked response. + assert gen_step._registered_model.id == registered_model_id + # registered model names are fetched and added from the mocked response. + assert gen_step._registered_model.name == model_name + # each step is converted to a PipelineStep object + assert isinstance(gen_step, PipelineStep) + # predecessors for each step are also converted to PipelineStep objects + for predecessor in gen_step.predecessors: + assert isinstance(predecessor, PipelineStep) + # the predecessors for each step are all included and have the same name as in the definition + assert [s.name for s in gen_step.predecessors] == [ + s["predecessors"] for s in graph if gen_step.name == s["name"] + ][0] + + +def test_to_step_spec( + make_mock_registered_model_version, make_mock_registered_model +) -> None: + """Test that a PipelineStep object can be converted to a step specification.""" + mocked_rmv = make_mock_registered_model_version() + mocked_rm = make_mock_registered_model( + id=mocked_rmv.registered_model_id, name="test_rm" + ) + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + step = PipelineStep( + registered_model_version=mocked_rmv, + name="test_name", + predecessors=set(), # predecessors not included in step spec + ) + assert step._to_step_spec() == { + "name": "test_name", + "model_version_id": mocked_rmv.id, + } + + +def test_to_graph_spec( + make_mock_registered_model_version, + make_mock_pipeline_step, + make_mock_registered_model, +) -> None: + """Test that a PipelineStep object can be converted to a graph specification.""" + mocked_rmv = make_mock_registered_model_version() + mocked_rm = make_mock_registered_model( + id=mocked_rmv.registered_model_id, name="test_rmv" + ) + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): 
+ predecessors = {make_mock_pipeline_step() for _ in range(random.randint(1, 5))} + step = PipelineStep( + registered_model_version=mocked_rmv, + name="test_name", + predecessors=predecessors, + ) + assert step._to_graph_spec() == { + "name": "test_name", + "predecessors": [s.name for s in predecessors], + } + + +def test_set_predecessors_add( + make_mock_registered_model_version, + make_mock_pipeline_step, + make_mock_registered_model, +) -> None: + """Test that predecessors can be added to a PipelineStep object.""" + mocked_rmv = make_mock_registered_model_version() + mocked_rm = make_mock_registered_model( + id=mocked_rmv.registered_model_id, name="test_rm" + ) + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + predecessor_1 = make_mock_pipeline_step() + predecessor_2 = make_mock_pipeline_step() + step = PipelineStep( + registered_model_version=mocked_rmv, + name="test_name", + predecessors={predecessor_1}, + ) + new_steps = step.predecessors.copy() + new_steps.add(predecessor_2) + step.set_predecessors(new_steps) + assert set(step.predecessors) == {predecessor_1, predecessor_2} + + +def test_set_predecessors_remove( + make_mock_registered_model_version, + make_mock_pipeline_step, + make_mock_registered_model, +) -> None: + """Test that predecessors can be removed from a PipelineStep object.""" + mocked_rmv = make_mock_registered_model_version() + mocked_rm = make_mock_registered_model( + id=mocked_rmv.registered_model_id, name="test_rmv" + ) + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + predecessors = {make_mock_pipeline_step() for _ in range(random.randint(2, 10))} + predecessors_as_list = list(predecessors) # convert to list for slicing + steps_to_remain = predecessors_as_list[: len(predecessors_as_list) // 2] + step = PipelineStep( + registered_model_version=mocked_rmv, + name="test_name", + predecessors=predecessors, + ) + 
step.set_predecessors(set(steps_to_remain)) + assert step.predecessors == set(steps_to_remain) + + +@given( + rm_1_name=st.text(min_size=1), + rm_2_name=st.text(min_size=1), +) +@settings( + suppress_health_check=[HealthCheck.function_scoped_fixture], + deadline=None, +) +def test_change_model_version( + rm_1_name, + rm_2_name, + make_mock_registered_model_version, + mocked_responses, +) -> None: + """Test that a PipelineStep object can have its model version changed. + + Each time a RMV is set for a PipelineStep, the RM for it is fetched, + so a call is mocked for the initial step creation and the change. + """ + rmv_1 = make_mock_registered_model_version() + rmv_2 = make_mock_registered_model_version() + mocked_responses.get( + f"{rmv_1._conn.scheme}://{rmv_1._conn.socket}/api/v1/registry/registered_models/{rmv_1.registered_model_id}", + json={ + "registered_model": { + "id": rmv_1.registered_model_id, + "name": rm_1_name, + } + }, + status=200, + ) + mocked_responses.get( + f"{rmv_2._conn.scheme}://{rmv_2._conn.socket}/api/v1/registry/registered_models/{rmv_2.registered_model_id}", + json={ + "registered_model": { + "id": rmv_2.registered_model_id, + "name": rm_2_name, + } + }, + status=200, + ) + step = PipelineStep( + registered_model_version=rmv_1, + name="test_name", + predecessors=set(), + ) + assert step.registered_model_version == rmv_1 + assert step._registered_model.id == rmv_1.registered_model_id + step.set_registered_model_version(rmv_2) + assert step.registered_model_version == rmv_2 + assert step._registered_model.id == rmv_2.registered_model_id diff --git a/client/verta/tests/unit_tests/pipeline/test_registered_pipeline.py b/client/verta/tests/unit_tests/pipeline/test_registered_pipeline.py new file mode 100644 index 0000000000..012024590c --- /dev/null +++ b/client/verta/tests/unit_tests/pipeline/test_registered_pipeline.py @@ -0,0 +1,365 @@ +# -*- coding: utf-8 -*- +""" +Unit tests for the RegisteredPipeline class +""" + +from unittest.mock 
import patch + +import pytest +from hypothesis import given, HealthCheck, settings, strategies as st + +import verta +from tests.unit_tests.strategies import resources +from verta.pipeline import RegisteredPipeline + + +def test_copy_graph( + make_mock_pipeline_graph, + make_mock_registered_model_version, + make_mock_registered_model, +) -> None: + """Test that the graph of a RegisteredPipeline can be copied. + + Each step in the copied graph should be a new object, but have the same + name, predecessors, and model version as the original. + """ + mocked_rm = make_mock_registered_model(id=123, name="test_rm") + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + graph = make_mock_pipeline_graph() + pipeline = RegisteredPipeline( + graph=graph, + registered_model_version=make_mock_registered_model_version(), + ) + copied_graph = pipeline.copy_graph() + # convert from sets to lists and sort for side-by-side comparison + graph_steps_sorted = sorted(graph.steps, key=lambda x: x.name) + copied_graph_steps_sorted = sorted(copied_graph.steps, key=lambda x: x.name) + + for orig_step, copied_step in zip(graph_steps_sorted, copied_graph_steps_sorted): + assert orig_step is not copied_step + assert orig_step.name == copied_step.name + assert orig_step.predecessors == copied_step.predecessors + assert ( + orig_step.registered_model_version.id + == copied_step.registered_model_version.id + ) + assert copied_graph is not graph + + +@given(model_version_name=st.text(min_size=1)) +@settings( + suppress_health_check=[HealthCheck.function_scoped_fixture], + deadline=None, +) +def test_log_pipeline_definition_artifact( + model_version_name, + mocked_responses, + make_mock_pipeline_graph, + make_mock_registered_model, + make_mock_registered_model_version, +) -> None: + """Verify the expected sequence of calls when a pipeline definition + is logged as an artifact to the pipeline's model version. 
+ + Fetching the registered model version is patched instead of mocking a + response to avoid having to pass the RM's id down through multiple + pytest fixtures. + """ + rm = make_mock_registered_model(id=123, name="test_rm") + rmv = make_mock_registered_model_version() + # Fetch the registered model version + mocked_responses.get( + f"{rmv._conn.scheme}://{rmv._conn.socket}/api/v1/registry/model_versions/{rmv.id}", + json={ + "model_version": { + "id": rmv.id, + "registered_model_id": rmv.registered_model_id, + "version": model_version_name, + } + }, + status=200, + ) + mocked_responses.put( + f"{rmv._conn.scheme}://{rmv._conn.socket}/api/v1/registry/registered_models/{rmv.registered_model_id}/model_versions/{rmv.id}", + json={}, + status=200, + ) + # Fetch the artifact upload URL + mocked_responses.post( + f"{rmv._conn.scheme}://{rmv._conn.socket}/api/v1/registry/model_versions/{rmv.id}/getUrlForArtifact", + json={ + "url": f"https://account.s3.amazonaws.com/development/ModelVersionEntity/" + f"{rmv.id}/pipeline.json" + }, + status=200, + ) + # Upload the artifact + mocked_responses.put( + f"https://account.s3.amazonaws.com/development/ModelVersionEntity/{rmv.id}/pipeline.json", + json={}, + status=200, + ) + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=rm + ): + pipeline = RegisteredPipeline( + graph=make_mock_pipeline_graph(), + registered_model_version=rmv, + ) + pipeline._log_pipeline_definition_artifact() + + +def test_get_pipeline_definition_artifact( + make_mock_registered_model_version, + make_mock_simple_pipeline_definition, +) -> None: + """Test that a pipeline definition artifact can be fetched from the + registered model version associated with a RegisteredPipeline object. 
+ """ + rmv = make_mock_registered_model_version() + pipeline_definition = RegisteredPipeline._get_pipeline_definition_artifact( + registered_model_version=rmv, + ) + assert pipeline_definition == make_mock_simple_pipeline_definition(id=rmv.id) + + +def test_to_pipeline_definition( + make_mock_pipeline_graph, + make_mock_registered_model_version, + make_mock_registered_model, +) -> None: + """Test that a pipeline definition can be constructed from a + RegisteredPipeline object. + + In depth testing of the `_to_graph_definition` + and `_to_steps_definition` functions are handled in unit tests for + PipelineGraph. + """ + mocked_rm = make_mock_registered_model(id=123, name="test_rm") + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + graph = make_mock_pipeline_graph() + pipeline = RegisteredPipeline( + graph=graph, + registered_model_version=make_mock_registered_model_version(), + ) + pipeline_definition = pipeline._to_pipeline_definition() + assert pipeline_definition == { + "pipeline_version_id": pipeline.id, + "graph": graph._to_graph_definition(), + "steps": graph._to_steps_definition(), + } + + +@given(resources=resources()) +def test_to_pipeline_configuration_valid_complete( + resources, + make_mock_pipeline_graph, + make_mock_registered_model_version, + make_mock_registered_model, +) -> None: + """Test that a pipeline configuration can be constructed from a + RegisteredPipeline object and a valid list of pipeline resources, + where resources are provided for every step. 
+ """ + mocked_rm = make_mock_registered_model(id=123, name="test_rm") + with patch.object( + verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm + ): + graph = make_mock_pipeline_graph() + step_resources = {step.name: resources for step in graph.steps} + pipeline = RegisteredPipeline( + graph=graph, + registered_model_version=make_mock_registered_model_version(), + ) + + pipeline_configuration = pipeline._to_pipeline_configuration( + pipeline_resources=step_resources + ) + assert pipeline_configuration["pipeline_version_id"] == pipeline.id + assert len(graph.steps) == len(pipeline_configuration["steps"]) + for graph_step, config_step in zip(graph.steps, pipeline_configuration["steps"]): + # All steps provided are included in the configuration. + assert graph_step.name == config_step["name"] + # All steps in the config have resources + assert "resources" in config_step.keys() + + +@given(resources=resources()) +def test_to_pipeline_configuration_valid_incomplete( + resources, + make_mock_pipeline_graph, + make_mock_registered_model_version, + make_mock_registered_model, +) -> None: + """Test that a pipeline configuration can be constructed from a + RegisteredPipeline object and a valid list of pipeline resources, + where resources are not provided for every step. 
@given(resources=resources())
def test_to_pipeline_configuration_invalid_resources(
    resources,
    make_mock_pipeline_graph,
    make_mock_registered_model_version,
    make_mock_registered_model,
) -> None:
    """Test that the expected errors are raised when invalid pipeline
    resources are provided.

    Invalid resources include:
        - a step name not in the pipeline -> ValueError
        - a step name that is not a string -> TypeError
        - a step resource that is not a Resources object -> TypeError
    """
    mocked_rm = make_mock_registered_model(id=123, name="test_rmv")
    with patch.object(
        verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm
    ):
        graph = make_mock_pipeline_graph()
        step_resources = {step.name: resources for step in graph.steps}
        pipeline = RegisteredPipeline(
            graph=graph,
            registered_model_version=make_mock_registered_model_version(),
        )

    # step name not in pipeline
    step_resources["invalid_step_name"] = resources
    with pytest.raises(ValueError) as err:
        pipeline._to_pipeline_configuration(pipeline_resources=step_resources)
    assert (
        str(err.value) == "pipeline_resources contains resources for a step not in the "
        "pipeline: 'invalid_step_name'"
    )
    step_resources.pop("invalid_step_name")

    # step name not a string
    step_resources.update({123: resources})
    with pytest.raises(TypeError) as err2:
        pipeline._to_pipeline_configuration(pipeline_resources=step_resources)
    # The message embeds ``type(step_name)``, i.e. the repr of the int class.
    assert (
        str(err2.value)
        == "pipeline_resources keys must be type str, not <class 'int'>"
    )
    step_resources.pop(123)

    # step resource not a Resources object
    # Values are validated before keys, so the key used here need not be a
    # real step name for the TypeError to be raised.
    step_resources.update({"step_1": "not_resources"})
    with pytest.raises(TypeError) as err3:
        pipeline._to_pipeline_configuration(pipeline_resources=step_resources)
    # The message embeds ``type(res)``, i.e. the repr of the str class.
    assert (
        str(err3.value)
        == "pipeline_resources values must be type Resources, not <class 'str'>"
    )
def test_to_pipeline_configuration_no_resources(
    make_mock_pipeline_graph,
    make_mock_registered_model_version,
    make_mock_registered_model,
) -> None:
    """Check the pipeline configuration built without any resources.

    When ``_to_pipeline_configuration`` is called with no arguments, every
    step of the graph must be listed by name and none may carry a
    ``resources`` entry.
    """
    registered_model = make_mock_registered_model(id=123, name="test_rm")
    rm_lookup = patch.object(
        verta.pipeline.PipelineStep,
        "_get_registered_model",
        return_value=registered_model,
    )
    with rm_lookup:
        graph = make_mock_pipeline_graph()
        pipeline = RegisteredPipeline(
            graph=graph,
            registered_model_version=make_mock_registered_model_version(),
        )
        configuration = pipeline._to_pipeline_configuration()
        assert configuration["pipeline_version_id"] == pipeline.id
        for graph_step, configured_step in zip(graph.steps, configuration["steps"]):
            # The step is present under its own name...
            assert graph_step.name == configured_step["name"]
            # ...and nothing was invented for its resources.
            assert "resources" not in configured_step
def test_from_pipeline_definition(
    make_mock_registered_model_version,
    mocked_responses,
) -> None:
    """Check that a RegisteredPipeline can be rebuilt from its definition.

    The mocked RMV fixture overrides ``_get_artifact`` so that a simple,
    consistent pipeline definition is returned. The HTTP calls that fetch
    the model versions and registered model referenced by that definition
    are answered with empty mocked responses.
    """
    rmv = make_mock_registered_model_version()
    registry_url = f"{rmv._conn.scheme}://{rmv._conn.socket}/api/v1/registry"
    mocked_paths = (
        "model_versions/1",
        "model_versions/2",
        "registered_models/0",
    )
    for path in mocked_paths:
        mocked_responses.get(f"{registry_url}/{path}", json={}, status=200)

    pipeline = RegisteredPipeline._from_pipeline_definition(
        registered_model_version=rmv,
    )
    assert isinstance(pipeline, RegisteredPipeline)
    assert pipeline.id == rmv.id
def test_bad_mutation_of_graph_steps_exception(
    make_mock_registered_model,
    make_mock_registered_model_version,
    make_mock_pipeline_graph,
):
    """Test that we throw the correct exception when a user tries to mutate
    the steps of a graph in an inappropriate way.

    The ``steps`` set is mutated in place, bypassing ``set_steps()``; the
    invalid member must be caught when the graph is re-validated by the
    RegisteredPipeline constructor.
    """
    mocked_rm = make_mock_registered_model(id=123, name="test_rm")
    mocked_rmv = make_mock_registered_model_version()
    with patch.object(
        verta.pipeline.PipelineStep, "_get_registered_model", return_value=mocked_rm
    ):
        graph = make_mock_pipeline_graph()

    graph.steps.add("not_a_step")
    with pytest.raises(TypeError) as err:
        RegisteredPipeline(graph=graph, registered_model_version=mocked_rmv)
    # The message embeds ``type(step)``, i.e. the repr of the str class.
    assert (
        str(err.value) == "individual steps of a PipelineGraph must be type"
        " PipelineStep, not <class 'str'>."
    )
@st.composite
def pipeline_definition(draw):
    """Return a strategy for a mocked linear pipeline specification
    dictionary with an arbitrary number of steps.

    Each step after the first names the step before it as its only
    predecessor.
    """
    # step names in a pipeline must be unique
    step_names = draw(st.lists(st.text(min_size=1), min_size=2, unique=True))
    step_count = len(step_names)
    model_versions = draw(
        st.lists(
            # limit max value to prevent protobuf "Value out of range" error
            st.integers(min_value=1, max_value=2**63),
            min_size=step_count,
            max_size=step_count,
            unique=True,
        )
    )

    # The first step has no predecessor; every other step follows the one
    # listed just before it.
    predecessor_lists = [[]] + [[name] for name in step_names[:-1]]
    graph = [
        {"predecessors": predecessors, "name": name}
        for predecessors, name in zip(predecessor_lists, step_names)
    ]
    steps = [
        {"model_version_id": model_version, "name": name}
        for model_version, name in zip(model_versions, step_names)
    ]

    return {
        "graph": graph,
        "pipeline_version_id": draw(st.integers(min_value=1)),
        "steps": steps,
    }
@st.composite
def resources(draw):
    """Return a strategy that builds ``Resources`` objects.

    CPU, memory, and an NVIDIA GPU (model and count) are all drawn.
    """
    cpu = draw(st.integers(min_value=1))
    memory = draw(
        st.from_regex(r"^[0-9]+[e]?[0-9]*[E|P|T|G|M|K]?[i]?$", fullmatch=True)
    )
    gpu_model = draw(st.sampled_from([NvidiaGPUModel.T4, NvidiaGPUModel.V100]))
    gpu_count = draw(st.integers(min_value=1))
    return Resources(
        cpu=cpu,
        memory=memory,
        nvidia_gpu=NvidiaGPU(model=gpu_model, number=gpu_count),
    )
class PipelineGraph:
    """Object representing a collection of PipelineSteps to be run as a single
    inference pipeline.

    Parameters
    ----------
    steps : list, set, or tuple of :class:`~verta.pipeline.PipelineStep`
        All possible steps of the pipeline. Ordering of steps in the pipeline
        itself is determined by the predecessors provided to each step.

    Attributes
    ----------
    steps : set of :class:`~verta.pipeline.PipelineStep`
        Set of PipelineSteps comprising all possible steps in this
        PipelineGraph.
    """

    def __init__(
        self,
        steps: Union[
            List["PipelineStep"], Set["PipelineStep"], Tuple["PipelineStep"]
        ],
    ):
        # Normalize to a set exactly as set_steps() does, so the ``steps``
        # property is a set no matter which container type was passed in.
        self._steps = set(self._validate_steps(steps))

    def __repr__(self) -> str:
        return f"PipelineGraph steps:\n{self._format_steps()}"

    def _format_steps(self) -> str:
        """Format steps for improved readability in __repr__() function."""
        return "\n".join([repr(s) for s in self._steps])

    @property
    def steps(self) -> Set["PipelineStep"]:
        return self._steps

    @steps.setter
    def steps(self, value):
        """Raise a more informative error than the default."""
        raise AttributeError("can't set attribute 'steps'; please use set_steps()")

    def set_steps(
        self,
        steps: Union[
            List["PipelineStep"], Set["PipelineStep"], Tuple["PipelineStep"]
        ],
    ) -> Set["PipelineStep"]:
        """Update the set of steps for this PipelineGraph to the provided value.

        Parameters
        ----------
        steps : list, set, or tuple of :class:`~verta.pipeline.PipelineStep`
            List, set, or tuple of all possible steps of the pipeline graph.
            All options are converted to a set, so order is irrelevant and
            duplicates are removed.

        Returns
        -------
        set of :class:`~verta.pipeline.PipelineStep`
            The steps now set for this graph, if validation is successful.

        Raises
        ------
        TypeError
            If ``steps`` is not a list, set, or tuple of PipelineStep objects.
        ValueError
            If two steps share the same name.
        """
        self._steps = set(self._validate_steps(steps))
        return self.steps

    def _validate_steps(
        self,
        steps: Union[
            List["PipelineStep"], Set["PipelineStep"], Tuple["PipelineStep"]
        ],
    ) -> Union[List["PipelineStep"], Set["PipelineStep"], Tuple["PipelineStep"]]:
        """Validate that the provided steps are a collection of uniquely named
        PipelineStep objects.

        Parameters
        ----------
        steps : list, set, or tuple of :class:`~verta.pipeline.PipelineStep`
            List, set, or tuple of steps provided by a user.

        Returns
        -------
        list, set, or tuple of :class:`~verta.pipeline.PipelineStep`
            The same ``steps`` object, unchanged, if validation is successful.

        Raises
        ------
        TypeError
            If ``steps`` is not a list, set, or tuple, or if any member is not
            a PipelineStep.
        ValueError
            If two steps share the same name.
        """
        if not isinstance(steps, (list, set, tuple)):
            raise TypeError(
                f"steps must be type list, set, or tuple, not {type(steps)}"
            )
        for step in steps:
            if not isinstance(step, PipelineStep):
                raise TypeError(
                    f"individual steps of a PipelineGraph must be type"
                    f" PipelineStep, not {type(step)}."
                )
            # throw an exception if any step's predecessors attr has been
            # inappropriately mutated.
            step._validate_predecessors(step.predecessors)
        step_names = [step.name for step in steps]
        if len(step_names) != len(set(step_names)):
            raise ValueError("step names must be unique within a PipelineGraph")
        return steps

    @classmethod
    def _from_definition(
        cls,
        pipeline_definition: Dict[str, Any],
        conn: "Connection",
        conf: "Configuration",
    ) -> "PipelineGraph":
        """Create a PipelineGraph instance from a specification dict.

        This is used to return a PipelineGraph object when fetching an existing
        registered pipeline from the backend in the form of a dict extracted
        from a `pipeline.json` artifact.

        Parameters
        ----------
        pipeline_definition : dict
            Pipeline definition dict from which to create the Pipeline.
        conn : :class:`~verta._internal_utils._utils.Connection`
            Connection object for fetching the model version associated with
            the step.
        conf : :class:`~verta._internal_utils._utils.Configuration`
            Configuration object for fetching the model version associated
            with the step.

        Returns
        -------
        :class:`~verta.pipeline.PipelineGraph`
            Graph containing one step per entry of the definition.
        """
        return cls(
            steps=PipelineStep._steps_from_pipeline_definition(
                pipeline_definition, conn, conf
            ),
        )

    def _to_graph_definition(self) -> List[Dict[str, Any]]:
        """Create a pipeline graph specification from this PipelineGraph.

        This is fed to the backend as 'graph' in our PipelineDefinition schema.
        """
        return [step._to_graph_spec() for step in self.steps]

    def _to_steps_definition(self) -> List[Dict[str, Any]]:
        """Create a pipeline steps specification from this PipelineGraph.

        This is fed to the backend as 'steps' in our PipelineDefinition schema.
        """
        return [step._to_step_spec() for step in self.steps]
class PipelineStep:
    """Object representing a single step to be run within an inference pipeline.

    Parameters
    ----------
    name : str
        Name of the step, for use within the scope of the pipeline only.
    registered_model_version : :class:`~verta.registry.entities.RegisteredModelVersion`
        Registered model version to run for this step.
    predecessors : list, set, tuple, optional
        List, set, or tuple of unique PipelineStep objects whose outputs will
        be treated as inputs to this step. If not included, the step is
        assumed to be an initial step.

    Attributes
    ----------
    name : str
        Name of the step within the scope of the pipeline.
    registered_model_version : :class:`~verta.registry.entities.RegisteredModelVersion`
        Registered model version run by this step.
    predecessors : set
        Set of PipelineSteps whose outputs will be treated as inputs to this
        step.
    """

    def __init__(
        self,
        name: str,
        registered_model_version: "RegisteredModelVersion",
        predecessors: Optional[
            Union[List["PipelineStep"], Set["PipelineStep"], Tuple["PipelineStep"]]
        ] = None,  # Optional because it could be the first step with no predecessors
    ):
        # Each setter validates its argument and assigns the backing
        # attribute. set_registered_model_version() also fetches and stores
        # the parent registered model, so it is not fetched a second time here.
        self.set_name(name)
        self.set_registered_model_version(registered_model_version)
        self.set_predecessors(predecessors)

    def __repr__(self) -> str:
        return "\n    ".join(
            (
                "PipelineStep:",
                f"step name: {self.name}",
                f"registered_model: {self._registered_model.name}",
                f"registered_model_id: {self._registered_model.id}",
                f"registered_model_version: {self.registered_model_version.name}",
                f"registered_model_version_id: {self.registered_model_version.id}",
                f"predecessors: {[s.name for s in self.predecessors]}",
            )
        )

    @property
    def registered_model_version(self) -> "RegisteredModelVersion":
        return self._registered_model_version

    @registered_model_version.setter
    def registered_model_version(self, value) -> None:
        """Raise a more informative error than the default."""
        raise AttributeError(
            "can't set attribute 'registered_model_version'; please use set_registered_model_version()"
        )

    def set_registered_model_version(
        self, registered_model_version: "RegisteredModelVersion"
    ) -> "RegisteredModelVersion":
        """Set the registered model version associated with this step.

        The registered model that owns the version is fetched and stored as
        well, so that ``repr()`` of the step can display it.

        Parameters
        ----------
        registered_model_version : :class:`~verta.registry.entities.RegisteredModelVersion`
            Registered model version to use for the step.

        Returns
        -------
        RegisteredModelVersion
            The new registered model version now set for this step.

        Raises
        ------
        TypeError
            If the provided value is not type RegisteredModelVersion.
        """
        if not isinstance(registered_model_version, RegisteredModelVersion):
            raise TypeError(
                f"registered_model_version must be a RegisteredModelVersion object, "
                f"not {type(registered_model_version)}"
            )
        self._registered_model_version = registered_model_version
        self._registered_model = self._get_registered_model()
        return self.registered_model_version

    @property
    def name(self) -> str:
        return self._name

    @name.setter
    def name(self, value) -> None:
        """Raise a more informative error than the default."""
        raise AttributeError("can't set attribute 'name'; please use set_name()")

    def set_name(self, name: str) -> str:
        """Change the name of this step.

        Parameters
        ----------
        name : str
            New name to use for the step.

        Returns
        -------
        str
            The new name now set for this step.

        Raises
        ------
        TypeError
            If the provided value for ``name`` is not type str.
        """
        if not isinstance(name, str):
            raise TypeError(f"name must be a string, not {type(name)}")
        self._name = name
        return self.name

    @property
    def predecessors(self) -> Set["PipelineStep"]:
        return self._predecessors

    @predecessors.setter
    def predecessors(self, value) -> None:
        """Raise a more informative error than the default."""
        raise AttributeError(
            "can't set attribute 'predecessors'; please use set_predecessors()"
        )

    def set_predecessors(
        self,
        steps: Optional[
            Union[List["PipelineStep"], Set["PipelineStep"], Tuple["PipelineStep"]]
        ] = None,
    ) -> Set["PipelineStep"]:
        """Set the predecessors associated with this step.

        Parameters
        ----------
        steps : list, set, or tuple, optional
            List, set, or tuple of PipelineStep objects whose outputs will be
            treated as inputs to this step. All options are converted to a
            set, so order is irrelevant and duplicates are removed. An empty
            set is used if no input (or any other falsy value) is provided.

        Returns
        -------
        set of PipelineStep
            The new set of predecessors now set for this step.

        Raises
        ------
        TypeError
            If the provided value for ``steps`` is not a list, set, or tuple
            of PipelineStep objects.
        """
        if steps:
            self._predecessors = set(self._validate_predecessors(steps))
        else:
            self._predecessors = set()
        return self.predecessors

    @staticmethod
    def _validate_predecessors(
        predecessors: Union[
            Set["PipelineStep"], List["PipelineStep"], Tuple["PipelineStep"]
        ]
    ) -> Union[Set["PipelineStep"], List["PipelineStep"], Tuple["PipelineStep"]]:
        """Validate that the provided predecessors are a collection of
        PipelineStep objects.

        Parameters
        ----------
        predecessors : list, set, or tuple
            PipelineStep objects whose outputs will be treated as inputs to
            this step.

        Returns
        -------
        list, set, or tuple
            The same ``predecessors`` object, unchanged.

        Raises
        ------
        TypeError
            If ``predecessors`` is not a list, set, or tuple, or if any member
            is not a PipelineStep.
        """
        if not isinstance(predecessors, (set, list, tuple)):
            raise TypeError(
                f"steps must be type list, set, or tuple, not {type(predecessors)}"
            )
        for step in predecessors:
            if not isinstance(step, PipelineStep):
                raise TypeError(
                    f"individual predecessors of a PipelineStep must be type"
                    f" PipelineStep, not {type(step)}."
                )
        return predecessors

    def _get_registered_model(self) -> "RegisteredModel":
        """Fetch the registered model associated with this step's model version.

        This is to provide important context to the user when a registered
        pipeline is fetched from the backend.

        Returns
        -------
        :class:`~verta.registry.entities.RegisteredModel`
        """
        rm = RegisteredModel._get_by_id(
            id=self._registered_model_version.registered_model_id,
            conn=self.registered_model_version._conn,
            conf=self.registered_model_version._conf,
        )
        return rm

    @classmethod
    def _steps_from_pipeline_definition(
        cls,
        pipeline_definition: Dict[str, Any],
        conn: "Connection",
        conf: "Configuration",
    ) -> Set["PipelineStep"]:
        """Return a set of PipelineStep objects from a pipeline definition.

        This method is used when fetching a pre-existing pipeline from the
        backend and converting it to a local RegisteredPipeline object, which
        includes the PipelineGraph and all component steps as PipelineStep
        objects.

        Parameters
        ----------
        pipeline_definition : dict
            Specification dictionary for the whole pipeline.
        conn : :class:`~verta._internal_utils._utils.Connection`
            Connection object for fetching the model version associated with
            each step.
        conf : :class:`~verta._internal_utils._utils.Configuration`
            Configuration object for fetching the model version associated
            with each step.

        Returns
        -------
        set of :class:`~verta.pipeline.PipelineStep`
            Set of steps in the pipeline spec as PipelineStep objects.
        """
        # First pass: build every step without predecessors, because a
        # predecessor must already exist as an object before it can be linked.
        steps: Set["PipelineStep"] = set()
        for step in pipeline_definition["steps"]:
            steps.add(
                cls(
                    name=step["name"],
                    registered_model_version=RegisteredModelVersion._get_by_id(
                        id=step["model_version_id"], conn=conn, conf=conf
                    ),
                    predecessors=set(),
                )
            )
        # Second pass: link each step to the step objects named as its
        # predecessors in the 'graph' section of the definition.
        for step_object in steps:
            predecessor_names = [
                s["predecessors"]
                for s in pipeline_definition["graph"]
                if s["name"] == step_object.name
            ][0]
            step_object.set_predecessors(
                {s for s in steps if s.name in predecessor_names}
            )
        return steps

    def _to_graph_spec(self) -> Dict[str, Any]:
        """Return a dictionary representation of predecessors for this step,
        formatted for a pipeline definition.

        This is fed to the backend as 'graph' in our PipelineDefinition schema
        """
        return {
            "name": self.name,
            "predecessors": [s.name for s in self.predecessors],
        }

    def _to_step_spec(self) -> Dict[str, Any]:
        """Return a dictionary representation of this step, formatted for a
        pipeline definition.

        This is fed to the backend as 'steps' in our PipelineDefinition schema
        """
        return {
            "name": self.name,
            "model_version_id": self.registered_model_version.id,
        }
class RegisteredPipeline:
    """Object representing a version of a registered inference pipeline.

    There should not be a need to instantiate this class directly; please use
    :meth:`Client.create_registered_pipeline() <verta.Client.create_registered_pipeline>`
    for creating a new pipeline, or
    :meth:`Client.get_registered_pipeline() <verta.Client.get_registered_pipeline>`
    for fetching an existing pipeline.

    .. note::
        Registered pipelines are immutable once registered with Verta. A new
        version must be created and registered with any desired changes. Use
        the ``copy_graph()`` function to create a local copy of this
        pipeline's graph that can be modified and used to create the new
        version.

    Attributes
    ----------
    name : str
        Name of this pipeline.
    id : int
        ID of this Pipeline, auto-assigned by the Verta backend.
    graph : :class:`~verta.pipeline.PipelineGraph`
        PipelineGraph object containing all possible steps in the Pipeline.
    """

    def __init__(
        self,
        registered_model_version: "RegisteredModelVersion",
        graph: "PipelineGraph",
    ):
        """Create a Pipeline instance from an existing RegisteredModelVersion
        object and the provided pipeline graph.

        The ``name`` and ``id`` properties read from the stored registered
        model version.
        """
        self._registered_model_version = registered_model_version
        self._graph = graph
        # throws an exception if the graph's steps attr has been
        # inappropriately mutated.
        self._graph._validate_steps(self._graph.steps)

    def __repr__(self):
        return "\n    ".join(
            (
                "RegisteredPipeline:",
                f"pipeline name: {self.name}",
                f"pipeline id: {self.id}",
                f"{self.graph}",
            )
        )

    @property
    def name(self):
        return self._registered_model_version.name

    @property
    def id(self):
        return self._registered_model_version.id

    @property
    def graph(self) -> "PipelineGraph":
        return self._graph

    def copy_graph(self) -> "PipelineGraph":
        """Return a copy of the PipelineGraph object for this pipeline.

        RegisteredPipeline objects are immutable once registered with Verta.
        This copy can be modified and used to create and register a new
        RegisteredPipeline.

        Returns
        -------
        :class:`~verta.pipeline.PipelineGraph`
            A deep copy of the PipelineGraph object for this pipeline.
        """
        return copy.deepcopy(self.graph)

    def _log_pipeline_definition_artifact(self) -> None:
        """
        Log the pipeline definition as an artifact of the registered model version.
        """
        with tempfile.NamedTemporaryFile("w+") as temp_file:
            json.dump(self._to_pipeline_definition(), temp_file)
            # json.dump() leaves the cursor at end-of-file with data possibly
            # still buffered; flush and rewind so the full document is read
            # even if the consumer does not rewind the handle itself.
            temp_file.flush()
            temp_file.seek(0)
            self._registered_model_version.log_artifact("pipeline.json", temp_file)

    def _to_pipeline_definition(self) -> Dict[str, Any]:
        """Create a complete pipeline definition dict from this pipeline's ID
        and PipelineGraph.

        Used in conjunction with the client function for creating a registered
        pipeline from a pipeline graph. This gets converted to JSON and
        uploaded as an artifact to the registered model version for the
        pipeline by the _log_pipeline_definition_artifact function.
        """
        return {
            "pipeline_version_id": self.id,
            "graph": self._graph._to_graph_definition(),
            "steps": self._graph._to_steps_definition(),
        }

    def _to_pipeline_configuration(
        self, pipeline_resources: Optional[Dict[str, "Resources"]] = None
    ) -> Dict[str, Any]:
        """Build a pipeline configuration dict for this pipeline.

        Used in conjunction with the client function for creating a registered
        pipeline from a pipeline graph. This gets included in the update
        request for an endpoint when the pipeline is deployed. The `env` and
        `build` keys are not included in the configuration resulting in
        default values being used by the backend.

        Parameters
        ----------
        pipeline_resources : dict of str to :class:`~verta.endpoint.resources.Resources`, optional
            Resources to be allocated to each step of the pipeline. Keys are
            step names. Steps without an entry are configured by name only.

        Returns
        -------
        dict
            Representation of a pipeline configuration.

        Raises
        ------
        TypeError
            If pipeline_resources is not a dict of str to Resources.
        ValueError
            If pipeline_resources contains resources for a step name that is
            not in the pipeline.
        """
        if pipeline_resources:
            # Values are validated first, then keys.
            for res in pipeline_resources.values():
                if not isinstance(res, Resources):
                    raise TypeError(
                        f"pipeline_resources values must be type Resources, not {type(res)}"
                    )
            # Collected once instead of once per key.
            known_step_names = {step.name for step in self._graph.steps}
            for step_name in pipeline_resources:
                if not isinstance(step_name, str):
                    raise TypeError(
                        f"pipeline_resources keys must be type str, not {type(step_name)}"
                    )
                if step_name not in known_step_names:
                    raise ValueError(
                        f"pipeline_resources contains resources for a step not in "
                        f"the pipeline: '{step_name}'"
                    )
        steps = []
        for step in self._graph.steps:
            step_config = {
                "name": step.name,
            }
            if pipeline_resources:
                step_res = pipeline_resources.get(step.name)
                if step_res:
                    step_config["resources"] = step_res._as_dict()
            steps.append(step_config)
        return {
            "pipeline_version_id": self.id,
            "steps": steps,
        }

    @staticmethod
    def _get_pipeline_definition_artifact(
        registered_model_version: "RegisteredModelVersion",
    ) -> Dict[str, Any]:
        """Get the pipeline definition artifact from the registered model version.

        This is used to fetch the pipeline definition from the pipeline RMV
        when an existing registered pipeline is fetched from the backend.

        Parameters
        ----------
        registered_model_version : :class:`~verta.registry.entities.RegisteredModelVersion`
            RegisteredModelVersion object associated with this pipeline, from
            which the pipeline definition artifact will be fetched.

        Returns
        -------
        dict
            Pipeline definition dictionary.
        """
        return json.load(registered_model_version.get_artifact("pipeline.json"))

    @classmethod
    def _from_pipeline_definition(
        cls,
        registered_model_version: "RegisteredModelVersion",
    ) -> "RegisteredPipeline":
        """Create a local RegisteredPipeline object from a pipeline's
        registered model version.

        Used when fetching a registered pipeline from the Verta backend. The
        `pipeline.json` artifact is fetched from the RMV and used to build a
        local RegisteredPipeline object.

        Parameters
        ----------
        registered_model_version : :class:`~verta.registry.entities.RegisteredModelVersion`
            RegisteredModelVersion object associated with this pipeline.

        Returns
        -------
        :class:`~verta.pipeline.RegisteredPipeline`
            Pipeline wrapping ``registered_model_version`` and the graph
            rebuilt from its definition artifact.
        """
        pipeline_definition = cls._get_pipeline_definition_artifact(
            registered_model_version
        )
        return cls(
            registered_model_version=registered_model_version,
            graph=PipelineGraph._from_definition(
                pipeline_definition=pipeline_definition,
                conn=registered_model_version._conn,
                conf=registered_model_version._conf,
            ),
        )