From 3c7b6e9ff9bd5b8b7cfdbb8f9f2498f89acdda59 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:34:02 +0100 Subject: [PATCH 01/14] Add dynamic artifacts naming, documentation and tests --- .../handle-data-artifacts/artifacts-naming.md | 85 +++++++++++++ src/zenml/artifacts/artifact_config.py | 4 +- src/zenml/orchestrators/step_runner.py | 3 +- src/zenml/steps/utils.py | 48 +++++-- .../functional/steps/test_naming.py | 117 ++++++++++++++++++ 5 files changed, 246 insertions(+), 11 deletions(-) create mode 100644 docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md create mode 100644 tests/integration/functional/steps/test_naming.py diff --git a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md new file mode 100644 index 00000000000..b603087ab91 --- /dev/null +++ b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md @@ -0,0 +1,85 @@ +--- +description: Understand how you can name your ZenML artifacts. +--- + +# How Artifact Naming works in ZenML + +ZenML provides flexible options for naming output artifacts, supporting both static and dynamic naming strategies: +- Names can be generated dynamically at runtime +- Support for lambda functions, callable functions, and string templates +- Compatible with single and multiple output scenarios +- Annotations help define naming strategy without modifying core logic + +## Naming Strategies + +### Static Naming +Static names are defined directly as string literals. + +```python +@step +def static_single() -> Annotated[str, "static_output_name"]: + return "null" +``` + +### Dynamic Naming +Dynamic names can be generated using: + +#### Lambda Functions +```python +from random import randint + +lambda_namer = lambda: "dynamic_name_" + str(randint(0,42)) + +@step +def dynamic_single_lambda() -> Annotated[str, lambda_namer]: + return "null" +``` + +#### Callable Functions +```python +from random import randint + +def func_namer(): + return "dummy_dynamic_" + str(randint(0,42)) + +@step +def dynamic_single_callable() -> Annotated[str, func_namer]: + return "null" +``` + +#### String Templates +Use the following placeholders that ZenML will replace: + +* `{date}` will resolve to the current date, e.g. `2024_11_18` +* `{time}` will resolve to the current time, e.g. `11_07_09_326492` + +```python +str_namer = "placeholder_name_{date}_{time}" + +@step +def dynamic_single_string() -> Annotated[str, str_namer]: + return "null" +``` + +### Multiple Output Handling + +If you plan to return multiple artifacts from you ZenML step you can flexibly combine all naming options outlined above, like this: + +```python +from random import randint + +def func_namer(): + return "dummy_dynamic_" + str(randint(0,42)) + +@step +def mixed_tuple() -> Tuple[ + Annotated[str, "static_output_name"], + Annotated[str, lambda: "dynamic_name_" + str(randint(0,42))], + Annotated[str, func_namer], + Annotated[str, "placeholder_name_{date}_{time}"], +]: + return "static_namer", "lambda_namer", "func_namer", "str_namer" +``` + + +
ZenML Scarf
diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py index 9a297dd2110..2ab30acd2b2 100644 --- a/src/zenml/artifacts/artifact_config.py +++ b/src/zenml/artifacts/artifact_config.py @@ -15,7 +15,7 @@ from typing import Any, Dict, List, Optional, Union -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, Field, PrivateAttr, model_validator from zenml.logger import get_logger from zenml.metadata.metadata_types import MetadataType @@ -61,6 +61,8 @@ def my_step() -> Annotated[ is_model_artifact: bool = False is_deployment_artifact: bool = False + _is_dynamic: bool = PrivateAttr(False) + @model_validator(mode="before") @classmethod @before_validator_handler diff --git a/src/zenml/orchestrators/step_runner.py b/src/zenml/orchestrators/step_runner.py index c11c79c878f..160b800a2c6 100644 --- a/src/zenml/orchestrators/step_runner.py +++ b/src/zenml/orchestrators/step_runner.py @@ -149,7 +149,8 @@ def run( ) output_annotations = parse_return_type_annotations( - func=step_instance.entrypoint + func=step_instance.entrypoint, + original_outputs=step_run_info.config.outputs, ) self._stack.prepare_step_run(info=step_run_info) diff --git a/src/zenml/steps/utils.py b/src/zenml/steps/utils.py index e237d12f9ff..f6b9d02d47b 100644 --- a/src/zenml/steps/utils.py +++ b/src/zenml/steps/utils.py @@ -36,6 +36,7 @@ from zenml.metadata.metadata_types import MetadataType from zenml.steps.step_context import get_step_context from zenml.utils import settings_utils, source_code_utils, typing_utils +from zenml.utils.string_utils import format_name_template if TYPE_CHECKING: from zenml.steps import BaseStep @@ -94,7 +95,9 @@ def get_args(obj: Any) -> Tuple[Any, ...]: def parse_return_type_annotations( - func: Callable[..., Any], enforce_type_annotations: bool = False + func: Callable[..., Any], + enforce_type_annotations: bool = False, + original_outputs: Dict[str, ArtifactConfig] = None, ) -> Dict[str, OutputSignature]: """Parse the return type annotation of a step function. @@ -102,6 +105,7 @@ def parse_return_type_annotations( func: The step function. enforce_type_annotations: If `True`, raises an exception if a type annotation is missing. + original_outputs: The original outputs of the step function. Raises: RuntimeError: If the output annotation has variable length or contains @@ -132,6 +136,11 @@ def parse_return_type_annotations( else: return_annotation = Any + if original_outputs: + original_names = list(original_outputs.keys()) + else: + original_names = None + if typing_utils.get_origin(return_annotation) is tuple: requires_multiple_artifacts = has_tuple_return(func) if requires_multiple_artifacts: @@ -146,7 +155,13 @@ def parse_return_type_annotations( artifact_config = get_artifact_config_from_annotation_metadata( annotation ) - output_name = artifact_config.name if artifact_config else None + if artifact_config: + if artifact_config._is_dynamic and original_names: + output_name = original_names[i] + else: + output_name = artifact_config.name + else: + output_name = None has_custom_name = output_name is not None output_name = output_name or f"output_{i}" if output_name in output_signature: @@ -164,7 +179,13 @@ def parse_return_type_annotations( artifact_config = get_artifact_config_from_annotation_metadata( return_annotation ) - output_name = artifact_config.name if artifact_config else None + if artifact_config: + if artifact_config._is_dynamic and original_names: + output_name = original_names[0] + else: + output_name = artifact_config.name + else: + output_name = None has_custom_name = output_name is not None output_name = output_name or SINGLE_RETURN_OUT_NAME return { @@ -228,9 +249,11 @@ def get_artifact_config_from_annotation_metadata( error_message = ( "Artifact annotation should only contain two elements: the artifact " - "type, and either an output name or an `ArtifactConfig`, e.g.: " - "`Annotated[int, 'output_name']` or " - "`Annotated[int, ArtifactConfig(name='output_name'), ...]`." + "type, and one of the following: {an output name || " + "an `ArtifactConfig` || a callable returning string as name}, e.g.: " + "`Annotated[int, 'output_name']` || " + "`Annotated[int, ArtifactConfig(name='output_name')]`." + "`Annotated[int, lambda: 'name' + str(random_int(0,42))]`." ) if len(metadata) > 2: @@ -240,15 +263,20 @@ def get_artifact_config_from_annotation_metadata( # `Annotated[int, 'output_name', ArtifactConfig(...)]` output_name = None artifact_config = None + is_dynamic = False for metadata_instance in metadata: if isinstance(metadata_instance, str): - if output_name is not None: - raise ValueError(error_message) - output_name = metadata_instance + output_name = format_name_template(metadata_instance) + is_dynamic = output_name != metadata_instance elif isinstance(metadata_instance, ArtifactConfig): if artifact_config is not None: raise ValueError(error_message) artifact_config = metadata_instance + elif isinstance(metadata_instance, Callable): + output_name = metadata_instance() + if not isinstance(output_name, str): + raise ValueError(error_message) + is_dynamic = True else: raise ValueError(error_message) @@ -266,6 +294,8 @@ def get_artifact_config_from_annotation_metadata( if artifact_config and artifact_config.name == "": raise ValueError("Output name cannot be an empty string.") + artifact_config._is_dynamic = is_dynamic + return artifact_config diff --git a/tests/integration/functional/steps/test_naming.py b/tests/integration/functional/steps/test_naming.py new file mode 100644 index 00000000000..cfd6ad6ffe6 --- /dev/null +++ b/tests/integration/functional/steps/test_naming.py @@ -0,0 +1,117 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. + +from typing import Callable, Tuple + +import pytest +from typing_extensions import Annotated + +from zenml import pipeline, step +from zenml.client import Client +from zenml.models.v2.core.pipeline_run import PipelineRunResponse + + +def func_namer(): + return "dummy_dynamic_" + str(43) + + +lambda_namer = lambda: "dummy_dynamic_" + str(42) +str_namer = "dummy_dynamic_time_{time}" +static_namer = "dummy_static" + + +def _validate_name_by_value(name: str, value: str) -> bool: + if value == "func_namer": + return name == func_namer() + if value == "lambda_namer": + return name == lambda_namer() + if value == "str_namer": + return name.startswith("dummy_dynamic_time_") + if value == "static_namer": + return name == "dummy_static" + return False + + +@step +def dynamic_single_lambda() -> Annotated[str, lambda_namer]: + return "lambda_namer" + + +@step +def dynamic_single_callable() -> Annotated[str, func_namer]: + return "func_namer" + + +@step +def dynamic_single_string() -> Annotated[str, str_namer]: + return "str_namer" + + +@step +def dynamic_tuple() -> ( + Tuple[ + Annotated[str, lambda_namer], + Annotated[str, func_namer], + Annotated[str, str_namer], + ] +): + return "lambda_namer", "func_namer", "str_namer" + + +@step +def mixed_tuple() -> ( + Tuple[ + Annotated[str, static_namer], + Annotated[str, lambda_namer], + Annotated[str, func_namer], + Annotated[str, str_namer], + ] +): + return "static_namer", "lambda_namer", "func_namer", "str_namer" + + +@step +def static_single() -> Annotated[str, static_namer]: + return "static_namer" + + +@pytest.mark.parametrize( + "step", + [ + (dynamic_single_lambda), + (dynamic_single_callable), + (dynamic_single_string), + (dynamic_tuple), + (mixed_tuple), + (static_single), + ], + ids=[ + "dynamic_single_lambda", + "dynamic_single_callable", + "dynamic_single_string", + "dynamic_tuple", + "mixed_tuple", + "static_single", + ], +) +def test_various_naming_scenarios(step: Callable, clean_client: Client): + @pipeline + def _inner(): + step() + + p: PipelineRunResponse = _inner() + for step_response in p.steps.values(): + for k in step_response.outputs.keys(): + value = clean_client.get_artifact_version(k).load() + assert _validate_name_by_value(k, value) From ab16afe29856e7b27fa936d85b6fe3a9c0a46901 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:47:04 +0100 Subject: [PATCH 02/14] add `ArtifactConfig` support --- src/zenml/artifacts/artifact_config.py | 6 ++-- src/zenml/steps/utils.py | 14 ++++++++++ .../functional/steps/test_naming.py | 28 ++++++++++++++----- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py index 2ab30acd2b2..6a05ed43375 100644 --- a/src/zenml/artifacts/artifact_config.py +++ b/src/zenml/artifacts/artifact_config.py @@ -13,7 +13,7 @@ # permissions and limitations under the License. """Artifact Config classes to support Model Control Plane feature.""" -from typing import Any, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union from pydantic import BaseModel, Field, PrivateAttr, model_validator @@ -51,7 +51,9 @@ def my_step() -> Annotated[ is_deployment_artifact: Whether the artifact is a deployment artifact. """ - name: Optional[str] = None + name: Optional[Union[str, Callable]] = Field( + default=None, union_mode="smart" + ) version: Optional[Union[str, int]] = Field( default=None, union_mode="smart" ) diff --git a/src/zenml/steps/utils.py b/src/zenml/steps/utils.py index f6b9d02d47b..63e9c1b7c72 100644 --- a/src/zenml/steps/utils.py +++ b/src/zenml/steps/utils.py @@ -266,13 +266,27 @@ def get_artifact_config_from_annotation_metadata( is_dynamic = False for metadata_instance in metadata: if isinstance(metadata_instance, str): + if output_name is not None: + raise ValueError(error_message) output_name = format_name_template(metadata_instance) is_dynamic = output_name != metadata_instance elif isinstance(metadata_instance, ArtifactConfig): if artifact_config is not None: raise ValueError(error_message) artifact_config = metadata_instance + if isinstance(artifact_config.name, str): + _name = format_name_template(artifact_config.name) + is_dynamic = _name != metadata_instance + artifact_config.name = _name + elif isinstance(artifact_config.name, Callable): + _name = artifact_config.name() + if not isinstance(_name, str): + raise ValueError(error_message) + artifact_config.name = _name + is_dynamic = True elif isinstance(metadata_instance, Callable): + if output_name is not None: + raise ValueError(error_message) output_name = metadata_instance() if not isinstance(output_name, str): raise ValueError(error_message) diff --git a/tests/integration/functional/steps/test_naming.py b/tests/integration/functional/steps/test_naming.py index cfd6ad6ffe6..44e3d981083 100644 --- a/tests/integration/functional/steps/test_naming.py +++ b/tests/integration/functional/steps/test_naming.py @@ -17,7 +17,7 @@ import pytest from typing_extensions import Annotated -from zenml import pipeline, step +from zenml import ArtifactConfig, pipeline, step from zenml.client import Client from zenml.models.v2.core.pipeline_run import PipelineRunResponse @@ -86,15 +86,28 @@ def static_single() -> Annotated[str, static_namer]: return "static_namer" +@step +def mixed_tuple_artifact_config() -> ( + Tuple[ + Annotated[str, ArtifactConfig(name=static_namer)], + Annotated[str, ArtifactConfig(name=lambda_namer)], + Annotated[str, ArtifactConfig(name=func_namer)], + Annotated[str, ArtifactConfig(name=str_namer)], + ] +): + return "static_namer", "lambda_namer", "func_namer", "str_namer" + + @pytest.mark.parametrize( "step", [ - (dynamic_single_lambda), - (dynamic_single_callable), - (dynamic_single_string), - (dynamic_tuple), - (mixed_tuple), - (static_single), + dynamic_single_lambda, + dynamic_single_callable, + dynamic_single_string, + dynamic_tuple, + mixed_tuple, + static_single, + mixed_tuple_artifact_config, ], ids=[ "dynamic_single_lambda", @@ -103,6 +116,7 @@ def static_single() -> Annotated[str, static_namer]: "dynamic_tuple", "mixed_tuple", "static_single", + "mixed_tuple_artifact_config", ], ) def test_various_naming_scenarios(step: Callable, clean_client: Client): From 45cfbfaf5a441e0c4fa46643ca463831f1ece8dd Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:07:54 +0100 Subject: [PATCH 03/14] reshape docs --- .gitbook.yaml | 3 +- .../handle-data-artifacts/artifacts-naming.md | 4 + .../dynamically-assign-artifact-names.md | 143 ------------------ docs/book/toc.md | 2 +- 4 files changed, 7 insertions(+), 145 deletions(-) delete mode 100644 docs/book/how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md diff --git a/.gitbook.yaml b/.gitbook.yaml index 62e711dd848..bcf39e0c26f 100644 --- a/.gitbook.yaml +++ b/.gitbook.yaml @@ -47,7 +47,8 @@ redirects: how-to/build-pipelines/schedule-a-pipeline: how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md how-to/build-pipelines/delete-a-pipeline: how-to/pipeline-development/build-pipelines/delete-a-pipeline.md how-to/build-pipelines/compose-pipelines: how-to/pipeline-development/build-pipelines/compose-pipelines.md - how-to/build-pipelines/dynamically-assign-artifact-names: how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md + how-to/build-pipelines/dynamically-assign-artifact-names: how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md + how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names: how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md how-to/build-pipelines/retry-steps: how-to/pipeline-development/build-pipelines/retry-steps.md how-to/build-pipelines/run-pipelines-asynchronously: how-to/pipeline-development/build-pipelines/run-pipelines-asynchronously.md how-to/build-pipelines/control-execution-order-of-steps: how-to/pipeline-development/build-pipelines/control-execution-order-of-steps.md diff --git a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md index b603087ab91..9c366ba52dd 100644 --- a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md +++ b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md @@ -4,6 +4,10 @@ description: Understand how you can name your ZenML artifacts. # How Artifact Naming works in ZenML +In ZenML pipelines, you often need to reuse the same step multiple times with different inputs, resulting in multiple artifacts. However, the default naming convention for artifacts can make it challenging to track and differentiate between these outputs, especially when they need to be used in subsequent pipelines. Below you can find a detailed exploration of how you might name your output artifacts dynamically or statically, depending on your needs. + +ZenML uses type annotations in function definitions to determine artifact names. Output artifacts with the same name are saved with incremented version numbers. + ZenML provides flexible options for naming output artifacts, supporting both static and dynamic naming strategies: - Names can be generated dynamically at runtime - Support for lambda functions, callable functions, and string templates diff --git a/docs/book/how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md b/docs/book/how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md deleted file mode 100644 index 179913d9aee..00000000000 --- a/docs/book/how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md +++ /dev/null @@ -1,143 +0,0 @@ ---- -description: How to dynamically assign artifact names in your pipelines. ---- - -# Dynamically assign artifact names - -In ZenML pipelines, you often need to reuse the same step multiple times with -different inputs, resulting in multiple artifacts. However, the default naming -convention for artifacts can make it challenging to track and differentiate -between these outputs, especially when they need to be used in subsequent -pipelines. Below you can find a detailed exploration of how you might go about dynamically generating steps and artifacts to improve pipeline flexibility and maintainability. - -By default, ZenML uses type annotations in function definitions to determine artifact names. While this works well for steps used once in a pipeline, it becomes problematic when: - -1. The same step is called multiple times with different inputs. -2. The resulting artifacts need to be used in different pipelines later. -3. Output artifacts are saved with the same name and incremented version numbers. - -For example, when using a preprocessor step that needs to transform train, validation, and test data separately, you might end up with three versions of an artifact called `transformed_data`, making it difficult to track which is which. - -ZenML offers two possible ways to address this problem: - -1. Using factory functions to create dynamic steps with custom artifact names. -2. Using metadata to identify artifacts in a single step. - -## 1. Using factory functions for dynamic artifact names - -This approach allows you to create steps with custom artifact names dynamically: - -```python -from typing import Any, Dict -from typing_extensions import Annotated -from zenml import step, pipeline, get_step_context, ArtifactConfig - -def create_step(prefix: str): - def _entrypoint(data: Any) -> Annotated[Dict[str, Any], ArtifactConfig(name=f"{prefix}_artifact")]: - context = get_step_context() - return {"processed_data": data, "step_name": context.step_name} - - step_name = f"dynamic_step_{prefix}" - _entrypoint.__name__ = step_name - s = step(_entrypoint) - globals()[step_name] = s - return s - -# Create the dynamic steps -train_step = create_step(prefix="train") -validation_step = create_step(prefix="validation") -test_step = create_step(prefix="test") - -# Resolve the steps -train_step.resolve() -validation_step.resolve() -test_step.resolve() - -@pipeline -def dynamic_artifact_pipeline(train_data, val_data, test_data): - train_result = train_step(train_data) - validation_result = validation_step(val_data) - test_result = test_step(test_data) - - -dynamic_artifact_pipeline(train_data=1, val_data=2, test_data=3) -``` - -This method generates unique artifact names for each step, making it easier to -track and retrieve specific artifacts later in your workflow. - -![Dynamic artifact pipeline DAG in the -dashboard](../../../.gitbook/assets/dynamic_artifact_pipeline.png) - -One caveat applies to this first method which is that either of the following -two things must be true: - -- The factory must be in the same file as where the steps are defined -> This is - so the logic with `globals()` works -- The user must have use the same variable name for the step as the `__name__` - of the entrypoint function - -As you can see, this is not always possible or desirable and you should use -the second method if you can. - -## 2. Using Metadata for Custom Artifact Identification - -If you prefer using a single step and differentiating artifacts through metadata, try this approach: - -```python -from typing import Any, Dict -from typing_extensions import Annotated -from zenml import step, get_step_context, pipeline - -@step -def generic_step(data: Any, prefix: str) -> Annotated[Dict[str, Any], "dataset"]: - result = {"processed_data": data} - - # Add custom metadata - step_context = get_step_context() - step_context.add_output_metadata( - output_name="dataset", - metadata={"custom_prefix": prefix} - ) - - return result - -@pipeline -def metadata_artifact_pipeline(train_data, val_data, test_data): - generic_step(train_data, prefix="train") - generic_step(val_data, prefix="validation") - generic_step(test_data, prefix="test") - -metadata_artifact_pipeline(train_data=1, val_data=2, test_data=3) -``` - -We can see the metadata in the dashboard: - -![Metadata visible in the dashboard](../../../.gitbook/assets/metadata_artifact_pipeline.png) - -This method uses a single `generic_step` but adds custom metadata to each artifact. You can later use this metadata to identify and differentiate between artifacts: - -```python -from zenml.client import Client - -client = Client() -artifacts = client.list_artifact_versions("generic_artifact") -for artifact in artifacts: - prefix = artifact.run_metadata.get("custom_prefix") - if prefix == "train": - train_data = artifact.load() - elif prefix == "validation": - val_data = artifact.load() - elif prefix == "test": - test_data = artifact.load() -``` - -Both solutions provide ways to custom-identify your artifacts without modifying -ZenML's core functionality. The factory function approach offers more control -over the artifact name itself, while the metadata approach maintains consistent -artifact names but adds custom metadata for identification. - - -
ZenML Scarf
- - diff --git a/docs/book/toc.md b/docs/book/toc.md index 7ee4aea6e65..d346ec6b8ec 100644 --- a/docs/book/toc.md +++ b/docs/book/toc.md @@ -94,7 +94,6 @@ * [Schedule a pipeline](how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md) * [Deleting a pipeline](how-to/pipeline-development/build-pipelines/delete-a-pipeline.md) * [Compose pipelines](how-to/pipeline-development/build-pipelines/compose-pipelines.md) - * [Dynamically assign artifact names](how-to/pipeline-development/build-pipelines/dynamically-assign-artifact-names.md) * [Automatically retry steps](how-to/pipeline-development/build-pipelines/retry-steps.md) * [Run pipelines asynchronously](how-to/pipeline-development/build-pipelines/run-pipelines-asynchronously.md) * [Control execution order of steps](how-to/pipeline-development/build-pipelines/control-execution-order-of-steps.md) @@ -123,6 +122,7 @@ * [How ZenML stores data](how-to/data-artifact-management/handle-data-artifacts/artifact-versioning.md) * [Return multiple outputs from a step](how-to/data-artifact-management/handle-data-artifacts/return-multiple-outputs-from-a-step.md) * [Delete an artifact](how-to/data-artifact-management/handle-data-artifacts/delete-an-artifact.md) + * [Artifacts naming](how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md) * [Organize data with tags](how-to/data-artifact-management/handle-data-artifacts/tagging.md) * [Get arbitrary artifacts in a step](how-to/data-artifact-management/handle-data-artifacts/get-arbitrary-artifacts-in-a-step.md) * [Handle custom data types](how-to/data-artifact-management/handle-data-artifacts/handle-custom-data-types.md) From 2d20ad06a7265708b9fef1dc4adf75628a4c2aec Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:58:26 +0100 Subject: [PATCH 04/14] silent ruff by intent --- tests/integration/functional/steps/test_naming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/functional/steps/test_naming.py b/tests/integration/functional/steps/test_naming.py index 44e3d981083..ae42f912a60 100644 --- a/tests/integration/functional/steps/test_naming.py +++ b/tests/integration/functional/steps/test_naming.py @@ -26,7 +26,7 @@ def func_namer(): return "dummy_dynamic_" + str(43) -lambda_namer = lambda: "dummy_dynamic_" + str(42) +lambda_namer = lambda: "dummy_dynamic_" + str(42) # noqa str_namer = "dummy_dynamic_time_{time}" static_namer = "dummy_static" From f2c6d3a46d13fc7e7f1f5b8558b89256cce96d34 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Tue, 19 Nov 2024 11:42:55 +0100 Subject: [PATCH 05/14] rework name evaluation logic and extend testing --- src/zenml/artifacts/artifact_config.py | 40 +++++++++++++++++- src/zenml/steps/utils.py | 42 ++++++++----------- .../functional/steps/test_naming.py | 18 +++++++- 3 files changed, 71 insertions(+), 29 deletions(-) diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py index 6a05ed43375..d974ababe0b 100644 --- a/src/zenml/artifacts/artifact_config.py +++ b/src/zenml/artifacts/artifact_config.py @@ -20,6 +20,7 @@ from zenml.logger import get_logger from zenml.metadata.metadata_types import MetadataType from zenml.utils.pydantic_utils import before_validator_handler +from zenml.utils.string_utils import format_name_template logger = get_logger(__name__) @@ -43,7 +44,10 @@ def my_step() -> Annotated[ ``` Attributes: - name: The name of the artifact. + name: The name of the artifact: + - static string e.g. "name" + - dynamic callable e.g. lambda: "name"+str(42) + - dynamic string e.g. "name_{date}_{time}" version: The version of the artifact. tags: The tags of the artifact. run_metadata: Metadata to add to the artifact. @@ -51,7 +55,7 @@ def my_step() -> Annotated[ is_deployment_artifact: Whether the artifact is a deployment artifact. """ - name: Optional[Union[str, Callable]] = Field( + name: Optional[Union[str, Callable[[], str]]] = Field( default=None, union_mode="smart" ) version: Optional[Union[str, int]] = Field( @@ -87,3 +91,35 @@ def _remove_old_attributes(cls, data: Dict[str, Any]) -> Dict[str, Any]: ) return data + + @model_validator(mode="after") + def artifact_config_after_validator(self) -> "ArtifactConfig": + """Artifact config after validator. + + Returns: + The artifact config. + """ + if isinstance(self.name, str): + _name = format_name_template(self.name) + self._is_dynamic = _name != self.name + self.name = _name + elif callable(self.name): + self.name = self.name() + self._is_dynamic = True + return self + + @property + def _evaluated_name(self) -> Optional[str]: + """Evaluated name of the artifact. + + Returns: + The evaluated name of the artifact. + + Raises: + RuntimeError: If the name is still a callable. + """ + if callable(self.name): + raise RuntimeError( + "Artifact name is still a callable, evaluation error happened. Contact ZenML team to follow-up." + ) + return self.name diff --git a/src/zenml/steps/utils.py b/src/zenml/steps/utils.py index 63e9c1b7c72..1f35e25d4cd 100644 --- a/src/zenml/steps/utils.py +++ b/src/zenml/steps/utils.py @@ -18,7 +18,16 @@ import contextlib import inspect import textwrap -from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Mapping, + Optional, + Tuple, + Union, +) from uuid import UUID from pydantic import BaseModel @@ -26,6 +35,7 @@ from zenml.artifacts.artifact_config import ArtifactConfig from zenml.client import Client +from zenml.config.step_configurations import ArtifactConfiguration from zenml.enums import ( ArtifactSaveType, ExecutionStatus, @@ -36,7 +46,6 @@ from zenml.metadata.metadata_types import MetadataType from zenml.steps.step_context import get_step_context from zenml.utils import settings_utils, source_code_utils, typing_utils -from zenml.utils.string_utils import format_name_template if TYPE_CHECKING: from zenml.steps import BaseStep @@ -97,7 +106,7 @@ def get_args(obj: Any) -> Tuple[Any, ...]: def parse_return_type_annotations( func: Callable[..., Any], enforce_type_annotations: bool = False, - original_outputs: Dict[str, ArtifactConfig] = None, + original_outputs: Optional[Mapping[str, ArtifactConfiguration]] = None, ) -> Dict[str, OutputSignature]: """Parse the return type annotation of a step function. @@ -159,7 +168,7 @@ def parse_return_type_annotations( if artifact_config._is_dynamic and original_names: output_name = original_names[i] else: - output_name = artifact_config.name + output_name = artifact_config._evaluated_name else: output_name = None has_custom_name = output_name is not None @@ -183,7 +192,7 @@ def parse_return_type_annotations( if artifact_config._is_dynamic and original_names: output_name = original_names[0] else: - output_name = artifact_config.name + output_name = artifact_config._evaluated_name else: output_name = None has_custom_name = output_name is not None @@ -263,34 +272,19 @@ def get_artifact_config_from_annotation_metadata( # `Annotated[int, 'output_name', ArtifactConfig(...)]` output_name = None artifact_config = None - is_dynamic = False for metadata_instance in metadata: if isinstance(metadata_instance, str): if output_name is not None: raise ValueError(error_message) - output_name = format_name_template(metadata_instance) - is_dynamic = output_name != metadata_instance + output_name = metadata_instance elif isinstance(metadata_instance, ArtifactConfig): if artifact_config is not None: raise ValueError(error_message) artifact_config = metadata_instance - if isinstance(artifact_config.name, str): - _name = format_name_template(artifact_config.name) - is_dynamic = _name != metadata_instance - artifact_config.name = _name - elif isinstance(artifact_config.name, Callable): - _name = artifact_config.name() - if not isinstance(_name, str): - raise ValueError(error_message) - artifact_config.name = _name - is_dynamic = True - elif isinstance(metadata_instance, Callable): + elif callable(metadata_instance): if output_name is not None: raise ValueError(error_message) - output_name = metadata_instance() - if not isinstance(output_name, str): - raise ValueError(error_message) - is_dynamic = True + output_name = metadata_instance else: raise ValueError(error_message) @@ -308,8 +302,6 @@ def get_artifact_config_from_annotation_metadata( if artifact_config and artifact_config.name == "": raise ValueError("Output name cannot be an empty string.") - artifact_config._is_dynamic = is_dynamic - return artifact_config diff --git a/tests/integration/functional/steps/test_naming.py b/tests/integration/functional/steps/test_naming.py index ae42f912a60..860f14430f5 100644 --- a/tests/integration/functional/steps/test_naming.py +++ b/tests/integration/functional/steps/test_naming.py @@ -120,12 +120,26 @@ def mixed_tuple_artifact_config() -> ( ], ) def test_various_naming_scenarios(step: Callable, clean_client: Client): + """Test that dynamic naming works in both normal and cached runs. + + In cached run the names of the dynamic artifacts shall remain same as in real run. + """ + @pipeline def _inner(): step() - p: PipelineRunResponse = _inner() - for step_response in p.steps.values(): + p1: PipelineRunResponse = _inner.with_options(enable_cache=False)() + for step_response in p1.steps.values(): + for k in step_response.outputs.keys(): + value = clean_client.get_artifact_version(k).load() + assert _validate_name_by_value(k, value) + + p2: PipelineRunResponse = _inner.with_options(enable_cache=True)() + for step_response in p2.steps.values(): + assert set(step_response.outputs.keys()) == set( + p1.steps[step_response.name].outputs.keys() + ) for k in step_response.outputs.keys(): value = clean_client.get_artifact_version(k).load() assert _validate_name_by_value(k, value) From 32ea721c3f21d8df8cd5e7c636541a25affe9032 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Tue, 19 Nov 2024 11:48:25 +0100 Subject: [PATCH 06/14] highlight the impact of cache on naming --- .../handle-data-artifacts/artifacts-naming.md | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md index 9c366ba52dd..9f9e34e1d68 100644 --- a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md +++ b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md @@ -85,5 +85,70 @@ def mixed_tuple() -> Tuple[ return "static_namer", "lambda_namer", "func_namer", "str_namer" ``` +## Naming in cached runs + +If your ZenML step is running with enabled caching and cache was used the names of the outputs artifacts (both static and dynamic) will remain the same as in the original run. + +```python +from typing_extensions import Annotated +from typing import Tuple +from random import randint + +from zenml import step, pipeline +from zenml.models import PipelineRunResponse + + +@step +def demo() -> ( + Tuple[ + Annotated[int, lambda: "dummy" + str(randint(0, 42))], + Annotated[int, "dummy_{date}_{time}"], + ] +): + return 42, 43 + + +@pipeline +def my_pipeline(): + demo() + + +if __name__ == "__main__": + run_without_cache: PipelineRunResponse = my_pipeline.with_options( + enable_cache=False + )() + run_with_cache: PipelineRunResponse = my_pipeline.with_options(enable_cache=True)() + + assert set(run_without_cache.steps["demo"].outputs.keys()) == set( + run_with_cache.steps["demo"].outputs.keys() + ) + print(list(run_without_cache.steps["demo"].outputs.keys())) +``` + +These 2 runs will produce output like the one below: +``` +Initiating a new run for the pipeline: my_pipeline. +Caching is disabled by default for my_pipeline. +Using user: default +Using stack: default + orchestrator: default + artifact_store: default +Dashboard URL for Pipeline Run: http://127.0.0.1:8237/runs/e4375452-a72e-45c2-a0df-c59b07089696 +Step demo has started. +Step demo has finished in 0.061s. +Pipeline run has finished in 0.100s. + +Initiating a new run for the pipeline: my_pipeline. +Using user: default +Using stack: default + orchestrator: default + artifact_store: default +Dashboard URL for Pipeline Run: http://127.0.0.1:8237/runs/cb4fffbf-b173-418f-bf4e-11d1bdac377c +Using cached version of step demo. +All steps of the pipeline run were cached. + +['dummy_2024_11_19_10_46_52_341267', 'dummy26'] +``` +
ZenML Scarf
From 791fc13a9df8bf4538cceff6d36f884ef7ed5f9c Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Wed, 20 Nov 2024 10:08:47 +0100 Subject: [PATCH 07/14] reenable macos integration testing --- .github/workflows/integration-test-slow.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration-test-slow.yml b/.github/workflows/integration-test-slow.yml index 94d5f2e2fb5..46054a2f1dd 100644 --- a/.github/workflows/integration-test-slow.yml +++ b/.github/workflows/integration-test-slow.yml @@ -95,8 +95,8 @@ jobs: GCP_US_EAST4_SERVER_USERNAME: ${{ secrets.GCP_US_EAST4_SERVER_USERNAME }} GCP_US_EAST4_SERVER_PASSWORD: ${{ secrets.GCP_US_EAST4_SERVER_PASSWORD }} # TODO: add Windows testing for Python 3.11 and 3.12 back in - # TODO: add macos testing back in - if: ${{ ! startsWith(github.event.head_commit.message, 'GitBook:') && ! (inputs.os == 'windows-latest' && inputs.python-version == '3.11') && ! (inputs.os == 'windows-latest' && inputs.python-version == '3.12') && ! (inputs.os == 'macos-13' || inputs.os == 'macos-latest') }} + # TODO: add macos 3.9 testing back in after vlmm is resolved + if: ${{ ! startsWith(github.event.head_commit.message, 'GitBook:') && ! (inputs.os == 'windows-latest' && inputs.python-version == '3.11') && ! (inputs.os == 'windows-latest' && inputs.python-version == '3.12') && ! (inputs.os == 'macos-13' && inputs.python-version == '3.9') }} defaults: run: shell: bash From 63fc128bfd642cb87eb6b56ad739af01f9b22d27 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Wed, 20 Nov 2024 11:12:53 +0100 Subject: [PATCH 08/14] Revert "reenable macos integration testing" This reverts commit 791fc13a9df8bf4538cceff6d36f884ef7ed5f9c. --- .github/workflows/integration-test-slow.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration-test-slow.yml b/.github/workflows/integration-test-slow.yml index 46054a2f1dd..94d5f2e2fb5 100644 --- a/.github/workflows/integration-test-slow.yml +++ b/.github/workflows/integration-test-slow.yml @@ -95,8 +95,8 @@ jobs: GCP_US_EAST4_SERVER_USERNAME: ${{ secrets.GCP_US_EAST4_SERVER_USERNAME }} GCP_US_EAST4_SERVER_PASSWORD: ${{ secrets.GCP_US_EAST4_SERVER_PASSWORD }} # TODO: add Windows testing for Python 3.11 and 3.12 back in - # TODO: add macos 3.9 testing back in after vlmm is resolved - if: ${{ ! startsWith(github.event.head_commit.message, 'GitBook:') && ! (inputs.os == 'windows-latest' && inputs.python-version == '3.11') && ! (inputs.os == 'windows-latest' && inputs.python-version == '3.12') && ! (inputs.os == 'macos-13' && inputs.python-version == '3.9') }} + # TODO: add macos testing back in + if: ${{ ! startsWith(github.event.head_commit.message, 'GitBook:') && ! (inputs.os == 'windows-latest' && inputs.python-version == '3.11') && ! (inputs.os == 'windows-latest' && inputs.python-version == '3.12') && ! (inputs.os == 'macos-13' || inputs.os == 'macos-latest') }} defaults: run: shell: bash From c0e95f5a0af4c3f84f91568db999224da6ef7d48 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:15:44 +0100 Subject: [PATCH 09/14] rework following suggestions --- .../handle-data-artifacts/artifacts-naming.md | 80 ++++++------- src/zenml/artifacts/artifact_config.py | 42 ++----- src/zenml/config/step_configurations.py | 1 + src/zenml/orchestrators/step_runner.py | 18 ++- src/zenml/steps/base_step.py | 9 ++ src/zenml/steps/step_decorator.py | 4 + src/zenml/steps/utils.py | 24 ++-- src/zenml/utils/string_utils.py | 8 +- .../zen_server/template_execution/utils.py | 1 + .../functional/steps/test_naming.py | 107 +++++++++++------- 10 files changed, 162 insertions(+), 132 deletions(-) diff --git a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md index 9f9e34e1d68..c3c513e5668 100644 --- a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md +++ b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md @@ -10,7 +10,7 @@ ZenML uses type annotations in function definitions to determine artifact names. ZenML provides flexible options for naming output artifacts, supporting both static and dynamic naming strategies: - Names can be generated dynamically at runtime -- Support for lambda functions, callable functions, and string templates +- Support for string templates (standard and custom placeholders supported) - Compatible with single and multiple output scenarios - Annotations help define naming strategy without modifying core logic @@ -28,41 +28,45 @@ def static_single() -> Annotated[str, "static_output_name"]: ### Dynamic Naming Dynamic names can be generated using: -#### Lambda Functions -```python -from random import randint +#### String Templates Using Standard Placeholders +Use the following placeholders that ZenML will replace automatically: -lambda_namer = lambda: "dynamic_name_" + str(randint(0,42)) +* `{date}` will resolve to the current date, e.g. `2024_11_18` +* `{time}` will resolve to the current time, e.g. `11_07_09_326492` + +```python +str_namer = "placeholder_name_{date}_{time}" @step -def dynamic_single_lambda() -> Annotated[str, lambda_namer]: +def dynamic_single_string() -> Annotated[str, str_namer]: return "null" ``` -#### Callable Functions -```python -from random import randint +#### String Templates Using Custom Placeholders +Use any placeholders that ZenML will replace for you, if they are provided into a step via `extra_name_placeholders` parameter: -def func_namer(): - return "dummy_dynamic_" + str(randint(0,42)) +```python +str_namer = "placeholder_name_{custom_placeholder}_{time}" -@step -def dynamic_single_callable() -> Annotated[str, func_namer]: +@step(extra_name_placeholders={"custom_placeholder": "some_substitute"}) +def dynamic_single_string() -> Annotated[str, str_namer]: return "null" ``` -#### String Templates -Use the following placeholders that ZenML will replace: - -* `{date}` will resolve to the current date, e.g. `2024_11_18` -* `{time}` will resolve to the current time, e.g. `11_07_09_326492` +Another option is to use `with_options` to dynamically redefine the placeholder, like this: ```python -str_namer = "placeholder_name_{date}_{time}" +str_namer = "{stage}_dataset" @step -def dynamic_single_string() -> Annotated[str, str_namer]: - return "null" +def extract_data(source: str) -> Annotated[str, str_namer]: + ... + return "my data" + +@pipeline +def extraction_pipeline(): + extract_data.with_options(extra_name_placeholders={"stage": "train"})(source="s3://train") + extract_data.with_options(extra_name_placeholders={"stage": "test"})(source="s3://test") ``` ### Multiple Output Handling @@ -70,19 +74,12 @@ def dynamic_single_string() -> Annotated[str, str_namer]: If you plan to return multiple artifacts from you ZenML step you can flexibly combine all naming options outlined above, like this: ```python -from random import randint - -def func_namer(): - return "dummy_dynamic_" + str(randint(0,42)) - @step def mixed_tuple() -> Tuple[ Annotated[str, "static_output_name"], - Annotated[str, lambda: "dynamic_name_" + str(randint(0,42))], - Annotated[str, func_namer], Annotated[str, "placeholder_name_{date}_{time}"], ]: - return "static_namer", "lambda_namer", "func_namer", "str_namer" + return "static_namer", "str_namer" ``` ## Naming in cached runs @@ -92,19 +89,16 @@ If your ZenML step is running with enabled caching and cache was used the names ```python from typing_extensions import Annotated from typing import Tuple -from random import randint from zenml import step, pipeline from zenml.models import PipelineRunResponse -@step -def demo() -> ( - Tuple[ - Annotated[int, lambda: "dummy" + str(randint(0, 42))], - Annotated[int, "dummy_{date}_{time}"], - ] -): +@step(extra_name_placeholders={"custom_placeholder": "resolution"}) +def demo() -> Tuple[ + Annotated[int, "dummy_{date}_{time}"], + Annotated[int, "dummy_{custom_placeholder}"], +]: return 42, 43 @@ -133,21 +127,19 @@ Using user: default Using stack: default orchestrator: default artifact_store: default -Dashboard URL for Pipeline Run: http://127.0.0.1:8237/runs/e4375452-a72e-45c2-a0df-c59b07089696 +You can visualize your pipeline runs in the ZenML Dashboard. In order to try it locally, please run zenml login --local. Step demo has started. -Step demo has finished in 0.061s. -Pipeline run has finished in 0.100s. - +Step demo has finished in 0.038s. +Pipeline run has finished in 0.064s. Initiating a new run for the pipeline: my_pipeline. Using user: default Using stack: default orchestrator: default artifact_store: default -Dashboard URL for Pipeline Run: http://127.0.0.1:8237/runs/cb4fffbf-b173-418f-bf4e-11d1bdac377c +You can visualize your pipeline runs in the ZenML Dashboard. In order to try it locally, please run zenml login --local. Using cached version of step demo. All steps of the pipeline run were cached. - -['dummy_2024_11_19_10_46_52_341267', 'dummy26'] +['dummy_2024_11_21_14_27_33_750134', 'dummy_resolution'] ``` diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py index d974ababe0b..0d26edead54 100644 --- a/src/zenml/artifacts/artifact_config.py +++ b/src/zenml/artifacts/artifact_config.py @@ -13,7 +13,8 @@ # permissions and limitations under the License. """Artifact Config classes to support Model Control Plane feature.""" -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union +from uuid import UUID, uuid4 from pydantic import BaseModel, Field, PrivateAttr, model_validator @@ -55,9 +56,7 @@ def my_step() -> Annotated[ is_deployment_artifact: Whether the artifact is a deployment artifact. """ - name: Optional[Union[str, Callable[[], str]]] = Field( - default=None, union_mode="smart" - ) + name: Optional[str] = Field(default=None, union_mode="smart") version: Optional[Union[str, int]] = Field( default=None, union_mode="smart" ) @@ -67,7 +66,7 @@ def my_step() -> Annotated[ is_model_artifact: bool = False is_deployment_artifact: bool = False - _is_dynamic: bool = PrivateAttr(False) + _unique_name: UUID = PrivateAttr(default_factory=uuid4) @model_validator(mode="before") @classmethod @@ -92,34 +91,17 @@ def _remove_old_attributes(cls, data: Dict[str, Any]) -> Dict[str, Any]: return data - @model_validator(mode="after") - def artifact_config_after_validator(self) -> "ArtifactConfig": - """Artifact config after validator. - - Returns: - The artifact config. - """ - if isinstance(self.name, str): - _name = format_name_template(self.name) - self._is_dynamic = _name != self.name - self.name = _name - elif callable(self.name): - self.name = self.name() - self._is_dynamic = True - return self - - @property - def _evaluated_name(self) -> Optional[str]: + def _evaluated_name( + self, extra_name_placeholders: Dict[str, str] + ) -> Optional[str]: """Evaluated name of the artifact. + Args: + extra_name_placeholders: Extra placeholders to use in the name template. + Returns: The evaluated name of the artifact. - - Raises: - RuntimeError: If the name is still a callable. """ - if callable(self.name): - raise RuntimeError( - "Artifact name is still a callable, evaluation error happened. Contact ZenML team to follow-up." - ) + if self.name: + return format_name_template(self.name, **extra_name_placeholders) return self.name diff --git a/src/zenml/config/step_configurations.py b/src/zenml/config/step_configurations.py index b7c4764ba54..b8a76b16697 100644 --- a/src/zenml/config/step_configurations.py +++ b/src/zenml/config/step_configurations.py @@ -152,6 +152,7 @@ class StepConfigurationUpdate(StrictBaseModel): success_hook_source: Optional[SourceWithValidator] = None model: Optional[Model] = None retry: Optional[StepRetryConfig] = None + extra_name_placeholders: Optional[Dict[str, str]] = None outputs: Mapping[str, PartialArtifactConfiguration] = {} diff --git a/src/zenml/orchestrators/step_runner.py b/src/zenml/orchestrators/step_runner.py index 160b800a2c6..deeea623153 100644 --- a/src/zenml/orchestrators/step_runner.py +++ b/src/zenml/orchestrators/step_runner.py @@ -150,9 +150,25 @@ def run( output_annotations = parse_return_type_annotations( func=step_instance.entrypoint, - original_outputs=step_run_info.config.outputs, + original_output_names=list(output_materializers.keys()), ) + for k, v in list(output_annotations.items()): + if v.artifact_config: + _evaluated_name = v.artifact_config._evaluated_name( + step_run.config.extra_name_placeholders or {} + ) + if _evaluated_name: + output_materializers[_evaluated_name] = ( + output_materializers.pop(k) + ) + output_artifact_uris[_evaluated_name] = ( + output_artifact_uris.pop(k) + ) + output_annotations[_evaluated_name] = ( + output_annotations.pop(k) + ) + self._stack.prepare_step_run(info=step_run_info) # Initialize the step context singleton diff --git a/src/zenml/steps/base_step.py b/src/zenml/steps/base_step.py index b8ba79315ea..4dee61f7659 100644 --- a/src/zenml/steps/base_step.py +++ b/src/zenml/steps/base_step.py @@ -116,6 +116,7 @@ def __init__( on_success: Optional["HookSpecification"] = None, model: Optional["Model"] = None, retry: Optional[StepRetryConfig] = None, + extra_name_placeholders: Optional[Dict[str, str]] = None, ) -> None: """Initializes a step. @@ -144,6 +145,7 @@ def __init__( function (e.g. `module.my_function`). model: configuration of the model version in the Model Control Plane. retry: Configuration for retrying the step in case of failure. + extra_name_placeholders: Extra placeholders to use in the name template. """ from zenml.config.step_configurations import PartialStepConfiguration @@ -203,6 +205,7 @@ def __init__( on_success=on_success, model=model, retry=retry, + extra_name_placeholders=extra_name_placeholders, ) notebook_utils.try_to_save_notebook_cell_code(self.source_object) @@ -595,6 +598,7 @@ def configure( model: Optional["Model"] = None, merge: bool = True, retry: Optional[StepRetryConfig] = None, + extra_name_placeholders: Optional[Dict[str, str]] = None, ) -> T: """Configures the step. @@ -637,6 +641,7 @@ def configure( overwrite all existing ones. See the general description of this method for an example. retry: Configuration for retrying the step in case of failure. + extra_name_placeholders: Extra placeholders to use in the name template. Returns: The step instance that this method was called on. @@ -701,6 +706,7 @@ def _convert_to_tuple(value: Any) -> Tuple[Source, ...]: "success_hook_source": success_hook_source, "model": model, "retry": retry, + "extra_name_placeholders": extra_name_placeholders, } ) config = StepConfigurationUpdate(**values) @@ -725,6 +731,7 @@ def with_options( on_success: Optional["HookSpecification"] = None, model: Optional["Model"] = None, merge: bool = True, + extra_name_placeholders: Optional[Dict[str, str]] = None, ) -> "BaseStep": """Copies the step and applies the given configurations. @@ -756,6 +763,7 @@ def with_options( configurations. If `False` the given configurations will overwrite all existing ones. See the general description of this method for an example. + extra_name_placeholders: Extra placeholders for the step name. Returns: The copied step instance. @@ -776,6 +784,7 @@ def with_options( on_success=on_success, model=model, merge=merge, + extra_name_placeholders=extra_name_placeholders, ) return step_copy diff --git a/src/zenml/steps/step_decorator.py b/src/zenml/steps/step_decorator.py index 7ab22f99fbe..98dc1d6ca72 100644 --- a/src/zenml/steps/step_decorator.py +++ b/src/zenml/steps/step_decorator.py @@ -73,6 +73,7 @@ def step( on_success: Optional["HookSpecification"] = None, model: Optional["Model"] = None, retry: Optional["StepRetryConfig"] = None, + extra_name_placeholders: Optional[Dict[str, str]] = None, ) -> Callable[["F"], "BaseStep"]: ... @@ -93,6 +94,7 @@ def step( on_success: Optional["HookSpecification"] = None, model: Optional["Model"] = None, retry: Optional["StepRetryConfig"] = None, + extra_name_placeholders: Optional[Dict[str, str]] = None, ) -> Union["BaseStep", Callable[["F"], "BaseStep"]]: """Decorator to create a ZenML step. @@ -124,6 +126,7 @@ def step( (e.g. `module.my_function`). model: configuration of the model in the Model Control Plane. retry: configuration of step retry in case of step failure. + extra_name_placeholders: Extra placeholders for the step name. Returns: The step instance. @@ -157,6 +160,7 @@ def inner_decorator(func: "F") -> "BaseStep": on_success=on_success, model=model, retry=retry, + extra_name_placeholders=extra_name_placeholders, ) return step_instance diff --git a/src/zenml/steps/utils.py b/src/zenml/steps/utils.py index 1f35e25d4cd..f5863ebb06c 100644 --- a/src/zenml/steps/utils.py +++ b/src/zenml/steps/utils.py @@ -23,7 +23,7 @@ Any, Callable, Dict, - Mapping, + List, Optional, Tuple, Union, @@ -35,7 +35,6 @@ from zenml.artifacts.artifact_config import ArtifactConfig from zenml.client import Client -from zenml.config.step_configurations import ArtifactConfiguration from zenml.enums import ( ArtifactSaveType, ExecutionStatus, @@ -106,7 +105,7 @@ def get_args(obj: Any) -> Tuple[Any, ...]: def parse_return_type_annotations( func: Callable[..., Any], enforce_type_annotations: bool = False, - original_outputs: Optional[Mapping[str, ArtifactConfiguration]] = None, + original_output_names: Optional[List[str]] = None, ) -> Dict[str, OutputSignature]: """Parse the return type annotation of a step function. @@ -114,7 +113,7 @@ def parse_return_type_annotations( func: The step function. enforce_type_annotations: If `True`, raises an exception if a type annotation is missing. - original_outputs: The original outputs of the step function. + original_output_names: The original output names of the step function. Raises: RuntimeError: If the output annotation has variable length or contains @@ -145,11 +144,6 @@ def parse_return_type_annotations( else: return_annotation = Any - if original_outputs: - original_names = list(original_outputs.keys()) - else: - original_names = None - if typing_utils.get_origin(return_annotation) is tuple: requires_multiple_artifacts = has_tuple_return(func) if requires_multiple_artifacts: @@ -165,10 +159,10 @@ def parse_return_type_annotations( annotation ) if artifact_config: - if artifact_config._is_dynamic and original_names: - output_name = original_names[i] + if original_output_names: + output_name = original_output_names[i] else: - output_name = artifact_config._evaluated_name + output_name = artifact_config._unique_name.hex else: output_name = None has_custom_name = output_name is not None @@ -189,10 +183,10 @@ def parse_return_type_annotations( return_annotation ) if artifact_config: - if artifact_config._is_dynamic and original_names: - output_name = original_names[0] + if original_output_names: + output_name = original_output_names[0] else: - output_name = artifact_config._evaluated_name + output_name = artifact_config._unique_name.hex else: output_name = None has_custom_name = output_name is not None diff --git a/src/zenml/utils/string_utils.py b/src/zenml/utils/string_utils.py index 1b5091aff46..b76029fbb00 100644 --- a/src/zenml/utils/string_utils.py +++ b/src/zenml/utils/string_utils.py @@ -170,7 +170,13 @@ def format_name_template( "time", datetime.datetime.now(datetime.timezone.utc).strftime("%H_%M_%S_%f"), ) - return name_template.format(**kwargs) + try: + return name_template.format(**kwargs) + except KeyError as e: + raise KeyError( + f"Could not format the name template `{name_template}`. " + f"Missing key: {e}" + ) def substitute_string(value: V, substitution_func: Callable[[str], str]) -> V: diff --git a/src/zenml/zen_server/template_execution/utils.py b/src/zenml/zen_server/template_execution/utils.py index 0bc5620cf48..04424db409e 100644 --- a/src/zenml/zen_server/template_execution/utils.py +++ b/src/zenml/zen_server/template_execution/utils.py @@ -363,6 +363,7 @@ def deployment_request_from_template( "external_input_artifacts", "model_artifacts_or_metadata", "client_lazy_loaders", + "extra_name_placeholders", "outputs", } ), diff --git a/tests/integration/functional/steps/test_naming.py b/tests/integration/functional/steps/test_naming.py index 860f14430f5..c8d4a004ee3 100644 --- a/tests/integration/functional/steps/test_naming.py +++ b/tests/integration/functional/steps/test_naming.py @@ -21,64 +21,57 @@ from zenml.client import Client from zenml.models.v2.core.pipeline_run import PipelineRunResponse - -def func_namer(): - return "dummy_dynamic_" + str(43) - - -lambda_namer = lambda: "dummy_dynamic_" + str(42) # noqa -str_namer = "dummy_dynamic_time_{time}" +str_namer_standard = "dummy_dynamic_dt_{date}_{time}" +str_namer_custom = "dummy_dynamic_custom_{funny_name}" static_namer = "dummy_static" def _validate_name_by_value(name: str, value: str) -> bool: - if value == "func_namer": - return name == func_namer() - if value == "lambda_namer": - return name == lambda_namer() - if value == "str_namer": - return name.startswith("dummy_dynamic_time_") + if value == "str_namer_standard": + return name.startswith("dummy_dynamic_dt_") + if value == "str_namer_custom": + return name.startswith("dummy_dynamic_custom_") if value == "static_namer": return name == "dummy_static" return False @step -def dynamic_single_lambda() -> Annotated[str, lambda_namer]: - return "lambda_namer" +def dynamic_single_string_standard() -> Annotated[str, str_namer_standard]: + return "str_namer_standard" -@step -def dynamic_single_callable() -> Annotated[str, func_namer]: - return "func_namer" +@step(extra_name_placeholders={"funny_name": "name_placeholder"}) +def dynamic_single_string_custom() -> Annotated[str, str_namer_custom]: + return "str_namer_custom" @step -def dynamic_single_string() -> Annotated[str, str_namer]: - return "str_namer" +def dynamic_single_string_custom_no_default() -> ( + Annotated[str, str_namer_custom] +): + return "str_namer_custom" -@step +@step(extra_name_placeholders={"funny_name": "name_placeholder"}) def dynamic_tuple() -> ( Tuple[ - Annotated[str, lambda_namer], - Annotated[str, func_namer], - Annotated[str, str_namer], + Annotated[str, str_namer_standard], + Annotated[str, str_namer_custom], ] ): - return "lambda_namer", "func_namer", "str_namer" + return "str_namer_standard", "str_namer_custom" -@step +@step(extra_name_placeholders={"funny_name": "name_placeholder"}) def mixed_tuple() -> ( Tuple[ + Annotated[str, str_namer_standard], Annotated[str, static_namer], - Annotated[str, lambda_namer], - Annotated[str, func_namer], - Annotated[str, str_namer], + Annotated[str, str_namer_custom], ] ): - return "static_namer", "lambda_namer", "func_namer", "str_namer" + return "str_namer_standard", "static_namer", "str_namer_custom" @step @@ -86,33 +79,30 @@ def static_single() -> Annotated[str, static_namer]: return "static_namer" -@step +@step(extra_name_placeholders={"funny_name": "name_placeholder"}) def mixed_tuple_artifact_config() -> ( Tuple[ Annotated[str, ArtifactConfig(name=static_namer)], - Annotated[str, ArtifactConfig(name=lambda_namer)], - Annotated[str, ArtifactConfig(name=func_namer)], - Annotated[str, ArtifactConfig(name=str_namer)], + Annotated[str, ArtifactConfig(name=str_namer_standard)], + Annotated[str, ArtifactConfig(name=str_namer_custom)], ] ): - return "static_namer", "lambda_namer", "func_namer", "str_namer" + return "static_namer", "str_namer_standard", "str_namer_custom" @pytest.mark.parametrize( "step", [ - dynamic_single_lambda, - dynamic_single_callable, - dynamic_single_string, + dynamic_single_string_standard, + dynamic_single_string_custom, dynamic_tuple, mixed_tuple, static_single, mixed_tuple_artifact_config, ], ids=[ - "dynamic_single_lambda", - "dynamic_single_callable", - "dynamic_single_string", + "dynamic_single_string_standard", + "dynamic_single_string_custom", "dynamic_tuple", "mixed_tuple", "static_single", @@ -143,3 +133,38 @@ def _inner(): for k in step_response.outputs.keys(): value = clean_client.get_artifact_version(k).load() assert _validate_name_by_value(k, value) + + +def test_sequential_executions_have_different_names(clean_client: "Client"): + """Test that dynamic naming works each time for unique uncached runs.""" + + @pipeline(enable_cache=False) + def _inner(name_placeholder: str): + dynamic_single_string_custom.with_options( + extra_name_placeholders={"funny_name": name_placeholder} + )() + + p1: PipelineRunResponse = _inner("funny_name_42") + p2: PipelineRunResponse = _inner("this_is_not_funny") + + assert set(p1.steps["dynamic_single_string_custom"].outputs.keys()) != set( + p2.steps["dynamic_single_string_custom"].outputs.keys() + ) + + +def test_execution_fails_on_custom_but_not_provided_name( + clean_client: "Client", +): + """Test that dynamic naming fails on custom placeholder, if they are not provided.""" + + @pipeline(enable_cache=False) + def _inner(): + dynamic_single_string_custom_no_default.with_options( + extra_name_placeholders={"not_a_funny_name": "it's gonna fail"} + )() + + with pytest.raises( + KeyError, + match="Could not format the name template `dummy_dynamic_custom_{funny_name}`. Missing key: 'funny_name'", + ): + _inner() From 1a853c21fe5e4a202a546b3e20c56a6e5d2bb1d2 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:15:51 +0100 Subject: [PATCH 10/14] rework following suggestions --- src/zenml/artifacts/artifact_config.py | 2 +- src/zenml/utils/string_utils.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py index 0d26edead54..869f996466b 100644 --- a/src/zenml/artifacts/artifact_config.py +++ b/src/zenml/artifacts/artifact_config.py @@ -56,7 +56,7 @@ def my_step() -> Annotated[ is_deployment_artifact: Whether the artifact is a deployment artifact. """ - name: Optional[str] = Field(default=None, union_mode="smart") + name: Optional[str] = None version: Optional[Union[str, int]] = Field( default=None, union_mode="smart" ) diff --git a/src/zenml/utils/string_utils.py b/src/zenml/utils/string_utils.py index b76029fbb00..8b2701c199c 100644 --- a/src/zenml/utils/string_utils.py +++ b/src/zenml/utils/string_utils.py @@ -161,6 +161,9 @@ def format_name_template( Returns: The formatted name template. + + Raises: + KeyError: If a key in template is missing in the kwargs. """ kwargs["date"] = kwargs.get( "date", From a2d61bb5f214e8e578560df96c12942137031d73 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Fri, 22 Nov 2024 11:51:15 +0100 Subject: [PATCH 11/14] functional dynamic naming using placeholders --- src/zenml/artifacts/artifact_config.py | 18 +++++++--- src/zenml/artifacts/utils.py | 9 +++++ src/zenml/models/v2/core/artifact.py | 10 ++++++ src/zenml/models/v2/core/artifact_version.py | 4 +++ src/zenml/orchestrators/step_runner.py | 4 ++- src/zenml/steps/base_step.py | 3 +- src/zenml/steps/entrypoint_function_utils.py | 6 ++-- src/zenml/steps/utils.py | 30 ++++++++-------- ...6a2b_extend_artifact_with_original_name.py | 36 +++++++++++++++++++ .../zen_stores/schemas/artifact_schemas.py | 4 +++ src/zenml/zen_stores/sql_zen_store.py | 11 ++++-- .../functional/steps/test_naming.py | 35 ++++++++++++++++++ tests/unit/steps/test_utils.py | 4 +-- 13 files changed, 148 insertions(+), 26 deletions(-) create mode 100644 src/zenml/zen_stores/migrations/versions/23c086a86a2b_extend_artifact_with_original_name.py diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py index 869f996466b..aff33eedb34 100644 --- a/src/zenml/artifacts/artifact_config.py +++ b/src/zenml/artifacts/artifact_config.py @@ -13,10 +13,10 @@ # permissions and limitations under the License. """Artifact Config classes to support Model Control Plane feature.""" +import re from typing import Any, Dict, List, Optional, Union -from uuid import UUID, uuid4 -from pydantic import BaseModel, Field, PrivateAttr, model_validator +from pydantic import BaseModel, Field, model_validator from zenml.logger import get_logger from zenml.metadata.metadata_types import MetadataType @@ -66,8 +66,6 @@ def my_step() -> Annotated[ is_model_artifact: bool = False is_deployment_artifact: bool = False - _unique_name: UUID = PrivateAttr(default_factory=uuid4) - @model_validator(mode="before") @classmethod @before_validator_handler @@ -105,3 +103,15 @@ def _evaluated_name( if self.name: return format_name_template(self.name, **extra_name_placeholders) return self.name + + @property + def _original_name(self) -> Optional[str]: + """Original name of the dynamic artifact. + + Returns: + The original name of the dynamic artifact. + """ + pattern = r"\{[^}]+\}" + if re.findall(pattern, str(self.name)): + return self.name + return None diff --git a/src/zenml/artifacts/utils.py b/src/zenml/artifacts/utils.py index e18485d42ab..415a2400da8 100644 --- a/src/zenml/artifacts/utils.py +++ b/src/zenml/artifacts/utils.py @@ -123,6 +123,7 @@ def _store_artifact_data_and_prepare_request( store_visualizations: bool = True, has_custom_name: bool = True, metadata: Optional[Dict[str, "MetadataType"]] = None, + original_name: Optional[str] = None, ) -> ArtifactVersionRequest: """Store artifact data and prepare a request to the server. @@ -141,6 +142,7 @@ def _store_artifact_data_and_prepare_request( has_custom_name: Whether the artifact has a custom name. metadata: Metadata to store for the artifact version. This will be ignored if `store_metadata` is set to `False`. + original_name: The original name of the dynamic artifact. Returns: Artifact version request for the artifact data that was stored. @@ -174,6 +176,7 @@ def _store_artifact_data_and_prepare_request( artifact_version_request = ArtifactVersionRequest( artifact_name=name, + artifact_original_name=original_name, version=version, tags=tags, type=materializer.ASSOCIATED_ARTIFACT_TYPE, @@ -209,6 +212,7 @@ def save_artifact( # TODO: remove these once external artifact does not use this function anymore save_type: ArtifactSaveType = ArtifactSaveType.MANUAL, has_custom_name: bool = True, + original_name: Optional[str] = None, ) -> "ArtifactVersionResponse": """Upload and publish an artifact. @@ -231,6 +235,7 @@ def save_artifact( save_type: The type of save operation that created the artifact version. has_custom_name: If the artifact name is custom and should be listed in the dashboard "Artifacts" tab. + original_name: The original name of the dynamic artifact. Returns: The saved artifact response. @@ -269,6 +274,7 @@ def save_artifact( artifact_version_request = _store_artifact_data_and_prepare_request( data=data, name=name, + original_name=original_name, uri=uri, materializer_class=materializer_class, save_type=save_type, @@ -302,6 +308,7 @@ def register_artifact( is_model_artifact: bool = False, is_deployment_artifact: bool = False, artifact_metadata: Dict[str, "MetadataType"] = {}, + original_name: Optional[str] = None, ) -> "ArtifactVersionResponse": """Register existing data stored in the artifact store as a ZenML Artifact. @@ -317,6 +324,7 @@ def register_artifact( is_model_artifact: If the artifact is a model artifact. is_deployment_artifact: If the artifact is a deployment artifact. artifact_metadata: Metadata dictionary to attach to the artifact version. + original_name: The original name of the dynamic artifact. Returns: The saved artifact response. @@ -344,6 +352,7 @@ def register_artifact( artifact_version_request = ArtifactVersionRequest( artifact_name=name, + artifact_original_name=original_name or name, version=version, tags=tags, type=ArtifactType.DATA, diff --git a/src/zenml/models/v2/core/artifact.py b/src/zenml/models/v2/core/artifact.py index c62a7cee1a5..112f2259c1c 100644 --- a/src/zenml/models/v2/core/artifact.py +++ b/src/zenml/models/v2/core/artifact.py @@ -42,6 +42,11 @@ class ArtifactRequest(BaseRequest): title="Name of the artifact.", max_length=STR_FIELD_MAX_LENGTH, ) + original_name: Optional[str] = Field( + title="Original name of the artifact.", + max_length=STR_FIELD_MAX_LENGTH, + default=None, + ) has_custom_name: bool = Field( title="Whether the name is custom (True) or auto-generated (False).", default=False, @@ -114,6 +119,11 @@ def get_hydrated_version(self) -> "ArtifactResponse": title="Name of the output in the parent step.", max_length=STR_FIELD_MAX_LENGTH, ) + original_name: Optional[str] = Field( + title="Original name of the artifact.", + max_length=STR_FIELD_MAX_LENGTH, + default=None, + ) # Body and metadata properties @property diff --git a/src/zenml/models/v2/core/artifact_version.py b/src/zenml/models/v2/core/artifact_version.py index d26b3bceef4..7e5bab230cb 100644 --- a/src/zenml/models/v2/core/artifact_version.py +++ b/src/zenml/models/v2/core/artifact_version.py @@ -75,6 +75,10 @@ class ArtifactVersionRequest(WorkspaceScopedRequest): default=None, title="Name of the artifact to which this version belongs.", ) + artifact_original_name: Optional[str] = Field( + default=None, + title="Original name of the artifact to which this version belongs.", + ) version: Optional[Union[int, str]] = Field( default=None, title="Version of the artifact." ) diff --git a/src/zenml/orchestrators/step_runner.py b/src/zenml/orchestrators/step_runner.py index deeea623153..c5185a53a53 100644 --- a/src/zenml/orchestrators/step_runner.py +++ b/src/zenml/orchestrators/step_runner.py @@ -597,8 +597,9 @@ def _store_output_artifacts( if artifact_config is not None: has_custom_name = bool(artifact_config.name) version = artifact_config.version + original_name = artifact_config._original_name else: - has_custom_name, version = False, None + has_custom_name, version, original_name = False, None, None # Override the artifact name if it is not a custom name. if has_custom_name: @@ -619,6 +620,7 @@ def _store_output_artifacts( artifact_request = _store_artifact_data_and_prepare_request( name=artifact_name, + original_name=original_name, data=return_value, materializer_class=materializer_class, uri=uri, diff --git a/src/zenml/steps/base_step.py b/src/zenml/steps/base_step.py index 4dee61f7659..20734a3ee6b 100644 --- a/src/zenml/steps/base_step.py +++ b/src/zenml/steps/base_step.py @@ -150,7 +150,8 @@ def __init__( from zenml.config.step_configurations import PartialStepConfiguration self.entrypoint_definition = validate_entrypoint_function( - self.entrypoint, reserved_arguments=["after", "id"] + self.entrypoint, + reserved_arguments=["after", "id"], ) name = name or self.__class__.__name__ diff --git a/src/zenml/steps/entrypoint_function_utils.py b/src/zenml/steps/entrypoint_function_utils.py index 9f87ea826b7..1651c179760 100644 --- a/src/zenml/steps/entrypoint_function_utils.py +++ b/src/zenml/steps/entrypoint_function_utils.py @@ -209,7 +209,8 @@ def _validate_input_value( def validate_entrypoint_function( - func: Callable[..., Any], reserved_arguments: Sequence[str] = () + func: Callable[..., Any], + reserved_arguments: Sequence[str] = (), ) -> EntrypointFunctionDefinition: """Validates a step entrypoint function. @@ -256,7 +257,8 @@ def validate_entrypoint_function( inputs[key] = parameter outputs = parse_return_type_annotations( - func=func, enforce_type_annotations=ENFORCE_TYPE_ANNOTATIONS + func=func, + enforce_type_annotations=ENFORCE_TYPE_ANNOTATIONS, ) return EntrypointFunctionDefinition( diff --git a/src/zenml/steps/utils.py b/src/zenml/steps/utils.py index f5863ebb06c..f4ba9fd7839 100644 --- a/src/zenml/steps/utils.py +++ b/src/zenml/steps/utils.py @@ -124,6 +124,20 @@ def parse_return_type_annotations( Returns: - A dictionary mapping output names to their output signatures. """ + + def _define_output_name( + artifact_config: Optional["ArtifactConfig"], + i: int, + original_output_names: Optional[List[str]] = original_output_names, + ) -> Optional[str]: + output_name: Optional[str] = None + if artifact_config: + if original_output_names: + output_name = original_output_names[i] + else: + output_name = artifact_config.name + return output_name + signature = inspect.signature(func, follow_wrapped=True) return_annotation = signature.return_annotation output_name: Optional[str] @@ -158,13 +172,7 @@ def parse_return_type_annotations( artifact_config = get_artifact_config_from_annotation_metadata( annotation ) - if artifact_config: - if original_output_names: - output_name = original_output_names[i] - else: - output_name = artifact_config._unique_name.hex - else: - output_name = None + output_name = _define_output_name(artifact_config, i) has_custom_name = output_name is not None output_name = output_name or f"output_{i}" if output_name in output_signature: @@ -182,13 +190,7 @@ def parse_return_type_annotations( artifact_config = get_artifact_config_from_annotation_metadata( return_annotation ) - if artifact_config: - if original_output_names: - output_name = original_output_names[0] - else: - output_name = artifact_config._unique_name.hex - else: - output_name = None + output_name = _define_output_name(artifact_config, 0) has_custom_name = output_name is not None output_name = output_name or SINGLE_RETURN_OUT_NAME return { diff --git a/src/zenml/zen_stores/migrations/versions/23c086a86a2b_extend_artifact_with_original_name.py b/src/zenml/zen_stores/migrations/versions/23c086a86a2b_extend_artifact_with_original_name.py new file mode 100644 index 00000000000..8a386850a3e --- /dev/null +++ b/src/zenml/zen_stores/migrations/versions/23c086a86a2b_extend_artifact_with_original_name.py @@ -0,0 +1,36 @@ +"""extend_artifact_with_original_name [23c086a86a2b]. + +Revision ID: 23c086a86a2b +Revises: 0.70.0 +Create Date: 2024-11-22 10:26:43.027089 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "23c086a86a2b" +down_revision = "0.70.0" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Upgrade database schema and/or data, creating a new revision.""" + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("artifact", schema=None) as batch_op: + batch_op.add_column( + sa.Column("original_name", sa.TEXT(), nullable=True) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade database schema and/or data back to the previous revision.""" + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("artifact", schema=None) as batch_op: + batch_op.drop_column("original_name") + + # ### end Alembic commands ### diff --git a/src/zenml/zen_stores/schemas/artifact_schemas.py b/src/zenml/zen_stores/schemas/artifact_schemas.py index 8b08e51b562..be1c585278b 100644 --- a/src/zenml/zen_stores/schemas/artifact_schemas.py +++ b/src/zenml/zen_stores/schemas/artifact_schemas.py @@ -76,6 +76,7 @@ class ArtifactSchema(NamedSchema, table=True): # Fields has_custom_name: bool + original_name: str = Field(sa_column=Column(TEXT, nullable=True)) versions: List["ArtifactVersionSchema"] = Relationship( back_populates="artifact", sa_relationship_kwargs={"cascade": "delete"}, @@ -105,6 +106,7 @@ def from_request( return cls( name=artifact_request.name, has_custom_name=artifact_request.has_custom_name, + original_name=artifact_request.original_name, ) def to_model( @@ -149,6 +151,8 @@ def to_model( return ArtifactResponse( id=self.id, name=self.name, + # take self.name as a fallback for static names + original_name=self.original_name or self.name, body=body, metadata=metadata, ) diff --git a/src/zenml/zen_stores/sql_zen_store.py b/src/zenml/zen_stores/sql_zen_store.py index f9c314f774b..368eef07c44 100644 --- a/src/zenml/zen_stores/sql_zen_store.py +++ b/src/zenml/zen_stores/sql_zen_store.py @@ -2726,13 +2726,17 @@ def delete_artifact(self, artifact_id: UUID) -> None: # -------------------- Artifact Versions -------------------- def _get_or_create_artifact_for_name( - self, name: str, has_custom_name: bool + self, + name: str, + has_custom_name: bool, + original_name: Optional[str] = None, ) -> ArtifactSchema: """Get or create an artifact with a specific name. Args: name: The artifact name. has_custom_name: Whether the artifact has a custom name. + original_name: The original name of the dynamic artifact. Returns: Schema of the artifact. @@ -2747,7 +2751,9 @@ def _get_or_create_artifact_for_name( try: with session.begin_nested(): artifact_request = ArtifactRequest( - name=name, has_custom_name=has_custom_name + name=name, + has_custom_name=has_custom_name, + original_name=original_name, ) artifact = ArtifactSchema.from_request( artifact_request @@ -2814,6 +2820,7 @@ def create_artifact_version( artifact_schema = self._get_or_create_artifact_for_name( name=artifact_name, has_custom_name=artifact_version.has_custom_name, + original_name=artifact_version.artifact_original_name, ) artifact_version.artifact_id = artifact_schema.id diff --git a/tests/integration/functional/steps/test_naming.py b/tests/integration/functional/steps/test_naming.py index c8d4a004ee3..795dd16b74f 100644 --- a/tests/integration/functional/steps/test_naming.py +++ b/tests/integration/functional/steps/test_naming.py @@ -90,6 +90,13 @@ def mixed_tuple_artifact_config() -> ( return "static_namer", "str_namer_standard", "str_namer_custom" +@step +def dynamic_single_string_standard_controlled_return( + s: str, +) -> Annotated[str, str_namer_standard]: + return s + + @pytest.mark.parametrize( "step", [ @@ -168,3 +175,31 @@ def _inner(): match="Could not format the name template `dummy_dynamic_custom_{funny_name}`. Missing key: 'funny_name'", ): _inner() + + +def test_stored_info_not_affected_by_dynamic_naming(clean_client: "Client"): + """Test that dynamic naming does not affect stored info.""" + + @pipeline(enable_cache=False) + def _inner(ret: str): + dynamic_single_string_standard_controlled_return(ret) + + p1: PipelineRunResponse = _inner("output_1") + p2: PipelineRunResponse = _inner("output_2") + + a1 = clean_client.get_artifact_version( + list( + p1.steps[ + "dynamic_single_string_standard_controlled_return" + ].outputs.keys() + )[0] + ).load() + a2 = clean_client.get_artifact_version( + list( + p2.steps[ + "dynamic_single_string_standard_controlled_return" + ].outputs.keys() + )[0] + ).load() + assert a1 == "output_1" != a2 + assert a2 == "output_2" != a1 diff --git a/tests/unit/steps/test_utils.py b/tests/unit/steps/test_utils.py index a7bb27be4f9..c9af9cfbd21 100644 --- a/tests/unit/steps/test_utils.py +++ b/tests/unit/steps/test_utils.py @@ -277,7 +277,7 @@ def func_with_multiple_annotated_outputs_and_deployment_artifact_config() -> ( ], ) def test_step_output_annotation_parsing(func, expected_output): - assert parse_return_type_annotations(func) == expected_output + assert parse_return_type_annotations(func, {}) == expected_output def func_with_multiple_annotations() -> Annotated[int, "a", "b"]: @@ -323,4 +323,4 @@ def func_with_duplicate_output_name() -> ( ) def test_invalid_step_output_annotations(func, exception): with pytest.raises(exception): - parse_return_type_annotations(func) + parse_return_type_annotations(func, {}) From 140c84444b7ffb35918ed2da583496edb7c6f8fd Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Fri, 22 Nov 2024 11:58:44 +0100 Subject: [PATCH 12/14] resolve branching --- .../23c086a86a2b_extend_artifact_with_original_name.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zenml/zen_stores/migrations/versions/23c086a86a2b_extend_artifact_with_original_name.py b/src/zenml/zen_stores/migrations/versions/23c086a86a2b_extend_artifact_with_original_name.py index 8a386850a3e..545469d6ca5 100644 --- a/src/zenml/zen_stores/migrations/versions/23c086a86a2b_extend_artifact_with_original_name.py +++ b/src/zenml/zen_stores/migrations/versions/23c086a86a2b_extend_artifact_with_original_name.py @@ -1,7 +1,7 @@ """extend_artifact_with_original_name [23c086a86a2b]. Revision ID: 23c086a86a2b -Revises: 0.70.0 +Revises: ec6307720f92 Create Date: 2024-11-22 10:26:43.027089 """ @@ -11,7 +11,7 @@ # revision identifiers, used by Alembic. revision = "23c086a86a2b" -down_revision = "0.70.0" +down_revision = "ec6307720f92" branch_labels = None depends_on = None From 51214b497ad850fab190e0c282afe260758afbf0 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Fri, 22 Nov 2024 13:55:45 +0100 Subject: [PATCH 13/14] extra_name_placeholders -> name_subs --- .../handle-data-artifacts/artifacts-naming.md | 10 +++++----- src/zenml/artifacts/artifact_config.py | 8 +++----- src/zenml/config/step_configurations.py | 2 +- src/zenml/orchestrators/step_runner.py | 2 +- src/zenml/steps/base_step.py | 18 +++++++++--------- src/zenml/steps/step_decorator.py | 8 ++++---- .../zen_server/template_execution/utils.py | 2 +- .../functional/steps/test_naming.py | 12 ++++++------ 8 files changed, 30 insertions(+), 32 deletions(-) diff --git a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md index c3c513e5668..8ffdf5a3e8a 100644 --- a/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md +++ b/docs/book/how-to/data-artifact-management/handle-data-artifacts/artifacts-naming.md @@ -43,12 +43,12 @@ def dynamic_single_string() -> Annotated[str, str_namer]: ``` #### String Templates Using Custom Placeholders -Use any placeholders that ZenML will replace for you, if they are provided into a step via `extra_name_placeholders` parameter: +Use any placeholders that ZenML will replace for you, if they are provided into a step via `name_subs` parameter: ```python str_namer = "placeholder_name_{custom_placeholder}_{time}" -@step(extra_name_placeholders={"custom_placeholder": "some_substitute"}) +@step(name_subs={"custom_placeholder": "some_substitute"}) def dynamic_single_string() -> Annotated[str, str_namer]: return "null" ``` @@ -65,8 +65,8 @@ def extract_data(source: str) -> Annotated[str, str_namer]: @pipeline def extraction_pipeline(): - extract_data.with_options(extra_name_placeholders={"stage": "train"})(source="s3://train") - extract_data.with_options(extra_name_placeholders={"stage": "test"})(source="s3://test") + extract_data.with_options(name_subs={"stage": "train"})(source="s3://train") + extract_data.with_options(name_subs={"stage": "test"})(source="s3://test") ``` ### Multiple Output Handling @@ -94,7 +94,7 @@ from zenml import step, pipeline from zenml.models import PipelineRunResponse -@step(extra_name_placeholders={"custom_placeholder": "resolution"}) +@step(name_subs={"custom_placeholder": "resolution"}) def demo() -> Tuple[ Annotated[int, "dummy_{date}_{time}"], Annotated[int, "dummy_{custom_placeholder}"], diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py index bf5bb72e531..f6f40cecbac 100644 --- a/src/zenml/artifacts/artifact_config.py +++ b/src/zenml/artifacts/artifact_config.py @@ -117,19 +117,17 @@ def _remove_old_attributes(cls, data: Dict[str, Any]) -> Dict[str, Any]: return data - def _evaluated_name( - self, extra_name_placeholders: Dict[str, str] - ) -> Optional[str]: + def _evaluated_name(self, name_subs: Dict[str, str]) -> Optional[str]: """Evaluated name of the artifact. Args: - extra_name_placeholders: Extra placeholders to use in the name template. + name_subs: Extra placeholders to use in the name template. Returns: The evaluated name of the artifact. """ if self.name: - return format_name_template(self.name, **extra_name_placeholders) + return format_name_template(self.name, **name_subs) return self.name @property diff --git a/src/zenml/config/step_configurations.py b/src/zenml/config/step_configurations.py index b8a76b16697..197f9acff28 100644 --- a/src/zenml/config/step_configurations.py +++ b/src/zenml/config/step_configurations.py @@ -152,7 +152,7 @@ class StepConfigurationUpdate(StrictBaseModel): success_hook_source: Optional[SourceWithValidator] = None model: Optional[Model] = None retry: Optional[StepRetryConfig] = None - extra_name_placeholders: Optional[Dict[str, str]] = None + name_subs: Optional[Dict[str, str]] = None outputs: Mapping[str, PartialArtifactConfiguration] = {} diff --git a/src/zenml/orchestrators/step_runner.py b/src/zenml/orchestrators/step_runner.py index 1f60a89d281..325d802ae96 100644 --- a/src/zenml/orchestrators/step_runner.py +++ b/src/zenml/orchestrators/step_runner.py @@ -156,7 +156,7 @@ def run( for k, v in list(output_annotations.items()): if v.artifact_config: _evaluated_name = v.artifact_config._evaluated_name( - step_run.config.extra_name_placeholders or {} + step_run.config.name_subs or {} ) if _evaluated_name: output_materializers[_evaluated_name] = ( diff --git a/src/zenml/steps/base_step.py b/src/zenml/steps/base_step.py index 20734a3ee6b..4244200a634 100644 --- a/src/zenml/steps/base_step.py +++ b/src/zenml/steps/base_step.py @@ -116,7 +116,7 @@ def __init__( on_success: Optional["HookSpecification"] = None, model: Optional["Model"] = None, retry: Optional[StepRetryConfig] = None, - extra_name_placeholders: Optional[Dict[str, str]] = None, + name_subs: Optional[Dict[str, str]] = None, ) -> None: """Initializes a step. @@ -145,7 +145,7 @@ def __init__( function (e.g. `module.my_function`). model: configuration of the model version in the Model Control Plane. retry: Configuration for retrying the step in case of failure. - extra_name_placeholders: Extra placeholders to use in the name template. + name_subs: Extra placeholders to use in the name template. """ from zenml.config.step_configurations import PartialStepConfiguration @@ -206,7 +206,7 @@ def __init__( on_success=on_success, model=model, retry=retry, - extra_name_placeholders=extra_name_placeholders, + name_subs=name_subs, ) notebook_utils.try_to_save_notebook_cell_code(self.source_object) @@ -599,7 +599,7 @@ def configure( model: Optional["Model"] = None, merge: bool = True, retry: Optional[StepRetryConfig] = None, - extra_name_placeholders: Optional[Dict[str, str]] = None, + name_subs: Optional[Dict[str, str]] = None, ) -> T: """Configures the step. @@ -642,7 +642,7 @@ def configure( overwrite all existing ones. See the general description of this method for an example. retry: Configuration for retrying the step in case of failure. - extra_name_placeholders: Extra placeholders to use in the name template. + name_subs: Extra placeholders to use in the name template. Returns: The step instance that this method was called on. @@ -707,7 +707,7 @@ def _convert_to_tuple(value: Any) -> Tuple[Source, ...]: "success_hook_source": success_hook_source, "model": model, "retry": retry, - "extra_name_placeholders": extra_name_placeholders, + "name_subs": name_subs, } ) config = StepConfigurationUpdate(**values) @@ -732,7 +732,7 @@ def with_options( on_success: Optional["HookSpecification"] = None, model: Optional["Model"] = None, merge: bool = True, - extra_name_placeholders: Optional[Dict[str, str]] = None, + name_subs: Optional[Dict[str, str]] = None, ) -> "BaseStep": """Copies the step and applies the given configurations. @@ -764,7 +764,7 @@ def with_options( configurations. If `False` the given configurations will overwrite all existing ones. See the general description of this method for an example. - extra_name_placeholders: Extra placeholders for the step name. + name_subs: Extra placeholders for the step name. Returns: The copied step instance. @@ -785,7 +785,7 @@ def with_options( on_success=on_success, model=model, merge=merge, - extra_name_placeholders=extra_name_placeholders, + name_subs=name_subs, ) return step_copy diff --git a/src/zenml/steps/step_decorator.py b/src/zenml/steps/step_decorator.py index 98dc1d6ca72..7440c5c885f 100644 --- a/src/zenml/steps/step_decorator.py +++ b/src/zenml/steps/step_decorator.py @@ -73,7 +73,7 @@ def step( on_success: Optional["HookSpecification"] = None, model: Optional["Model"] = None, retry: Optional["StepRetryConfig"] = None, - extra_name_placeholders: Optional[Dict[str, str]] = None, + name_subs: Optional[Dict[str, str]] = None, ) -> Callable[["F"], "BaseStep"]: ... @@ -94,7 +94,7 @@ def step( on_success: Optional["HookSpecification"] = None, model: Optional["Model"] = None, retry: Optional["StepRetryConfig"] = None, - extra_name_placeholders: Optional[Dict[str, str]] = None, + name_subs: Optional[Dict[str, str]] = None, ) -> Union["BaseStep", Callable[["F"], "BaseStep"]]: """Decorator to create a ZenML step. @@ -126,7 +126,7 @@ def step( (e.g. `module.my_function`). model: configuration of the model in the Model Control Plane. retry: configuration of step retry in case of step failure. - extra_name_placeholders: Extra placeholders for the step name. + name_subs: Extra placeholders for the step name. Returns: The step instance. @@ -160,7 +160,7 @@ def inner_decorator(func: "F") -> "BaseStep": on_success=on_success, model=model, retry=retry, - extra_name_placeholders=extra_name_placeholders, + name_subs=name_subs, ) return step_instance diff --git a/src/zenml/zen_server/template_execution/utils.py b/src/zenml/zen_server/template_execution/utils.py index 04424db409e..0b3a9d1ad31 100644 --- a/src/zenml/zen_server/template_execution/utils.py +++ b/src/zenml/zen_server/template_execution/utils.py @@ -363,7 +363,7 @@ def deployment_request_from_template( "external_input_artifacts", "model_artifacts_or_metadata", "client_lazy_loaders", - "extra_name_placeholders", + "name_subs", "outputs", } ), diff --git a/tests/integration/functional/steps/test_naming.py b/tests/integration/functional/steps/test_naming.py index 795dd16b74f..a8c2c409919 100644 --- a/tests/integration/functional/steps/test_naming.py +++ b/tests/integration/functional/steps/test_naming.py @@ -41,7 +41,7 @@ def dynamic_single_string_standard() -> Annotated[str, str_namer_standard]: return "str_namer_standard" -@step(extra_name_placeholders={"funny_name": "name_placeholder"}) +@step(name_subs={"funny_name": "name_placeholder"}) def dynamic_single_string_custom() -> Annotated[str, str_namer_custom]: return "str_namer_custom" @@ -53,7 +53,7 @@ def dynamic_single_string_custom_no_default() -> ( return "str_namer_custom" -@step(extra_name_placeholders={"funny_name": "name_placeholder"}) +@step(name_subs={"funny_name": "name_placeholder"}) def dynamic_tuple() -> ( Tuple[ Annotated[str, str_namer_standard], @@ -63,7 +63,7 @@ def dynamic_tuple() -> ( return "str_namer_standard", "str_namer_custom" -@step(extra_name_placeholders={"funny_name": "name_placeholder"}) +@step(name_subs={"funny_name": "name_placeholder"}) def mixed_tuple() -> ( Tuple[ Annotated[str, str_namer_standard], @@ -79,7 +79,7 @@ def static_single() -> Annotated[str, static_namer]: return "static_namer" -@step(extra_name_placeholders={"funny_name": "name_placeholder"}) +@step(name_subs={"funny_name": "name_placeholder"}) def mixed_tuple_artifact_config() -> ( Tuple[ Annotated[str, ArtifactConfig(name=static_namer)], @@ -148,7 +148,7 @@ def test_sequential_executions_have_different_names(clean_client: "Client"): @pipeline(enable_cache=False) def _inner(name_placeholder: str): dynamic_single_string_custom.with_options( - extra_name_placeholders={"funny_name": name_placeholder} + name_subs={"funny_name": name_placeholder} )() p1: PipelineRunResponse = _inner("funny_name_42") @@ -167,7 +167,7 @@ def test_execution_fails_on_custom_but_not_provided_name( @pipeline(enable_cache=False) def _inner(): dynamic_single_string_custom_no_default.with_options( - extra_name_placeholders={"not_a_funny_name": "it's gonna fail"} + name_subs={"not_a_funny_name": "it's gonna fail"} )() with pytest.raises( From be0c12d86ad3fba356ea2d54ce7137c9a9362a8c Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Fri, 22 Nov 2024 16:43:46 +0100 Subject: [PATCH 14/14] fix doc string --- src/zenml/artifacts/artifact_config.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/zenml/artifacts/artifact_config.py b/src/zenml/artifacts/artifact_config.py index f6f40cecbac..49c08d38f37 100644 --- a/src/zenml/artifacts/artifact_config.py +++ b/src/zenml/artifacts/artifact_config.py @@ -49,8 +49,11 @@ def my_step() -> Annotated[ Attributes: name: The name of the artifact: - static string e.g. "name" - - dynamic callable e.g. lambda: "name"+str(42) - - dynamic string e.g. "name_{date}_{time}" + - dynamic string e.g. "name_{date}_{time}_{custom_placeholder}" + If you use any placeholders besides `date` and `time`, + you need to provide the values for them in the `name_subs` + argument of the step decorator or the `name_subs` argument + of `with_options` of the step. version: The version of the artifact. tags: The tags of the artifact. run_metadata: Metadata to add to the artifact.