Add target environment parameter to deployment and inference pipelines
safoinme committed Nov 28, 2024
1 parent e7fe052 commit 5fa513b
Showing 7 changed files with 73 additions and 97 deletions.
3 changes: 3 additions & 0 deletions train_and_deploy/configs/deployer_config.yaml
@@ -40,3 +40,6 @@ model:
 extra:
   notify_on_failure: True
 
+
+parameters:
+  target_env: staging
2 changes: 2 additions & 0 deletions train_and_deploy/configs/inference_config.yaml
@@ -39,3 +39,5 @@ model:
 extra:
   notify_on_failure: True
 
+parameters:
+  target_env: staging
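Both config files feed the new pipeline parameter through the "parameters:" section of a ZenML run configuration: keys under "parameters:" are bound to the pipeline function's arguments at run time. A minimal sketch of how the YAML values reach the new target_env argument, assuming the pipelines are launched from a Python entrypoint inside train_and_deploy/ (the entrypoint itself is illustrative, not part of this commit):

from pipelines.batch_inference import gitguarden_batch_inference
from pipelines.local_deployment import gitguarden_local_deployment

# Keys under "parameters:" in the YAML are bound to the pipeline's function
# arguments, so both runs below receive target_env="staging".
gitguarden_local_deployment.with_options(
    config_path="configs/deployer_config.yaml"
)()
gitguarden_batch_inference.with_options(
    config_path="configs/inference_config.yaml"
)()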
5 changes: 4 additions & 1 deletion train_and_deploy/pipelines/batch_inference.py
@@ -33,7 +33,9 @@
 
 
 @pipeline(on_failure=notify_on_failure)
-def gitguarden_batch_inference():
+def gitguarden_batch_inference(
+    target_env: str,
+):
     """
     Model batch inference pipeline.
@@ -66,6 +68,7 @@ def gitguarden_batch_inference():
     ########## Inference stage ##########
     inference_predict(
         dataset_inf=df_inference,
+        target_env=target_env,
         after=["drift_quality_gate"],
     )
 
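Because the pipeline function itself now takes target_env, it can also be invoked ad hoc with an environment other than the YAML default (a sketch; "production" is an illustrative value):

from pipelines.batch_inference import gitguarden_batch_inference

# Override the environment without touching inference_config.yaml.
gitguarden_batch_inference(target_env="production")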
9 changes: 5 additions & 4 deletions train_and_deploy/pipelines/local_deployment.py
@@ -21,9 +21,10 @@
 
 
 @pipeline(on_failure=notify_on_failure, enable_cache=False)
-def gitguarden_local_deployment():
-    """
-    Model deployment pipeline.
+def gitguarden_local_deployment(
+    target_env: str,
+):
+    """Model deployment pipeline.
 
     This pipeline deploys a trained model for future inference.
     """
@@ -33,7 +34,7 @@ def gitguarden_local_deployment():
     ########## Deployment stage ##########
     # Get the production model artifact
     bento = bento_builder()
-    deployment_deploy(bento=bento)
+    deployment_deploy(bento=bento, target_env=target_env)
 
     notify_on_success(after=["deployment_deploy"])
     ### YOUR CODE ENDS HERE ###
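Note the explicit ordering in this pipeline: notify_on_success consumes no output of deployment_deploy, so ZenML cannot infer the dependency from data flow, and the "after" list enforces it by step name. The same pattern in miniature (a sketch with placeholder steps, not code from this repo):

from zenml import pipeline, step

@step
def deploy() -> None:
    ...

@step
def notify() -> None:
    ...

@pipeline
def demo():
    deploy()
    notify(after=["deploy"])  # runs only once "deploy" has finished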
2 changes: 1 addition & 1 deletion train_and_deploy/service.py
@@ -11,4 +11,4 @@
 
 @svc.api(input=input_spec, output=NumpyNdarray())
 async def predict(input_arr):
-    return await gitguarden_runner.predict.async_run(input_arr)
+    return await gitguarden_runner.predict.async_run(input_arr)
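The only change here is restoring the trailing newline at the end of service.py; the removed and re-added lines are otherwise byte-identical. For context, a hedged reconstruction of what the full service module plausibly looks like; the runner and endpoint names come from the diff, while the framework, model tag, and dtype are assumptions:

import bentoml
from bentoml.io import NumpyNdarray

# Assumption: the trained model was saved to the local BentoML store
# under this tag by an earlier pipeline step.
gitguarden_runner = bentoml.sklearn.get("gitguarden:latest").to_runner()

svc = bentoml.Service("gitguarden", runners=[gitguarden_runner])
input_spec = NumpyNdarray(dtype="float64")  # assumed input schema

@svc.api(input=input_spec, output=NumpyNdarray())
async def predict(input_arr):
    return await gitguarden_runner.predict.async_run(input_arr)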
128 changes: 44 additions & 84 deletions train_and_deploy/steps/deployment/deployment_deploy.py
@@ -1,100 +1,60 @@
 # Apache Software License 2.0
-# Copyright (c) ZenML GmbH 2022. All Rights Reserved.
 #
+# Copyright (c) ZenML GmbH 2024. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
 
 from typing import Optional
 
 from bentoml._internal.bento import bento
-from typing_extensions import Annotated
-from zenml import (
-    ArtifactConfig,
-    Model,
-    get_step_context,
-    log_artifact_metadata,
-    step,
-)
+from zenml import get_step_context, step
 from zenml.client import Client
-from zenml.integrations.bentoml.services.bentoml_container_deployment import (
-    BentoMLContainerDeploymentService,
+from zenml.integrations.bentoml.services.bentoml_local_deployment import (
+    BentoMLLocalDeploymentConfig,
+    BentoMLLocalDeploymentService,
 )
-from zenml.integrations.bentoml.services.deployment_type import (
-    BentoMLDeploymentType,
-)
-from zenml.integrations.bentoml.steps import bentoml_model_deployer_step
 from zenml.logger import get_logger
+from zenml.utils import source_utils
 
 logger = get_logger(__name__)
 
 
 @step
 def deployment_deploy(
     bento: bento.Bento,
-) -> (
-    Annotated[
-        Optional[BentoMLContainerDeploymentService],
-        ArtifactConfig(name="bentoml_deployment", is_deployment_artifact=True),
-    ]
-):
-    """Predictions step.
-
-    This is an example of a predictions step that takes the data in and returns
-    predicted values.
-    This step is parameterized, which allows you to configure the step
-    independently of the step code, before running it in a pipeline.
-    In this example, the step can be configured to use different input data.
-    See the documentation for more information:
-        https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines
-
-    Args:
-        dataset_inf: The inference dataset.
-
-    Returns:
-        The predictions as pandas series
-    """
-    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
-    if Client().active_stack.orchestrator.flavor == "local":
-        model = get_step_context().model
-
-        # deploy predictor service
-        bentoml_deployment = bentoml_model_deployer_step.entrypoint(
-            model_name=model.name,  # Name of the model
-            port=3009,  # Port to be used by the http server
-            production=True,  # Deploy the model in production mode
-            timeout=1000,
-            bento=bento,
-            deployment_type=BentoMLDeploymentType.CONTAINER,
-        )
-
-        bentoml_service = Client().get_service(name_id_or_prefix=bentoml_deployment.uuid)
-
-        log_artifact_metadata(
-            metadata={
-                "service_type": "bentoml",
-                "status": bentoml_service.state,
-                "prediction_url": bentoml_service.prediction_url,
-                "health_check_url": bentoml_service.health_check_url,
-                "model_uri": model.get_artifact(name="model").uri,
-                "bento_tag": bentoml_service.config.get("bento_tag"),
-                "bentoml_model_image": bentoml_service.config.get("image"),
-            }
-        )
-    else:
-        logger.warning("Skipping deployment as the orchestrator is not local.")
-        bentoml_deployment = None
-    ### YOUR CODE ENDS HERE ###
-    return bentoml_deployment
+    target_env: str,
+) -> Optional[BentoMLLocalDeploymentService]:
+    # Deploy a model using the BentoML Model Deployer
+    zenml_client = Client()
+    step_context = get_step_context()
+    pipeline_name = step_context.pipeline.name
+    step_name = step_context.step_run.name
+    model_deployer = zenml_client.active_stack.model_deployer
+    bentoml_deployment_config = BentoMLLocalDeploymentConfig(
+        model_name=step_context.model.name,
+        model_version=target_env,
+        description="An example of deploying a model with the BentoML Model Deployer",
+        pipeline_name=pipeline_name,
+        pipeline_step_name=step_name,
+        model_uri=bento.info.labels.get("model_uri"),
+        bento_tag=str(bento.tag),
+        bento_uri=bento.info.labels.get("bento_uri"),
+        working_dir=source_utils.get_source_root(),
+        timeout=1500,
+    )
+    service = model_deployer.deploy_model(
+        config=bentoml_deployment_config,
+        service_type=BentoMLLocalDeploymentService.SERVICE_TYPE,
+    )
+    logger.info(
+        f"The deployed service info: {model_deployer.get_model_server_info(service)}"
+    )
+    return service
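The step now registers each deployment under model_version=target_env, so each environment gets its own tracked server. After the pipeline runs, that server can be located through the same model deployer API the step uses; a minimal sketch, assuming an active ZenML stack with the BentoML model deployer (the model name is illustrative):

from zenml.client import Client

model_deployer = Client().active_stack.model_deployer
services = model_deployer.find_model_server(
    model_name="gitguarden",  # assumed ZenML Model name
    model_version="staging",  # the target_env used at deploy time
)
if services and services[0].is_running:
    print(services[0].prediction_url)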
21 changes: 14 additions & 7 deletions train_and_deploy/steps/inference/inference_predict.py
@@ -16,13 +16,13 @@
 #
 
 
-from typing import Optional
-
+from typing import Optional, cast
+from zenml.client import Client
 import pandas as pd
 from typing_extensions import Annotated
 from zenml import get_step_context, step
-from zenml.integrations.bentoml.services.bentoml_container_deployment import (
-    BentoMLContainerDeploymentService,
+from zenml.integrations.bentoml.services.bentoml_local_deployment import (
+    BentoMLLocalDeploymentService,
 )
 from zenml.logger import get_logger
 
@@ -32,6 +32,7 @@
 
 @step
 def inference_predict(
     dataset_inf: pd.DataFrame,
+    target_env: str,
 ) -> Annotated[pd.Series, "predictions"]:
     """Predictions step.
@@ -55,12 +56,18 @@ def inference_predict(
     model = get_step_context().model
 
     # get predictor
-    predictor_service: Optional[BentoMLContainerDeploymentService] = model.load_artifact(
-        "bentomldeployment"
+    zenml_client = Client()
+    model_deployer = zenml_client.active_stack.model_deployer
+
+    # fetch the existing service deployed for this model and target environment
+    existing_services = model_deployer.find_model_server(
+        model_name=model.name,
+        model_version=target_env,
    )
+    predictor_service = cast(BentoMLLocalDeploymentService, existing_services[0])
     if predictor_service is not None:
         # run prediction from service
-        predictions = predictor_service.predict(request=dataset_inf)
+        predictions = predictor_service.predict(api_endpoint="predict", data=dataset_inf)
     else:
         logger.warning(
             "Predicting from loaded model instead of deployment service "
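One caveat in the new lookup: find_model_server returns a list, and indexing existing_services[0] on an empty list raises an IndexError before the "is not None" check can fall through to the warning branch. A more defensive variant of the same lookup (a sketch, using the step's own imports):

existing_services = model_deployer.find_model_server(
    model_name=model.name,
    model_version=target_env,
)
predictor_service = (
    cast(BentoMLLocalDeploymentService, existing_services[0])
    if existing_services
    else None
)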
