diff --git a/docs/book/component-guide/model-deployers/vertex.md b/docs/book/component-guide/model-deployers/vertex.md new file mode 100644 index 00000000000..727f2d763fe --- /dev/null +++ b/docs/book/component-guide/model-deployers/vertex.md @@ -0,0 +1,179 @@ +# Vertex AI Model Deployer + +[Vertex AI](https://cloud.google.com/vertex-ai) provides managed infrastructure for deploying machine learning models at scale. The Vertex AI Model Deployer in ZenML allows you to deploy models to Vertex AI endpoints, providing a scalable and managed solution for model serving. + +## When to use it? + +You should use the Vertex AI Model Deployer when: + +* You're already using Google Cloud Platform (GCP) and want to leverage its native ML infrastructure +* You need enterprise-grade model serving capabilities with autoscaling +* You want a fully managed solution for hosting ML models +* You need to handle high-throughput prediction requests +* You want to deploy models with GPU acceleration +* You need to monitor and track your model deployments + +This is particularly useful in the following scenarios: +* Deploying models to production with high availability requirements +* Serving models that need GPU acceleration +* Handling varying prediction workloads with autoscaling +* Integrating model serving with other GCP services + +{% hint style="warning" %} +The Vertex AI Model Deployer requires a Vertex AI Model Registry to be present in your stack. Make sure you have configured both components properly. +{% endhint %} + +## How to deploy it? + +The Vertex AI Model Deployer is provided by the GCP ZenML integration. First, install the integration: + +```shell +zenml integration install gcp -y +``` + +### Authentication and Service Connector Configuration + +The Vertex AI Model Deployer requires proper GCP authentication. 
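Before configuring ZenML, the Vertex AI API must also be enabled in your GCP project. A typical one-time setup with the `gcloud` CLI (the project ID is a placeholder):

```shell
gcloud services enable aiplatform.googleapis.com --project=<PROJECT_ID>
```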
The recommended way to configure this is using the ZenML Service Connector functionality:

```shell
# Register the service connector with a service account key
zenml service-connector register vertex_deployer_connector \
    --type gcp \
    --auth-method=service-account \
    --project_id=<PROJECT_ID> \
    --service_account_json=@vertex-deployer-sa.json \
    --resource-type gcp-generic

# Register the model deployer
zenml model-deployer register vertex_deployer \
    --flavor=vertex \
    --location=us-central1

# Connect the model deployer to the service connector
zenml model-deployer connect vertex_deployer --connector vertex_deployer_connector
```

{% hint style="info" %}
The service account needs the following permissions:
- `Vertex AI User` role for deploying models
- `Vertex AI Service Agent` role for managing model endpoints
{% endhint %}

## How to use it

### Deploy a model in a pipeline

Here's an example of how to use the Vertex AI Model Deployer in a ZenML pipeline:

```python
from typing_extensions import Annotated

from zenml import ArtifactConfig, get_step_context, step
from zenml.client import Client
from zenml.integrations.gcp.services.vertex_deployment import (
    VertexDeploymentConfig,
    VertexDeploymentService,
)


@step(enable_cache=False)
def model_deployer() -> Annotated[
    VertexDeploymentService,
    ArtifactConfig(name="vertex_deployment", is_deployment_artifact=True),
]:
    """Model deployer step."""
    zenml_client = Client()
    current_model = get_step_context().model
    model_registry_uri = current_model.get_model_artifact(
        "THE_MODEL_ARTIFACT_NAME_GIVEN_IN_TRAINING_STEP"
    ).uri
    model_deployer = zenml_client.active_stack.model_deployer

    # Configure the deployment
    vertex_deployment_config = VertexDeploymentConfig(
        location="europe-west1",
        name="zenml-vertex-quickstart",
        model_name=current_model.name,
        description="Vertex AI model deployment example",
        model_id=model_registry_uri,
        machine_type="n1-standard-4",  # Optional: specify machine type
        min_replica_count=1,  # Optional: minimum number of replicas
        max_replica_count=3,  # Optional: maximum number of replicas
    )

    # Deploy the model
    service = model_deployer.deploy_model(
        config=vertex_deployment_config,
        service_type=VertexDeploymentService.SERVICE_TYPE,
    )

    return service
```

### Configuration Options

The Vertex AI Model Deployer accepts a rich set of configuration options through `VertexDeploymentConfig`:

* Basic Configuration:
  * `location`: GCP region for deployment (e.g., "us-central1")
  * `name`: Name for the deployment endpoint
  * `model_name`: Name of the model being deployed
  * `model_id`: Model ID from the Vertex AI Model Registry
* Infrastructure Configuration:
  * `machine_type`: Type of machine to use (e.g., "n1-standard-4")
  * `accelerator_type`: GPU accelerator type if needed
  * `accelerator_count`: Number of GPUs per replica
  * `min_replica_count`: Minimum number of serving replicas
  * `max_replica_count`: Maximum number of serving replicas
* Advanced Configuration:
  * `service_account`: Custom service account for the deployment
  * `network`: VPC network configuration
  * `encryption_spec_key_name`: Customer-managed encryption key
  * `enable_access_logging`: Enable detailed access logging
  * `explanation_metadata`: Model explanation configuration
  * `autoscaling_target_cpu_utilization`: Target CPU utilization for autoscaling
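For heavier workloads, the same configuration object can describe a GPU-backed, autoscaled endpoint. A sketch with illustrative values only — the machine and accelerator types must be available in your region, and `model_id` is a placeholder for a version that already exists in the Vertex AI Model Registry:

```python
from zenml.integrations.gcp.services.vertex_deployment import (
    VertexDeploymentConfig,
)

gpu_deployment_config = VertexDeploymentConfig(
    location="us-central1",
    name="fraud-detector-endpoint",
    model_name="fraud-detector",
    model_id="projects/<PROJECT_ID>/locations/us-central1/models/<MODEL_ID>",
    machine_type="n1-standard-8",
    accelerator_type="NVIDIA_TESLA_T4",  # one GPU of this type per replica
    accelerator_count=1,
    min_replica_count=1,
    max_replica_count=5,
    autoscaling_target_cpu_utilization=70.0,
    enable_access_logging=True,
)
```

### Running Predictions

Once a model is deployed, you can run predictions using the service: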
```python
from zenml.integrations.gcp.model_deployers import VertexModelDeployer

# Get the deployed service
model_deployer = VertexModelDeployer.get_active_model_deployer()
services = model_deployer.find_model_server(
    pipeline_name="deployment_pipeline",
    pipeline_step_name="model_deployer",
    model_name="my_model",
)

if services:
    service = services[0]
    if service.is_running:
        # Run prediction
        prediction = service.predict(
            instances=[{"feature1": 1.0, "feature2": 2.0}]
        )
        print(f"Prediction: {prediction}")
```

### Limitations and Considerations

1. **Stack Requirements**:
   - Requires a Vertex AI Model Registry in the stack
   - All stack components must be non-local
2. **Authentication**:
   - Requires proper GCP credentials with Vertex AI permissions
   - Best practice is to use service connectors for authentication
3. **Costs**:
   - Vertex AI endpoints incur costs based on machine type and uptime
   - Consider using autoscaling to optimize costs
4. **Region Availability**:
   - Service availability depends on Vertex AI regional availability
   - The model and endpoint must be in the same region

Check out the [SDK docs](https://sdkdocs.zenml.io) for more detailed information about the implementation.
\ No newline at end of file
diff --git a/docs/book/component-guide/model-registries/vertex.md b/docs/book/component-guide/model-registries/vertex.md
new file mode 100644
index 00000000000..eef9096ce62
--- /dev/null
+++ b/docs/book/component-guide/model-registries/vertex.md
@@ -0,0 +1,156 @@
# Vertex AI Model Registry

[Vertex AI](https://cloud.google.com/vertex-ai) is Google Cloud's unified ML platform that helps you build, deploy, and scale ML models. The Vertex AI Model Registry is a centralized repository for managing your ML models throughout their lifecycle. ZenML's Vertex AI Model Registry integration allows you to register, version, and manage your models using Vertex AI's infrastructure.

## When would you want to use it?

You should consider using the Vertex AI Model Registry when:

* You're already using Google Cloud Platform (GCP) and want to leverage its native ML infrastructure
* You need enterprise-grade model management capabilities with fine-grained access control
* You want to track model lineage and metadata in a centralized location
* You're building ML pipelines that need to integrate with other Vertex AI services
* You need to manage model deployment across different GCP environments

This is particularly useful in the following scenarios:

* Building production ML pipelines that need to integrate with GCP services
* Managing multiple versions of models across development and production environments
* Tracking model artifacts and metadata in a centralized location
* Deploying models to Vertex AI endpoints for serving

{% hint style="warning" %}
Important: The Vertex AI Model Registry implementation only supports the model version interface, not the model interface. This means you cannot register, delete, or update models directly - you can only work with model versions. Operations like `register_model()`, `delete_model()`, and `update_model()` are not supported.

Unlike platforms such as MLflow, where you first create a model container and then add versions to it, Vertex AI combines model registration and versioning into a single operation (illustrated in the sketch after this note):

- When you upload a model, it automatically creates both the model and its first version
- Each subsequent upload with the same display name creates a new version
- You cannot create an empty model container without a version
{% endhint %}
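In practice, this means the first `register_model_version()` call for a given name creates the Vertex AI model together with version 1, and later calls add versions to it. A minimal sketch, assuming a stack with this registry configured and model artifacts already in GCS (bucket and paths are placeholders):

```python
from zenml.client import Client

registry = Client().active_stack.model_registry

# First registration: creates the Vertex AI model *and* its first version.
v1 = registry.register_model_version(
    name="my-classifier",
    version="1",
    model_source_uri="gs://<BUCKET>/models/my-classifier/v1",
)

# Registering under the same name again adds a new version to that model.
v2 = registry.register_model_version(
    name="my-classifier",
    version="2",
    model_source_uri="gs://<BUCKET>/models/my-classifier/v2",
)
```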
## How do you deploy it?

The Vertex AI Model Registry flavor is provided by the GCP ZenML integration. First, install the integration:

```shell
zenml integration install gcp -y
```

### Authentication and Service Connector Configuration

The Vertex AI Model Registry requires proper GCP authentication. The recommended way to configure this is using the ZenML Service Connector functionality. You have several options for authentication:

1. Using a GCP Service Connector with a dedicated service account (recommended):

```shell
# Register the service connector with a service account key
zenml service-connector register vertex_registry_connector \
    --type gcp \
    --auth-method=service-account \
    --project_id=<PROJECT_ID> \
    --service_account_json=@vertex-registry-sa.json \
    --resource-type gcp-generic

# Register the model registry
zenml model-registry register vertex_registry \
    --flavor=vertex \
    --location=us-central1

# Connect the model registry to the service connector
zenml model-registry connect vertex_registry --connector vertex_registry_connector
```

2. Using local gcloud credentials:

```shell
# Register the model registry using local gcloud auth
zenml model-registry register vertex_registry \
    --flavor=vertex \
    --location=us-central1
```

{% hint style="info" %}
The service account used needs the following permissions:
- `Vertex AI User` role for creating and managing model versions
- `Storage Object Viewer` role if accessing models stored in Google Cloud Storage
{% endhint %}

## How do you use it?

### Register models inside a pipeline

Here's an example of how to use the Vertex AI Model Registry in your ZenML pipeline with a custom model registration step:

```python
from typing_extensions import Annotated

from zenml import ArtifactConfig, get_step_context, step
from zenml.client import Client
from zenml.logger import get_logger

logger = get_logger(__name__)


@step(enable_cache=False)
def model_register() -> Annotated[str, ArtifactConfig(name="model_registry_uri")]:
    """Model registration step."""
    # Get the current model from the context
    current_model = get_step_context().model

    client = Client()
    model_registry = client.active_stack.model_registry
    model_version = model_registry.register_model_version(
        name=current_model.name,
        version=str(current_model.version),
        model_source_uri=current_model.get_model_artifact("sklearn_classifier").uri,
        description="ZenML model registered after promotion",
    )
    logger.info(
        f"Model version {model_version.version} registered in Model Registry"
    )

    return model_version.model_source_uri
```

### Configuration Options

The Vertex AI Model Registry accepts the following configuration options:

* `location`: The GCP region where the model registry will be created (e.g., "us-central1")
* `project_id`: (Optional) The GCP project ID. If not specified, the default project of the active credentials is used
* `credentials`: (Optional) GCP credentials configuration
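Because Vertex AI falls back to a default scikit-learn serving container (see the key differences below), it is usually worth passing an explicit serving image when registering a version. A hedged sketch — this assumes that `ModelRegistryModelMetadata` accepts extra keys and that the integration reads `serving_container_image_uri` from the version metadata; the image URI shown is illustrative:

```python
from zenml.client import Client
from zenml.model_registries.base_model_registry import (
    ModelRegistryModelMetadata,
)

registry = Client().active_stack.model_registry

registry.register_model_version(
    name="my-classifier",
    version="3",
    model_source_uri="gs://<BUCKET>/models/my-classifier/v3",
    # Extra metadata keys are forwarded to the integration; here we override
    # the default scikit-learn serving container (illustrative image URI).
    metadata=ModelRegistryModelMetadata(
        serving_container_image_uri="europe-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest",
    ),
)
```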
### Working with Model Versions

Since the Vertex AI Model Registry only supports version-level operations, here's how to work with model versions:

```shell
# List all versions of a registered model
zenml model-registry models list-versions <MODEL_NAME>

# Get details of a specific model version
zenml model-registry models get-version <MODEL_NAME> -v <VERSION>

# Delete a model version
zenml model-registry models delete-version <MODEL_NAME> -v <VERSION>
```

### Key Differences from MLflow Model Registry

Unlike the MLflow Model Registry, the Vertex AI implementation has some important differences:

1. **Version-Only Interface**: Vertex AI only supports model version operations. You cannot register, delete, or update models directly - only their versions.
2. **Authentication**: Uses GCP service connectors for authentication, similar to other Vertex AI services in ZenML.
3. **Staging Levels**: Vertex AI doesn't have built-in staging levels (like Production, Staging, etc.) - these are handled through metadata.
4. **Default Container Images**: Vertex AI requires a serving container image URI, which defaults to the scikit-learn prediction container if not specified.
5. **Managed Service**: As a fully managed service, you don't need to worry about infrastructure management, but you need valid GCP credentials.

### Limitations

Based on the implementation, there are some limitations to be aware of:

1. The `register_model()`, `update_model()`, and `delete_model()` methods are not implemented, as Vertex AI only supports registering model versions.
2. Models should be registered with an explicit serving container image URI; otherwise the default scikit-learn prediction container is used, which may not be compatible with your model when deploying to Vertex AI endpoints.
3. All models registered by the integration are automatically labeled with `managed_by="zenml"` for tracking purposes.
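You can also inspect registered versions from Python. A minimal sketch, assuming a model named `my-classifier` with a version `1` already registered:

```python
from zenml.client import Client

# The active stack must contain the Vertex AI model registry.
registry = Client().active_stack.model_registry

# Inspect a registered version and its source artifact.
version = registry.get_model_version(name="my-classifier", version="1")
print(version.model_source_uri)

# Fetch the underlying Vertex AI model object for lower-level operations.
vertex_model = registry.load_model_version(name="my-classifier", version="1")
```

Check out the [SDK docs](https://sdkdocs.zenml.io/latest/integration_code_docs/integrations-gcp/#zenml.integrations.gcp.model_registry) to see more about the interface and implementation.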
\ No newline at end of file diff --git a/docs/book/toc.md b/docs/book/toc.md index aff3ce0c7b9..879470d5ef0 100644 --- a/docs/book/toc.md +++ b/docs/book/toc.md @@ -272,6 +272,7 @@ * [Develop a custom experiment tracker](component-guide/experiment-trackers/custom.md) * [Model Deployers](component-guide/model-deployers/model-deployers.md) * [MLflow](component-guide/model-deployers/mlflow.md) + * [VertexAI](component-guide/model-deployers/vertex.md) * [Seldon](component-guide/model-deployers/seldon.md) * [BentoML](component-guide/model-deployers/bentoml.md) * [Hugging Face](component-guide/model-deployers/huggingface.md) @@ -304,6 +305,7 @@ * [Develop a Custom Annotator](component-guide/annotators/custom.md) * [Model Registries](component-guide/model-registries/model-registries.md) * [MLflow Model Registry](component-guide/model-registries/mlflow.md) + * [VertexAI](component-guide/model-registries/vertex.md) * [Develop a Custom Model Registry](component-guide/model-registries/custom.md) * [Feature Stores](component-guide/feature-stores/feature-stores.md) * [Feast](component-guide/feature-stores/feast.md) diff --git a/src/zenml/cli/model_registry.py b/src/zenml/cli/model_registry.py index 93c55391817..838132b9e9a 100644 --- a/src/zenml/cli/model_registry.py +++ b/src/zenml/cli/model_registry.py @@ -18,6 +18,7 @@ import click +from zenml import __version__ from zenml.cli import utils as cli_utils from zenml.cli.cli import TagGroup, cli from zenml.enums import StackComponentType @@ -643,7 +644,7 @@ def register_model_version( # Parse metadata metadata = dict(metadata) if metadata else {} registered_metadata = ModelRegistryModelMetadata(**dict(metadata)) - registered_metadata.zenml_version = zenml_version + registered_metadata.zenml_version = zenml_version or __version__ registered_metadata.zenml_run_name = zenml_run_name registered_metadata.zenml_pipeline_name = zenml_pipeline_name registered_metadata.zenml_step_name = zenml_step_name diff --git a/src/zenml/integrations/gcp/__init__.py b/src/zenml/integrations/gcp/__init__.py index 0c3508546d8..e25d9441427 100644 --- a/src/zenml/integrations/gcp/__init__.py +++ b/src/zenml/integrations/gcp/__init__.py @@ -33,6 +33,11 @@ GCP_VERTEX_ORCHESTRATOR_FLAVOR = "vertex" GCP_VERTEX_STEP_OPERATOR_FLAVOR = "vertex" +# Model deployer constants +VERTEX_MODEL_REGISTRY_FLAVOR = "vertex" +VERTEX_MODEL_DEPLOYER_FLAVOR = "vertex" +VERTEX_SERVICE_ARTIFACT = "vertex_deployment_service" + # Service connector constants GCP_CONNECTOR_TYPE = "gcp" GCP_RESOURCE_TYPE = "gcp-generic" @@ -72,6 +77,8 @@ def flavors(cls) -> List[Type[Flavor]]: GCPImageBuilderFlavor, VertexOrchestratorFlavor, VertexStepOperatorFlavor, + VertexModelDeployerFlavor, + VertexModelRegistryFlavor, ) return [ @@ -79,6 +86,8 @@ def flavors(cls) -> List[Type[Flavor]]: GCPImageBuilderFlavor, VertexOrchestratorFlavor, VertexStepOperatorFlavor, + VertexModelRegistryFlavor, + VertexModelDeployerFlavor, ] diff --git a/src/zenml/integrations/gcp/flavors/__init__.py b/src/zenml/integrations/gcp/flavors/__init__.py index 73bb6259aa5..25067703d55 100644 --- a/src/zenml/integrations/gcp/flavors/__init__.py +++ b/src/zenml/integrations/gcp/flavors/__init__.py @@ -29,6 +29,14 @@ VertexStepOperatorConfig, VertexStepOperatorFlavor, ) +from zenml.integrations.gcp.flavors.vertex_model_deployer_flavor import ( + VertexModelDeployerConfig, + VertexModelDeployerFlavor, +) +from zenml.integrations.gcp.flavors.vertex_model_registry_flavor import ( + VertexAIModelRegistryConfig, + VertexModelRegistryFlavor, +) __all__ = [ 
"GCPArtifactStoreFlavor", @@ -39,4 +47,8 @@ "VertexOrchestratorConfig", "VertexStepOperatorFlavor", "VertexStepOperatorConfig", + "VertexModelDeployerFlavor", + "VertexModelDeployerConfig", + "VertexModelRegistryFlavor", + "VertexAIModelRegistryConfig", ] diff --git a/src/zenml/integrations/gcp/flavors/vertex_model_deployer_flavor.py b/src/zenml/integrations/gcp/flavors/vertex_model_deployer_flavor.py new file mode 100644 index 00000000000..1b526cf0f2f --- /dev/null +++ b/src/zenml/integrations/gcp/flavors/vertex_model_deployer_flavor.py @@ -0,0 +1,161 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Vertex AI model deployer flavor.""" + +from typing import TYPE_CHECKING, Dict, Optional, Sequence, Type + +from pydantic import BaseModel + +from zenml.integrations.gcp import ( + GCP_RESOURCE_TYPE, + VERTEX_MODEL_DEPLOYER_FLAVOR, +) +from zenml.integrations.gcp.google_credentials_mixin import ( + GoogleCredentialsConfigMixin, +) +from zenml.model_deployers.base_model_deployer import ( + BaseModelDeployerConfig, + BaseModelDeployerFlavor, +) +from zenml.models.v2.misc.service_connector_type import ( + ServiceConnectorRequirements, +) + +if TYPE_CHECKING: + from zenml.integrations.gcp.model_deployers.vertex_model_deployer import ( + VertexModelDeployer, + ) + + +class VertexBaseConfig(BaseModel): + """Vertex AI Inference Endpoint configuration.""" + + location: Optional[str] = None + version: Optional[str] = None + serving_container_image_uri: Optional[str] = None + artifact_uri: Optional[str] = None + model_id: Optional[str] = None + is_default_version: Optional[bool] = None + serving_container_command: Optional[Sequence[str]] = None + serving_container_args: Optional[Sequence[str]] = None + serving_container_environment_variables: Optional[Dict[str, str]] = None + serving_container_ports: Optional[Sequence[int]] = None + serving_container_grpc_ports: Optional[Sequence[int]] = None + deployed_model_display_name: Optional[str] = None + traffic_percentage: Optional[int] = 0 + traffic_split: Optional[Dict[str, int]] = None + machine_type: Optional[str] = None + accelerator_type: Optional[str] = None + accelerator_count: Optional[int] = None + min_replica_count: Optional[int] = 1 + max_replica_count: Optional[int] = 1 + service_account: Optional[str] = None + metadata: Optional[Dict[str, str]] = None + network: Optional[str] = None + encryption_spec_key_name: Optional[str] = None + sync: Optional[bool] = False + deploy_request_timeout: Optional[int] = None + autoscaling_target_cpu_utilization: Optional[float] = None + autoscaling_target_accelerator_duty_cycle: Optional[float] = None + enable_access_logging: Optional[bool] = None + disable_container_logging: Optional[bool] = None + explanation_metadata: Optional[Dict[str, str]] = None + explanation_parameters: Optional[Dict[str, str]] = None + existing_endpoint: Optional[str] = None + labels: Optional[Dict[str, str]] = None + + +class VertexModelDeployerConfig( + BaseModelDeployerConfig, 
VertexBaseConfig, GoogleCredentialsConfigMixin +): + """Configuration for the Vertex AI model deployer.""" + + +class VertexModelDeployerFlavor(BaseModelDeployerFlavor): + """Vertex AI model deployer flavor.""" + + @property + def name(self) -> str: + """Name of the flavor. + + Returns: + The name of the flavor. + """ + return VERTEX_MODEL_DEPLOYER_FLAVOR + + @property + def service_connector_requirements( + self, + ) -> Optional[ServiceConnectorRequirements]: + """Service connector resource requirements for service connectors. + + Specifies resource requirements that are used to filter the available + service connector types that are compatible with this flavor. + + Returns: + Requirements for compatible service connectors, if a service + connector is required for this flavor. + """ + return ServiceConnectorRequirements( + resource_type=GCP_RESOURCE_TYPE, + ) + + @property + def docs_url(self) -> Optional[str]: + """A url to point at docs explaining this flavor. + + Returns: + A flavor docs url. + """ + return self.generate_default_docs_url() + + @property + def sdk_docs_url(self) -> Optional[str]: + """A url to point at SDK docs explaining this flavor. + + Returns: + A flavor SDK docs url. + """ + return self.generate_default_sdk_docs_url() + + @property + def logo_url(self) -> str: + """A url to represent the flavor in the dashboard. + + Returns: + The flavor logo. + """ + return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/gcp.png" + + @property + def config_class(self) -> Type[VertexModelDeployerConfig]: + """Returns `VertexModelDeployerConfig` config class. + + Returns: + The config class. + """ + return VertexModelDeployerConfig + + @property + def implementation_class(self) -> Type["VertexModelDeployer"]: + """Implementation class for this flavor. + + Returns: + The implementation class. + """ + from zenml.integrations.gcp.model_deployers.vertex_model_deployer import ( + VertexModelDeployer, + ) + + return VertexModelDeployer diff --git a/src/zenml/integrations/gcp/flavors/vertex_model_registry_flavor.py b/src/zenml/integrations/gcp/flavors/vertex_model_registry_flavor.py new file mode 100644 index 00000000000..e16cf548685 --- /dev/null +++ b/src/zenml/integrations/gcp/flavors/vertex_model_registry_flavor.py @@ -0,0 +1,128 @@ +# Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""VertexAI model registry flavor.""" + +from typing import TYPE_CHECKING, Optional, Type + +from zenml.config.base_settings import BaseSettings +from zenml.integrations.gcp import ( + GCP_RESOURCE_TYPE, + VERTEX_MODEL_REGISTRY_FLAVOR, +) +from zenml.integrations.gcp.google_credentials_mixin import ( + GoogleCredentialsConfigMixin, +) +from zenml.model_registries.base_model_registry import ( + BaseModelRegistryConfig, + BaseModelRegistryFlavor, +) +from zenml.models import ServiceConnectorRequirements + +if TYPE_CHECKING: + from zenml.integrations.gcp.model_registries import ( + VertexAIModelRegistry, + ) + + +class VertexAIModelRegistrySettings(BaseSettings): + """Settings for the VertexAI model registry.""" + + location: str + + +class VertexAIModelRegistryConfig( + BaseModelRegistryConfig, + GoogleCredentialsConfigMixin, + VertexAIModelRegistrySettings, +): + """Configuration for the VertexAI model registry.""" + + +class VertexModelRegistryFlavor(BaseModelRegistryFlavor): + """Model registry flavor for VertexAI models.""" + + @property + def name(self) -> str: + """Name of the flavor. + + Returns: + The name of the flavor. + """ + return VERTEX_MODEL_REGISTRY_FLAVOR + + @property + def service_connector_requirements( + self, + ) -> Optional[ServiceConnectorRequirements]: + """Service connector resource requirements for service connectors. + + Specifies resource requirements that are used to filter the available + service connector types that are compatible with this flavor. + + Returns: + Requirements for compatible service connectors, if a service + connector is required for this flavor. + """ + return ServiceConnectorRequirements( + resource_type=GCP_RESOURCE_TYPE, + ) + + @property + def docs_url(self) -> Optional[str]: + """A url to point at docs explaining this flavor. + + Returns: + A flavor docs url. + """ + return self.generate_default_docs_url() + + @property + def sdk_docs_url(self) -> Optional[str]: + """A url to point at SDK docs explaining this flavor. + + Returns: + A flavor SDK docs url. + """ + return self.generate_default_sdk_docs_url() + + @property + def logo_url(self) -> str: + """A url to represent the flavor in the dashboard. + + Returns: + The flavor logo. + """ + return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/gcp.png" + + @property + def config_class(self) -> Type[VertexAIModelRegistryConfig]: + """Returns `VertexAIModelRegistryConfig` config class. + + Returns: + The config class. + """ + return VertexAIModelRegistryConfig + + @property + def implementation_class(self) -> Type["VertexAIModelRegistry"]: + """Implementation class for this flavor. + + Returns: + The implementation class. + """ + from zenml.integrations.gcp.model_registries import ( + VertexAIModelRegistry, + ) + + return VertexAIModelRegistry diff --git a/src/zenml/integrations/gcp/model_deployers/__init__.py b/src/zenml/integrations/gcp/model_deployers/__init__.py new file mode 100644 index 00000000000..203f57c096f --- /dev/null +++ b/src/zenml/integrations/gcp/model_deployers/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Initialization of the Vertex AI model deployers.""" + +from zenml.integrations.gcp.model_deployers.vertex_model_deployer import ( # noqa + VertexModelDeployer, +) + +__all__ = ["VertexModelDeployer"] diff --git a/src/zenml/integrations/gcp/model_deployers/vertex_model_deployer.py b/src/zenml/integrations/gcp/model_deployers/vertex_model_deployer.py new file mode 100644 index 00000000000..3b6d31820cc --- /dev/null +++ b/src/zenml/integrations/gcp/model_deployers/vertex_model_deployer.py @@ -0,0 +1,259 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Implementation of the Vertex AI Model Deployer.""" + +from typing import ClassVar, Dict, Optional, Tuple, Type, cast +from uuid import UUID + +from google.cloud import aiplatform + +from zenml.analytics.enums import AnalyticsEvent +from zenml.analytics.utils import track_handler +from zenml.client import Client +from zenml.enums import StackComponentType +from zenml.integrations.gcp.flavors.vertex_model_deployer_flavor import ( + VertexModelDeployerConfig, + VertexModelDeployerFlavor, +) +from zenml.integrations.gcp.google_credentials_mixin import ( + GoogleCredentialsMixin, +) +from zenml.integrations.gcp.model_registries.vertex_model_registry import ( + VertexAIModelRegistry, +) +from zenml.integrations.gcp.services.vertex_deployment import ( + VertexDeploymentConfig, + VertexDeploymentService, +) +from zenml.logger import get_logger +from zenml.model_deployers import BaseModelDeployer +from zenml.model_deployers.base_model_deployer import ( + DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, + BaseModelDeployerFlavor, +) +from zenml.services import BaseService, ServiceConfig +from zenml.stack.stack import Stack +from zenml.stack.stack_validator import StackValidator + +logger = get_logger(__name__) + + +class VertexModelDeployer(BaseModelDeployer, GoogleCredentialsMixin): + """Vertex AI endpoint model deployer.""" + + NAME: ClassVar[str] = "Vertex AI" + FLAVOR: ClassVar[Type["BaseModelDeployerFlavor"]] = ( + VertexModelDeployerFlavor + ) + + @property + def config(self) -> VertexModelDeployerConfig: + """Returns the `VertexModelDeployerConfig` config. + + Returns: + The configuration. 
        """
        return cast(VertexModelDeployerConfig, self._config)

    def setup_aiplatform(self) -> None:
        """Setup the Vertex AI platform."""
        credentials, project_id = self._get_authentication()
        aiplatform.init(
            project=project_id,
            location=self.config.location,
            credentials=credentials,
        )

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates that the stack contains a model registry.

        Also validates that the artifact store is not local.

        Returns:
            A StackValidator instance.
        """

        def _validate_stack_requirements(stack: "Stack") -> Tuple[bool, str]:
            """Validates that all the stack components are not local.

            Args:
                stack: The stack to validate.

            Returns:
                A tuple of (is_valid, error_message).
            """
            model_registry = stack.model_registry
            if not isinstance(model_registry, VertexAIModelRegistry):
                return False, (
                    "The Vertex AI model deployer requires a Vertex AI model "
                    "registry to be present in the stack. Please add a Vertex "
                    "AI model registry to the stack."
                )

            # Validate that the rest of the components are not local.
            for stack_comp in stack.components.values():
                local_path = stack_comp.local_path
                if not local_path:
                    continue
                return False, (
                    f"The '{stack_comp.name}' {stack_comp.type.value} is a "
                    f"local stack component. The Vertex AI model deployer "
                    f"requires that all the components in the stack are not "
                    f"local, because there is no way for Vertex AI to connect "
                    f"to your local machine. You should use a flavor of "
                    f"{stack_comp.type.value} other than '{stack_comp.flavor}'."
                )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.MODEL_REGISTRY,
            },
            custom_validation_function=_validate_stack_requirements,
        )

    def _create_deployment_service(
        self, id: UUID, timeout: int, config: VertexDeploymentConfig
    ) -> VertexDeploymentService:
        """Creates a new VertexDeploymentService.

        Args:
            id: the UUID of the service to be created for the deployed model.
            timeout: the timeout in seconds to wait for the Vertex inference
                endpoint to be provisioned and successfully started or updated.
            config: the configuration of the model to be deployed with the
                Vertex model deployer.

        Returns:
            The VertexDeploymentService object that can be used to interact
            with the Vertex inference endpoint.
        """
        # Create a new service for the new model. If there is an active
        # pipeline and/or model context, the deployment artifact produced
        # from this service will be associated with it.
        service = VertexDeploymentService(uuid=id, config=config)
        logger.info("Creating Vertex AI deployment service with ID %s.", id)
        service.start(timeout=timeout)
        return service

    def perform_deploy_model(
        self,
        id: UUID,
        config: ServiceConfig,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Deploy a model to Vertex AI.

        Args:
            id: the UUID of the service to be created.
            config: the configuration of the model to be deployed.
            timeout: the timeout for the deployment operation.

        Returns:
            The ZenML Vertex AI deployment service object.
+ """ + with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler: + config = cast(VertexDeploymentConfig, config) + service = self._create_deployment_service( + id=id, config=config, timeout=timeout + ) + logger.info( + f"Creating a new Vertex AI deployment service: {service}" + ) + + client = Client() + stack = client.active_stack + stack_metadata = { + component_type.value: component.flavor + for component_type, component in stack.components.items() + } + analytics_handler.metadata = { + "store_type": client.zen_store.type.value, + **stack_metadata, + } + return service + + def perform_stop_model( + self, + service: BaseService, + timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, + force: bool = False, + ) -> BaseService: + """Stop a Vertex AI deployment service. + + Args: + service: The service to stop. + timeout: Timeout in seconds to wait for the service to stop. + force: If True, force the service to stop. + + Returns: + The stopped service. + """ + service.stop(timeout=timeout, force=force) + return service + + def perform_start_model( + self, + service: BaseService, + timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, + ) -> BaseService: + """Start a Vertex AI deployment service. + + Args: + service: The service to start. + timeout: Timeout in seconds to wait for the service to start. + + Returns: + The started service. + """ + service.start(timeout=timeout) + return service + + def perform_delete_model( + self, + service: BaseService, + timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, + force: bool = False, + ) -> None: + """Delete a Vertex AI deployment service. + + Args: + service: The service to delete. + timeout: Timeout in seconds to wait for the service to stop. + force: If True, force the service to stop. + """ + service = cast(VertexDeploymentService, service) + service.stop(timeout=timeout, force=force) + + @staticmethod + def get_model_server_info( # type: ignore[override] + service_instance: "VertexDeploymentService", + ) -> Dict[str, Optional[str]]: + """Get information about the deployed model server. + + Args: + service_instance: The VertexDeploymentService instance. + + Returns: + A dictionary containing information about the model server. + """ + return { + "PREDICTION_URL": service_instance.prediction_url, + "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(), + } diff --git a/src/zenml/integrations/gcp/model_registries/__init__.py b/src/zenml/integrations/gcp/model_registries/__init__.py new file mode 100644 index 00000000000..672c7c19619 --- /dev/null +++ b/src/zenml/integrations/gcp/model_registries/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
"""Initialization of the Vertex AI model registry."""

from zenml.integrations.gcp.model_registries.vertex_model_registry import (
    VertexAIModelRegistry,
)

__all__ = ["VertexAIModelRegistry"]
diff --git a/src/zenml/integrations/gcp/model_registries/vertex_model_registry.py b/src/zenml/integrations/gcp/model_registries/vertex_model_registry.py
new file mode 100644
index 00000000000..fcc57001867
--- /dev/null
+++ b/src/zenml/integrations/gcp/model_registries/vertex_model_registry.py
@@ -0,0 +1,308 @@
# Copyright (c) ZenML GmbH 2023. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Vertex AI model registry integration for ZenML."""

from datetime import datetime
from typing import Any, Dict, List, Optional, cast

from google.cloud import aiplatform

from zenml.integrations.gcp.flavors.vertex_model_registry_flavor import (
    VertexAIModelRegistryConfig,
)
from zenml.integrations.gcp.google_credentials_mixin import (
    GoogleCredentialsMixin,
)
from zenml.logger import get_logger
from zenml.model_registries.base_model_registry import (
    BaseModelRegistry,
    ModelRegistryModelMetadata,
    ModelVersionStage,
    RegisteredModel,
    RegistryModelVersion,
)

logger = get_logger(__name__)


class VertexAIModelRegistry(BaseModelRegistry, GoogleCredentialsMixin):
    """Register models using Vertex AI."""

    @property
    def config(self) -> VertexAIModelRegistryConfig:
        """Returns the config of the model registry.

        Returns:
            The configuration.
        """
        return cast(VertexAIModelRegistryConfig, self._config)

    def setup_aiplatform(self) -> None:
        """Setup the Vertex AI platform."""
        credentials, project_id = self._get_authentication()
        aiplatform.init(
            project=project_id,
            location=self.config.location,
            credentials=credentials,
        )

    def register_model(
        self,
        name: str,
        description: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
    ) -> RegisteredModel:
        """Register a model to the Vertex AI model registry."""
        raise NotImplementedError(
            "Vertex AI does not support registering models directly; "
            "register a model version via `register_model_version()` instead."
        )

    def delete_model(
        self,
        name: str,
    ) -> None:
        """Delete a model from the Vertex AI model registry."""
        # Ensure the SDK is initialized with the configured credentials.
        self.setup_aiplatform()
        try:
            model = aiplatform.Model(model_name=name)
            model.delete()
        except Exception as e:
            raise RuntimeError(f"Failed to delete model: {str(e)}")

    def update_model(
        self,
        name: str,
        description: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        remove_metadata: Optional[List[str]] = None,
    ) -> RegisteredModel:
        """Update a model in the Vertex AI model registry."""
        raise NotImplementedError(
            "Vertex AI does not support updating models directly; "
            "update a model version instead."
+ ) + + def get_model(self, name: str) -> RegisteredModel: + """Get a model from the Vertex AI model registry.""" + try: + model = aiplatform.Model(model_name=name) + return RegisteredModel( + name=model.name, + description=model.description, + metadata=model.labels, + ) + except Exception as e: + raise RuntimeError(f"Failed to get model: {str(e)}") + + def list_models( + self, + name: Optional[str] = None, + metadata: Optional[Dict[str, str]] = None, + ) -> List[RegisteredModel]: + """List models in the Vertex AI model registry.""" + self.setup_aiplatform() + filter_expr = 'labels.managed_by="zenml"' + if name: + filter_expr = filter_expr + f' AND display_name="{name}"' + if metadata: + for key, value in metadata.items(): + filter_expr = filter_expr + f' AND labels.{key}="{value}"' + try: + models = aiplatform.Model.list(filter=filter_expr) + return [ + RegisteredModel( + name=model.display_name, + description=model.description, + metadata=model.labels, + ) + for model in models + ] + except Exception as e: + raise RuntimeError(f"Failed to list models: {str(e)}") + + def register_model_version( + self, + name: str, + version: Optional[str] = None, + model_source_uri: Optional[str] = None, + description: Optional[str] = None, + metadata: Optional[ModelRegistryModelMetadata] = None, + **kwargs: Any, + ) -> RegistryModelVersion: + """Register a model version to the Vertex AI model registry.""" + self.setup_aiplatform() + metadata_dict = metadata.model_dump() if metadata else {} + serving_container_image_uri = metadata_dict.get( + "serving_container_image_uri", + None + or "europe-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest", + ) + is_default_version = metadata_dict.get("is_default_version", False) + try: + version_info = aiplatform.Model.upload( + artifact_uri=model_source_uri, + display_name=f"{name}_{version}", + serving_container_image_uri=serving_container_image_uri, + description=description, + is_default_version=is_default_version, + labels=metadata_dict, + ) + return RegistryModelVersion( + version=version_info.version_id, + model_source_uri=version_info.resource_name, + model_format="Custom", # Vertex AI doesn't provide this info directly + registered_model=self.get_model(version_info.name), + description=description, + created_at=version_info.create_time, + last_updated_at=version_info.update_time, + stage=ModelVersionStage.NONE, # Vertex AI doesn't have built-in stages + metadata=metadata, + ) + except Exception as e: + raise RuntimeError(f"Failed to register model version: {str(e)}") + + def delete_model_version( + self, + name: str, + version: str, + ) -> None: + """Delete a model version from the Vertex AI model registry.""" + self.setup_aiplatform() + try: + model_version = aiplatform.ModelVersion( + model_name=f"{name}@{version}" + ) + model_version.delete() + except Exception as e: + raise RuntimeError(f"Failed to delete model version: {str(e)}") + + def update_model_version( + self, + name: str, + version: str, + description: Optional[str] = None, + metadata: Optional[ModelRegistryModelMetadata] = None, + remove_metadata: Optional[List[str]] = None, + stage: Optional[ModelVersionStage] = None, + ) -> RegistryModelVersion: + """Update a model version in the Vertex AI model registry.""" + self.setup_aiplatform() + try: + model_version = aiplatform.Model(model_name=f"{name}@{version}") + labels = model_version.labels + if metadata: + metadata_dict = metadata.model_dump() if metadata else {} + for key, value in metadata_dict.items(): + labels[key] = value + if 
remove_metadata: + for key in remove_metadata: + labels.pop(key, None) + model_version.update(description=description, labels=labels) + return self.get_model_version(name, version) + except Exception as e: + raise RuntimeError(f"Failed to update model version: {str(e)}") + + def get_model_version( + self, name: str, version: str + ) -> RegistryModelVersion: + """Get a model version from the Vertex AI model registry.""" + self.setup_aiplatform() + try: + model_version = aiplatform.Model(model_name=f"{name}@{version}") + return RegistryModelVersion( + version=model_version.version_id, + model_source_uri=model_version.artifact_uri, + model_format="Custom", # Vertex AI doesn't provide this info directly + registered_model=self.get_model(model_version.name), + description=model_version.description, + created_at=model_version.create_time, + last_updated_at=model_version.update_time, + stage=ModelVersionStage.NONE, # Vertex AI doesn't have built-in stages + metadata=ModelRegistryModelMetadata(**model_version.labels), + ) + except Exception as e: + raise RuntimeError(f"Failed to get model version: {str(e)}") + + def list_model_versions( + self, + name: Optional[str] = None, + model_source_uri: Optional[str] = None, + metadata: Optional[ModelRegistryModelMetadata] = None, + stage: Optional[ModelVersionStage] = None, + count: Optional[int] = None, + created_after: Optional[datetime] = None, + created_before: Optional[datetime] = None, + order_by_date: Optional[str] = None, + **kwargs: Any, + ) -> List[RegistryModelVersion]: + """List model versions from the Vertex AI model registry.""" + self.setup_aiplatform() + filter_expr = [] + if name: + filter_expr.append(f"display_name={name}") + if metadata: + for key, value in metadata.dict().items(): + filter_expr.append(f"labels.{key}={value}") + if created_after: + filter_expr.append(f"create_time>{created_after.isoformat()}") + if created_before: + filter_expr.append(f"create_time<{created_before.isoformat()}") + + filter_str = " AND ".join(filter_expr) if filter_expr else None + + try: + model = aiplatform.Model(model_name=name) + versions = model.list_versions(filter=filter_str) + + results = [ + RegistryModelVersion( + version=v.version_id, + model_source_uri=v.artifact_uri, + model_format="Custom", # Vertex AI doesn't provide this info directly + registered_model=self.get_model(v.name), + description=v.description, + created_at=v.create_time, + last_updated_at=v.update_time, + stage=ModelVersionStage.NONE, # Vertex AI doesn't have built-in stages + metadata=ModelRegistryModelMetadata(**v.labels), + ) + for v in versions + ] + + if count: + results = results[:count] + + return results + except Exception as e: + raise RuntimeError(f"Failed to list model versions: {str(e)}") + + def load_model_version( + self, + name: str, + version: str, + **kwargs: Any, + ) -> Any: + """Load a model version from the Vertex AI model registry.""" + try: + model_version = aiplatform.ModelVersion( + model_name=f"{name}@{version}" + ) + return model_version + except Exception as e: + raise RuntimeError(f"Failed to load model version: {str(e)}") + + def get_model_uri_artifact_store( + self, + model_version: RegistryModelVersion, + ) -> str: + """Get the model URI artifact store.""" + return model_version.model_source_uri diff --git a/src/zenml/integrations/gcp/services/__init__.py b/src/zenml/integrations/gcp/services/__init__.py new file mode 100644 index 00000000000..392a48e9694 --- /dev/null +++ b/src/zenml/integrations/gcp/services/__init__.py @@ -0,0 +1,21 @@ +# 
Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Initialization of the Vertex Service.""" + +from zenml.integrations.gcp.services.vertex_deployment import ( # noqa + VertexDeploymentConfig, + VertexDeploymentService, +) + +__all__ = ["VertexDeploymentConfig", "VertexDeploymentService"] \ No newline at end of file diff --git a/src/zenml/integrations/gcp/services/vertex_deployment.py b/src/zenml/integrations/gcp/services/vertex_deployment.py new file mode 100644 index 00000000000..c8a4e02f0a5 --- /dev/null +++ b/src/zenml/integrations/gcp/services/vertex_deployment.py @@ -0,0 +1,431 @@ +# Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Implementation of the Vertex AI Deployment service.""" + +import re +import time +from typing import Any, Dict, Generator, List, Optional, Tuple + +from google.api_core import exceptions +from google.cloud import aiplatform +from google.cloud import logging as vertex_logging +from pydantic import BaseModel, Field + +from zenml.client import Client +from zenml.integrations.gcp.flavors.vertex_model_deployer_flavor import ( + VertexBaseConfig, +) +from zenml.logger import get_logger +from zenml.services import ServiceState, ServiceStatus, ServiceType +from zenml.services.service import BaseDeploymentService, ServiceConfig + +logger = get_logger(__name__) + +# Increase timeout for long-running operations +POLLING_TIMEOUT = ( + 1800 # Increased from 1200 to allow for longer deployment times +) +UUID_SLICE_LENGTH: int = 8 + + +def sanitize_vertex_label(value: str) -> str: + """Sanitize a label value to comply with Vertex AI requirements. 
+ + Args: + value: The label value to sanitize + + Returns: + Sanitized label value + """ + # Handle empty string + if not value: + return "" + + # Convert to lowercase + value = value.lower() + # Replace any character that's not lowercase letter, number, dash or underscore + value = re.sub(r"[^a-z0-9\-_]", "-", value) + # Ensure it starts with a letter/number by prepending 'x' if needed + if not value[0].isalnum(): + value = f"x{value}" + # Truncate to 63 chars to stay under limit + return value[:63] + + +class VertexDeploymentConfig(VertexBaseConfig, ServiceConfig): + """Vertex AI service configurations.""" + + def get_vertex_deployment_labels(self) -> Dict[str, str]: + """Generate labels for the VertexAI deployment from the service configuration.""" + labels = self.labels or {} + labels["managed_by"] = "zenml" + if self.pipeline_name: + labels["pipeline-name"] = sanitize_vertex_label(self.pipeline_name) + if self.pipeline_step_name: + labels["step-name"] = sanitize_vertex_label( + self.pipeline_step_name + ) + if self.model_name: + labels["model-name"] = sanitize_vertex_label(self.model_name) + if self.service_name: + labels["service-name"] = sanitize_vertex_label(self.service_name) + return labels + + +class VertexPredictionServiceEndpoint(BaseModel): + """Vertex AI Prediction Service Endpoint.""" + + endpoint_name: str + deployed_model_id: str + endpoint_url: Optional[str] = None + + +class VertexServiceStatus(ServiceStatus): + """Vertex AI service status.""" + + endpoint: Optional[VertexPredictionServiceEndpoint] = None + + +class VertexDeploymentService(BaseDeploymentService): + """Vertex AI model deployment service.""" + + SERVICE_TYPE = ServiceType( + name="vertex-deployment", + type="model-serving", + flavor="vertex", + description="Vertex AI inference endpoint prediction service", + ) + config: VertexDeploymentConfig + status: VertexServiceStatus = Field( + default_factory=lambda: VertexServiceStatus() + ) + + def _initialize_gcp_clients(self) -> None: + """Initialize GCP clients with consistent credentials.""" + # Initialize aiplatform with project and location + from zenml.integrations.gcp.model_deployers.vertex_model_deployer import ( + VertexModelDeployer, + ) + + zenml_client = Client() + model_deployer = zenml_client.active_stack.model_deployer + if not isinstance(model_deployer, VertexModelDeployer): + raise RuntimeError( + "Active model deployer must be Vertex AI Model Deployer" + ) + + # get credentials from model deployer + credentials, project_id = model_deployer._get_authentication() + + # Initialize aiplatform + aiplatform.init( + project=project_id, + location=self.config.location, + credentials=credentials, + ) + + # Initialize logging client + self.logging_client = vertex_logging.Client( + project=project_id, credentials=credentials + ) + + def __init__(self, config: VertexDeploymentConfig, **attrs: Any): + """Initialize the Vertex AI deployment service.""" + super().__init__(config=config, **attrs) + self._initialize_gcp_clients() + + @property + def prediction_url(self) -> Optional[str]: + """The prediction URI exposed by the prediction service.""" + if not self.status.endpoint or not self.status.endpoint.endpoint_url: + return None + + # Construct proper prediction URL + return f"https://{self.config.location}-aiplatform.googleapis.com/v1/{self.status.endpoint.endpoint_url}" + + def get_endpoints(self) -> List[aiplatform.Endpoint]: + """Get all endpoints for the current project and location.""" + try: + # Use proper filtering and pagination + return list( + 
aiplatform.Endpoint.list( + filter='labels.managed_by="zenml"', + location=self.config.location, + ) + ) + except Exception as e: + logger.error(f"Failed to list endpoints: {e}") + return [] + + def _generate_endpoint_name(self) -> str: + """Generate a unique name for the Vertex AI Inference Endpoint.""" + # Make name more descriptive and conformant to Vertex AI naming rules + sanitized_model_name = re.sub( + r"[^a-zA-Z0-9-]", "-", self.config.model_name.lower() + ) + return f"{sanitized_model_name}-{str(self.uuid)[:UUID_SLICE_LENGTH]}" + + def provision(self) -> None: + """Provision or update remote Vertex AI deployment instance.""" + try: + if self.config.existing_endpoint: + # Use the existing endpoint + endpoint = aiplatform.Endpoint( + endpoint_name=self.config.existing_endpoint, + location=self.config.location, + ) + logger.info( + f"Using existing Vertex AI inference endpoint: {endpoint.resource_name}" + ) + else: + # Create the endpoint + endpoint_name = self._generate_endpoint_name() + endpoint = aiplatform.Endpoint.create( + display_name=endpoint_name, + location=self.config.location, + encryption_spec_key_name=self.config.encryption_spec_key_name, + labels=self.config.get_vertex_deployment_labels(), + ) + logger.info( + f"Vertex AI inference endpoint created: {endpoint.resource_name}" + ) + + # Then get the model + model = aiplatform.Model( + model_name=self.config.model_id, + location=self.config.location, + ) + logger.info( + f"Found existing model to deploy: {model.resource_name} to the endpoint." + ) + if not model: + raise RuntimeError( + f"Model {self.config.model_id} not found in the project." + ) + + # Deploy the model to the endpoint + endpoint.deploy( + model=model, + deployed_model_display_name=f"{endpoint_name}-deployment", + machine_type=self.config.machine_type, + min_replica_count=self.config.min_replica_count, + max_replica_count=self.config.max_replica_count, + accelerator_type=self.config.accelerator_type, + accelerator_count=self.config.accelerator_count, + service_account=self.config.service_account, + explanation_metadata=self.config.explanation_metadata, + explanation_parameters=self.config.explanation_parameters, + sync=self.config.sync, + ) + logger.info( + f"Model {model.resource_name} successfully deployed to endpoint {endpoint.resource_name}" + ) + + # Store both endpoint and deployment information + self.status.endpoint = VertexPredictionServiceEndpoint( + endpoint_name=endpoint.resource_name, + endpoint_url=endpoint.resource_name, + deployed_model_id=model.resource_name, + ) + self.status.update_state(ServiceState.PENDING_STARTUP) + + logger.info( + f"Vertex AI inference endpoint successfully deployed. 
Pending startup" + f"Endpoint: {endpoint.resource_name}, " + ) + + except Exception as e: + self.status.update_state( + new_state=ServiceState.ERROR, + error=f"Deployment failed: {str(e)}", + ) + raise RuntimeError( + f"An error occurred while provisioning the Vertex AI inference endpoint: {e}" + ) + + def deprovision(self, force: bool = False) -> None: + """Deprovision the remote Vertex AI deployment instance.""" + if not self.status.endpoint: + return + + try: + endpoint = aiplatform.Endpoint( + endpoint_name=self.status.endpoint.endpoint_name, + location=self.config.location, + ) + + # First undeploy the specific model if we have its ID + if self.status.endpoint.deployed_model_id: + try: + endpoint.undeploy( + deployed_model_id=self.status.endpoint.deployed_model_id, + sync=self.config.sync, + ) + except exceptions.NotFound: + logger.warning("Deployed model already undeployed") + + # Then delete the endpoint + endpoint.delete(force=force, sync=self.config.sync) + + self.status.endpoint = None + self.status.update_state(ServiceState.INACTIVE) + + logger.info("Vertex AI Inference Endpoint has been deprovisioned.") + + except exceptions.NotFound: + logger.warning( + "Vertex AI Inference Endpoint not found. It may have been already deleted." + ) + self.status.update_state(ServiceState.INACTIVE) + except Exception as e: + error_msg = ( + f"Failed to deprovision Vertex AI Inference Endpoint: {e}" + ) + logger.error(error_msg) + if not force: + raise RuntimeError(error_msg) + + def check_status(self) -> Tuple[ServiceState, str]: + """Check the current operational state of the Vertex AI deployment.""" + if not self.status.endpoint: + return ServiceState.INACTIVE, "Endpoint not provisioned" + try: + logger.info( + f"Checking status of Vertex AI Inference Endpoint: {self.status.endpoint.endpoint_name}" + ) + endpoint = aiplatform.Endpoint( + endpoint_name=self.status.endpoint.endpoint_name, + location=self.config.location, + ) + + # Get detailed deployment status + deployment = None + if self.status.endpoint.deployed_model_id: + deployments = [ + d + for d in endpoint.list_models() + if d.model == self.status.endpoint.deployed_model_id + ] + if deployments: + deployment = deployments[0] + logger.info( + f"Model {self.status.endpoint.deployed_model_id} was deployed to the endpoint" + ) + + if not deployment: + logger.warning( + "No matching deployment found, endpoint may be inactive or failed to deploy" + ) + return ServiceState.INACTIVE, "No matching deployment found" + + return ServiceState.ACTIVE, "Deployment is ready" + + except exceptions.NotFound: + return ServiceState.INACTIVE, "Endpoint not found" + except Exception as e: + return ServiceState.ERROR, f"Error checking status: {str(e)}" + + def predict(self, instances: List[Any]) -> List[Any]: + """Make a prediction using the service.""" + if not self.is_running: + raise Exception( + "Vertex AI endpoint inference service is not running. " + "Please start the service before making predictions." 
+
+    def predict(self, instances: List[Any]) -> List[Any]:
+        """Make a prediction using the service."""
+        if not self.is_running:
+            raise RuntimeError(
+                "Vertex AI endpoint inference service is not running. "
+                "Please start the service before making predictions."
+            )
+
+        if not self.status.endpoint:
+            raise RuntimeError("Endpoint information is missing.")
+
+        try:
+            endpoint = aiplatform.Endpoint(
+                endpoint_name=self.status.endpoint.endpoint_name,
+                location=self.config.location,
+            )
+
+            # `Endpoint.predict` routes requests according to the endpoint's
+            # traffic split, so no deployed model ID is passed here.
+            result = endpoint.predict(
+                instances=instances,
+                timeout=30,  # A reasonable timeout in seconds
+            )
+
+            if not result.predictions:
+                raise RuntimeError("No predictions returned")
+
+        except Exception as e:
+            logger.error(f"Prediction failed: {e}")
+            raise RuntimeError(f"Prediction failed: {str(e)}")
+
+        return list(result.predictions)
+
+    def get_logs(
+        self, follow: bool = False, tail: Optional[int] = None
+    ) -> Generator[str, bool, None]:
+        """Retrieve the service logs from Cloud Logging.
+
+        Args:
+            follow: If True, continuously yield new log entries.
+            tail: Maximum number of most recent log entries to return.
+
+        Yields:
+            The formatted log entries.
+        """
+        if not self.status.endpoint:
+            yield "No endpoint deployed yet"
+            return
+
+        try:
+            # Build a filter for the Vertex AI endpoint logs
+            endpoint_id = self.status.endpoint.endpoint_name.split("/")[-1]
+            filter_str = (
+                f'resource.type="aiplatform.googleapis.com/Endpoint" '
+                f'resource.labels.endpoint_id="{endpoint_id}" '
+                f'resource.labels.location="{self.config.location}"'
+            )
+
+            def _format_entry(entry: Any) -> str:
+                # Log payloads can be plain strings or structured dicts
+                payload = entry.payload
+                if isinstance(payload, dict):
+                    payload = payload.get("message", "")
+                return f"[{entry.timestamp}] {entry.severity}: {payload}"
+
+            # Yield historical logs. `max_results` limits the output to the
+            # `tail` most recent entries; "limit" is not a valid clause in a
+            # Cloud Logging filter string.
+            last_timestamp = None
+            for entry in self.logging_client.list_entries(
+                filter_=filter_str,
+                order_by=vertex_logging.DESCENDING,
+                max_results=tail,
+            ):
+                if last_timestamp is None or entry.timestamp > last_timestamp:
+                    last_timestamp = entry.timestamp
+                yield _format_entry(entry)
+
+            # If following logs, poll only for entries newer than the last
+            # one seen, so the same entry is not yielded twice
+            if follow:
+                while True:
+                    time.sleep(2)  # Poll every 2 seconds
+                    poll_filter = filter_str
+                    if last_timestamp is not None:
+                        poll_filter += (
+                            f' timestamp>"{last_timestamp.isoformat()}"'
+                        )
+                    for entry in self.logging_client.list_entries(
+                        filter_=poll_filter,
+                        order_by=vertex_logging.ASCENDING,
+                    ):
+                        if (
+                            last_timestamp is None
+                            or entry.timestamp > last_timestamp
+                        ):
+                            last_timestamp = entry.timestamp
+                        yield _format_entry(entry)
+
+        except Exception as e:
+            error_msg = f"Failed to retrieve logs: {str(e)}"
+            logger.error(error_msg)
+            yield error_msg
+
+    @property
+    def is_running(self) -> bool:
+        """Check if the service is running."""
+        self.update_status()
+        return self.status.state == ServiceState.ACTIVE
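+
+# Illustrative end-to-end sketch (commented out; `model_deployer` is assumed
+# to be the model deployer of the active ZenML stack, and all values are
+# hypothetical):
+#
+#     config = VertexDeploymentConfig(
+#         location="us-central1",
+#         name="example-endpoint",
+#         model_name="my-model",
+#         model_id="<MODEL_ID_FROM_THE_VERTEX_MODEL_REGISTRY>",
+#     )
+#     service = model_deployer.deploy_model(
+#         config=config, service_type=VertexDeploymentService.SERVICE_TYPE
+#     )
+#     state, reason = service.check_status()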
"""Implementation of the sklearn materializer.""" +import os from typing import Any, ClassVar, Tuple, Type +import cloudpickle from sklearn.base import ( BaseEstimator, BiclusterMixin, @@ -29,13 +18,20 @@ ) from zenml.enums import ArtifactType +from zenml.environment import Environment +from zenml.logger import get_logger from zenml.materializers.cloudpickle_materializer import ( + DEFAULT_FILENAME, CloudpickleMaterializer, ) +logger = get_logger(__name__) + +SKLEARN_MODEL_FILENAME = "model.pkl" + class SklearnMaterializer(CloudpickleMaterializer): - """Materializer to read data to and from sklearn.""" + """Materializer to read data to and from sklearn with backward compatibility.""" ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = ( BaseEstimator, @@ -50,3 +46,63 @@ class SklearnMaterializer(CloudpickleMaterializer): TransformerMixin, ) ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL + + def load(self, data_type: Type[Any]) -> Any: + """Reads a sklearn model from pickle file with backward compatibility. + + Args: + data_type: The data type of the artifact. + + Returns: + The loaded sklearn model. + """ + # First try to load from model.pkl + model_filepath = os.path.join(self.uri, SKLEARN_MODEL_FILENAME) + artifact_filepath = os.path.join(self.uri, DEFAULT_FILENAME) + + # Check which file exists and load accordingly + if self.artifact_store.exists(model_filepath): + filepath = model_filepath + elif self.artifact_store.exists(artifact_filepath): + logger.info( + f"Loading from legacy filepath {artifact_filepath}. Future saves " + f"will use {model_filepath}" + ) + filepath = artifact_filepath + else: + raise FileNotFoundError( + f"Neither {model_filepath} nor {artifact_filepath} found in artifact store" + ) + + # validate python version before loading + source_python_version = self._load_python_version() + current_python_version = Environment().python_version() + if ( + source_python_version != "unknown" + and source_python_version != current_python_version + ): + logger.warning( + f"Your artifact was materialized under Python version " + f"'{source_python_version}' but you are currently using " + f"'{current_python_version}'. This might cause unexpected " + "behavior since pickle is not reproducible across Python " + "versions. Attempting to load anyway..." + ) + + # Load the model + with self.artifact_store.open(filepath, "rb") as fid: + return cloudpickle.load(fid) + + def save(self, data: Any) -> None: + """Saves a sklearn model to pickle file using the new filename. + + Args: + data: The sklearn model to save. 
+ """ + # Save python version for validation on loading + self._save_python_version() + + # Save using the new filename + filepath = os.path.join(self.uri, SKLEARN_MODEL_FILENAME) + with self.artifact_store.open(filepath, "wb") as fid: + cloudpickle.dump(data, fid) diff --git a/src/zenml/model_deployers/base_model_deployer.py b/src/zenml/model_deployers/base_model_deployer.py index 40a65128f26..814e4f28175 100644 --- a/src/zenml/model_deployers/base_model_deployer.py +++ b/src/zenml/model_deployers/base_model_deployer.py @@ -32,6 +32,7 @@ from zenml.logger import get_logger from zenml.services import BaseService, ServiceConfig from zenml.services.service import BaseDeploymentService +from zenml.services.service_status import ServiceState from zenml.services.service_type import ServiceType from zenml.stack import StackComponent from zenml.stack.flavor import Flavor @@ -180,6 +181,12 @@ def deploy_model( logger.info( f"Existing model server found for {config.name or config.model_name} with the exact same configuration. Returning the existing service named {services[0].config.service_name}." ) + status, _ = services[0].check_status() + if status != ServiceState.ACTIVE: + logger.info( + f"Service found for {config.name or config.model_name} is not active. Starting the service." + ) + services[0].start(timeout=timeout) return services[0] else: # Find existing model server diff --git a/src/zenml/model_registries/base_model_registry.py b/src/zenml/model_registries/base_model_registry.py index 578d97d396c..3e3019dfb6f 100644 --- a/src/zenml/model_registries/base_model_registry.py +++ b/src/zenml/model_registries/base_model_registry.py @@ -62,6 +62,7 @@ class ModelRegistryModelMetadata(BaseModel): model and its development process. """ + _managed_by: str = "zenml" zenml_version: Optional[str] = None zenml_run_name: Optional[str] = None zenml_pipeline_name: Optional[str] = None @@ -70,6 +71,15 @@ class ModelRegistryModelMetadata(BaseModel): zenml_step_name: Optional[str] = None zenml_workspace: Optional[str] = None + @property + def managed_by(self) -> str: + """Returns the managed_by attribute. + + Returns: + The managed_by attribute. + """ + return self._managed_by + @property def custom_attributes(self) -> Dict[str, str]: """Returns a dictionary of custom attributes. diff --git a/src/zenml/services/service.py b/src/zenml/services/service.py index 7b607aae611..0077a3a945f 100644 --- a/src/zenml/services/service.py +++ b/src/zenml/services/service.py @@ -35,6 +35,7 @@ from zenml.console import console from zenml.logger import get_logger +from zenml.model.model import Model from zenml.services.service_endpoint import BaseServiceEndpoint from zenml.services.service_monitor import HTTPEndpointHealthMonitor from zenml.services.service_status import ServiceState, ServiceStatus @@ -109,6 +110,7 @@ class ServiceConfig(BaseTypedModel): pipeline_name: name of the pipeline that spun up the service pipeline_step_name: name of the pipeline step that spun up the service run_name: name of the pipeline run that spun up the service. + zenml_model: the ZenML model object to be deployed. """ name: str = "" @@ -118,6 +120,7 @@ class ServiceConfig(BaseTypedModel): model_name: str = "" model_version: str = "" service_name: str = "" + zenml_model: Optional[Model] = None # TODO: In Pydantic v2, the `model_` is a protected namespaces for all # fields defined under base models. If not handled, this raises a warning.