# Support for Spark DL notebooks with PyTriton on Databricks/Dataproc #483

Merged (13 commits) on Feb 4, 2025. Showing changes from 11 commits.
**`examples/ML+DL-Examples/Spark-DL/dl_inference/README.md`** (66 additions, 41 deletions)
# Deep Learning Inference on Spark

Example notebooks demonstrating **distributed deep learning inference** using the [predict_batch_udf](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.functions.predict_batch_udf.html#pyspark.ml.functions.predict_batch_udf) function introduced in Spark 3.4.0. For background, see the [release blog post](https://developer.nvidia.com/blog/distributed-deep-learning-made-easy-with-spark-3-4/).
These notebooks also demonstrate integration with [Triton Inference Server](https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/index.html), an open-source, GPU-accelerated serving solution for DL.

## Contents:
- [Overview](#overview)
- [Running Locally](#running-locally)
- [Running on Cloud](#running-on-cloud-environments)
- [Integration with Triton Inference Server](#inference-with-triton)

## Overview

These notebooks demonstrate how models from external frameworks (Torch, Huggingface, Tensorflow) trained on single-worker machines can be used for large-scale distributed inference on Spark clusters.
For example, a basic model trained in TensorFlow and saved on disk as "mnist_model" can be used in Spark as follows:
```python
import numpy as np
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.types import ArrayType, FloatType

# Load the saved model on each executor and return a predict function
# (the body of predict_batch_fn is shown here as a minimal sketch).
def predict_batch_fn():
    import tensorflow as tf
    model = tf.keras.models.load_model("mnist_model")

    def predict(inputs: np.ndarray) -> np.ndarray:
        return model.predict(inputs)

    return predict

mnist = predict_batch_udf(predict_batch_fn,
                          return_type=ArrayType(FloatType()),
                          batch_size=1024,
                          input_tensor_shapes=[[784]])

df = spark.read.parquet("mnist_data")
predictions = df.withColumn("preds", mnist("data")).collect()
```

In this simple case, the `predict_batch_fn` will use TensorFlow APIs to load the model and return a simple `predict` function. The `predict_batch_udf` will handle the data conversion from Spark DataFrame columns into batched numpy inputs.


#### Notebook List

Below is a full list of the notebooks with links to the examples they are based on. All notebooks have been saved with sample outputs for quick browsing.

| | Framework | Notebook Name | Description | Link
| ------------- | ------------- | ------------- | ------------- | -------------
| 1 | PyTorch | Image Classification | Training a model to predict clothing categories in FashionMNIST, including accelerated inference with Torch-TensorRT. | [Link](https://pytorch.org/tutorials/beginner/basics/quickstart_tutorial.html)
| 2 | PyTorch | Housing Regression | Training a model to predict housing prices in the California Housing Dataset, including accelerated inference with Torch-TensorRT. | [Link](https://github.com/christianversloot/machine-learning-articles/blob/main/how-to-create-a-neural-network-for-regression-with-pytorch.md)
| 3 | Tensorflow | Image Classification | Training a model to predict hand-written digits in MNIST. | [Link](https://github.com/tensorflow/docs/blob/master/site/en/tutorials/keras/save_and_load.ipynb)
| 4 | Tensorflow | Keras Preprocessing | Training a model with preprocessing layers to predict likelihood of pet adoption in the PetFinder mini dataset. | [Link](https://github.com/tensorflow/docs/blob/master/site/en/tutorials/structured_data/preprocessing_layers.ipynb)
| 5 | Tensorflow | Keras Resnet50 | Training ResNet-50 to perform flower recognition from flower images. | [Link](https://docs.databricks.com/en/_extras/notebooks/source/deep-learning/keras-metadata.html)
| 6 | Tensorflow | Text Classification | Training a model to perform sentiment analysis on the IMDB dataset. | [Link](https://github.com/tensorflow/docs/blob/master/site/en/tutorials/keras/text_classification.ipynb)
| 7+8 | HuggingFace | Conditional Generation | Sentence translation using the T5 text-to-text transformer for both Torch and Tensorflow. | [Link](https://huggingface.co/docs/transformers/model_doc/t5#t5)
| 9+10 | HuggingFace | Pipelines | Sentiment analysis using Huggingface pipelines for both Torch and Tensorflow. | [Link](https://huggingface.co/docs/transformers/quicktour#pipeline-usage)
| 11 | HuggingFace | Sentence Transformers | Sentence embeddings using SentenceTransformers in Torch. | [Link](https://huggingface.co/sentence-transformers)

## Running Locally

To run the notebooks locally, please follow these instructions:

#### Create environment

Each notebook has a suffix `_torch` or `_tf` specifying the environment used.

**For PyTorch:**
```
conda create -n spark-dl-torch python=3.11
conda activate spark-dl-torch
pip install -r torch_requirements.txt
```
**For TensorFlow:**
```
conda create -n spark-dl-tf python=3.11
conda activate spark-dl-tf
pip install -r tf_requirements.txt
```

#### Start Cluster

For demonstration, these instructions just use a local Standalone cluster with a single executor, but they can be run on any distributed Spark cluster. For cloud environments, see [below](#running-on-cloud-environments).

```shell
# Replace with your Spark installation path
export SPARK_HOME=</path/to/spark>
```

```shell
# Configure and start cluster
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=1
export CORES_PER_WORKER=8
export SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=1 -Dspark.worker.resource.gpu.discoveryScript=$SPARK_HOME/examples/src/main/scripts/getGpusResources.sh"
${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-worker.sh -c ${CORES_PER_WORKER} -m 16G ${MASTER}
```

The notebooks are ready to run! Each notebook has a cell to connect to the standalone cluster and create a SparkSession.
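
As a point of reference, such a connection cell might look like the following minimal sketch (the app name and configs here are illustrative, not copied from the notebooks):

```python
import os
from pyspark.sql import SparkSession

# Connect to the standalone cluster started above; configs are illustrative.
spark = (
    SparkSession.builder
    .appName("spark-dl-inference")
    .master(os.environ.get("MASTER", "spark://localhost:7077"))
    .config("spark.executor.memory", "8g")
    .config("spark.python.worker.reuse", "true")
    .getOrCreate()
)
```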

**Notes**:
- Please create separate environments for PyTorch and Tensorflow notebooks as specified above. This will avoid conflicts between the CUDA libraries bundled with their respective versions.
- The requirements files install `pyspark>=3.4.0`. Make sure the installed PySpark version is compatible with your system's Spark installation.
- The notebooks require a GPU environment for the executors.
- The PyTorch notebooks include model compilation and accelerated inference with TensorRT. While not included in the notebooks, Tensorflow also supports [integration with TensorRT](https://docs.nvidia.com/deeplearning/frameworks/tf-trt-user-guide/index.html), but as of writing it is not supported in TF==2.17.0.

**Troubleshooting:**
If you encounter issues starting the Triton server, you may need to link your libstdc++ file to the conda environment, e.g.:
```shell
ln -sf /usr/lib/x86_64-linux-gnu/libstdc++.so.6 ${CONDA_PREFIX}/lib/libstdc++.so.6
```

## Running on Cloud Environments

We also provide instructions for running the notebooks on cloud service provider (CSP) Spark environments: see the guides for [Databricks](databricks/README.md) and [GCP Dataproc](dataproc/README.md).

## Inference with Triton

The notebooks also demonstrate integration with the [Triton Inference Server](https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/index.html), an open-source serving platform for deep learning models, which includes many [features and performance optimizations](https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/index.html#triton-major-features) to streamline inference.
The notebooks use [PyTriton](https://github.com/triton-inference-server/pytriton), a Flask-like Python framework that handles communication with the Triton server.

<img src="images/spark-pytriton.png" alt="drawing" width="1000"/>

The diagram above shows how Spark distributes inference tasks to run on the Triton Inference Server, with PyTriton handling request/response communication with the server.

The process looks like this:
- Distribute a PyTriton task across the Spark cluster, instructing each worker to launch a Triton server process.
- Use stage-level scheduling to ensure there is a 1:1 mapping between worker nodes and servers.
- Define a Triton inference function, which contains a client that binds to the local server on a given worker and sends inference requests.
- Wrap the Triton inference function in a `predict_batch_udf` to launch parallel inference requests using Spark (a condensed sketch of this pattern follows below).
- Finally, distribute a shutdown signal to terminate the Triton server processes on each worker.
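
For illustration, a condensed sketch of the inference-function pattern might look like this (the server URL, model name, and output key are assumptions for the example; the notebooks use helpers in `pytriton_utils.py` to manage server startup and ports):

```python
import numpy as np
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.types import ArrayType, FloatType

def triton_fn(url: str, model_name: str):
    """Return a predict function that forwards batches to a local Triton server."""
    from pytriton.client import ModelClient
    client = ModelClient(url, model_name)

    def infer_batch(inputs: np.ndarray) -> np.ndarray:
        # The output name ("preds" here) depends on how the model is bound on the server.
        result = client.infer_batch(inputs)
        return result["preds"]

    return infer_batch

# Wrap the inference function in a predict_batch_udf for parallel requests.
predict = predict_batch_udf(
    lambda: triton_fn(url="http://localhost:8000", model_name="my_model"),
    return_type=ArrayType(FloatType()),
    batch_size=1024,
)
```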

For more information on how PyTriton works, see the [PyTriton docs](https://triton-inference-server.github.io/pytriton/latest/high_level_design/).
**`examples/ML+DL-Examples/Spark-DL/dl_inference/databricks/README.md`** (new file, 55 additions)
# Spark DL Inference on Databricks

**Note**: fields in \<brackets\> require user inputs.

## Setup

1. Install the latest [databricks-cli](https://docs.databricks.com/en/dev-tools/cli/tutorial.html) and configure for your workspace.

2. Specify the path to your Databricks workspace:
```shell
export WS_PATH=</Users/your.email@company.com>

export NOTEBOOK_DEST=${WS_PATH}/spark-dl/notebook_torch.ipynb
export UTILS_DEST=${WS_PATH}/spark-dl/pytriton_utils.py
export INIT_DEST=${WS_PATH}/spark-dl/init_spark_dl.sh
```
3. Specify the local paths to the notebook you wish to run, the utils file, and the init script.
As an example for a PyTorch notebook:
```shell
export NOTEBOOK_SRC=</path/to/notebook_torch.ipynb>
export UTILS_SRC=</path/to/pytriton_utils.py>
export INIT_SRC=$(pwd)/setup/init_spark_dl.sh
```
4. Specify the framework as `torch` or `tf`, corresponding to the notebook you wish to run. Continuing with the PyTorch example:
```shell
export FRAMEWORK=torch
```
This will tell the init script which libraries to install on the cluster.

5. Copy the files to the Databricks Workspace:
```shell
databricks workspace import $NOTEBOOK_DEST --format JUPYTER --file $NOTEBOOK_SRC
databricks workspace import $UTILS_DEST --format AUTO --file $UTILS_SRC
databricks workspace import $INIT_DEST --format AUTO --file $INIT_SRC
```

6. Launch the cluster with the provided script (note that the script specifies **Azure instances** by default; change as needed):
```shell
cd setup
chmod +x start_cluster.sh
./start_cluster.sh
```

OR, start the cluster from the Databricks UI:

- Go to `Compute > Create compute` and set the desired cluster settings.
- Integration with Triton inference server uses stage-level scheduling (Spark>=3.4.0). Make sure to:
- use a cluster with GPU resources
- set a value for `spark.executor.cores`
- ensure that `spark.executor.resource.gpu.amount` = 1
- Under `Advanced Options > Init Scripts`, upload the init script from your workspace.
- Under environment variables, set `FRAMEWORK=torch` or `FRAMEWORK=tf` based on the notebook used.
- For Tensorflow notebooks, we recommend setting the environment variable `TF_GPU_ALLOCATOR=cuda_malloc_async` (especially for Huggingface LLM models), which enables the CUDA driver to implicitly release unused memory from the pool.

7. Navigate to the notebook in your workspace and attach it to the cluster. The default cluster name is `spark-dl-inference-$FRAMEWORK`.
**`setup/init_spark_dl.sh`** (new file; the Databricks init script referenced above)
#!/bin/bash
# Copyright (c) 2025, NVIDIA CORPORATION.

set -euxo pipefail

# install requirements
sudo /databricks/python3/bin/pip3 install --upgrade pip

if [[ "${FRAMEWORK}" == "torch" ]]; then
cat <<EOF > temp_requirements.txt
datasets==3.*
transformers
urllib3<2
nvidia-pytriton
torch
torchvision --extra-index-url https://download.pytorch.org/whl/cu121
torch-tensorrt
tensorrt --extra-index-url https://download.pytorch.org/whl/cu121
sentence_transformers
sentencepiece
nvidia-modelopt[all] --extra-index-url https://pypi.nvidia.com
EOF
elif [[ "${FRAMEWORK}" == "tf" ]]; then
cat <<EOF > temp_requirements.txt
datasets==3.*
transformers
urllib3<2
nvidia-pytriton
EOF
else
echo "Please export FRAMEWORK as torch or tf per README"
exit 1
fi

sudo /databricks/python3/bin/pip3 install --upgrade --force-reinstall -r temp_requirements.txt
rm temp_requirements.txt
**`setup/start_cluster.sh`** (new file; the Databricks cluster startup script referenced above)
#!/bin/bash
# Copyright (c) 2025, NVIDIA CORPORATION.

set -eo pipefail

# configure arguments
if [[ -z ${INIT_DEST} ]]; then
echo "Please make sure INIT_DEST is exported per README.md"
exit 1
fi

if [[ -z ${FRAMEWORK} ]]; then
echo "Please make sure FRAMEWORK is exported to torch or tf per README.md"
exit 1
fi

json_config=$(cat <<EOF
{
"cluster_name": "spark-dl-inference-${FRAMEWORK}",
"spark_version": "15.4.x-gpu-ml-scala2.12",
"spark_conf": {
"spark.executor.resource.gpu.amount": "1",
"spark.python.worker.reuse": "true",
"spark.task.resource.gpu.amount": "0.125",
"spark.sql.execution.arrow.pyspark.enabled": "true",
"spark.executor.cores": "8"
},
"node_type_id": "Standard_NC8as_T4_v3",
"driver_node_type_id": "Standard_NC8as_T4_v3",
"spark_env_vars": {
"TF_GPU_ALLOCATOR": "cuda_malloc_async",
"FRAMEWORK": "${FRAMEWORK}"
},
"autotermination_minutes": 60,
"enable_elastic_disk": true,
"init_scripts": [
{
"workspace": {
"destination": "${INIT_DEST}"
}
}
],
"runtime_engine": "STANDARD",
"num_workers": 4
}
EOF
)

databricks clusters create --json "$json_config"
**`examples/ML+DL-Examples/Spark-DL/dl_inference/dataproc/README.md`** (new file, 70 additions)
# Spark DL Inference on Dataproc

## Setup

**Note**: fields in \<brackets\> require user inputs.

#### Setup GCloud CLI

1. Install the latest [gcloud-cli](https://cloud.google.com/sdk/docs/install) and initialize with `gcloud init`.

2. Configure the following settings:
```shell
export PROJECT=<your_project>
export DATAPROC_REGION=<your_dataproc_region>
export COMPUTE_REGION=<your_compute_region>
export COMPUTE_ZONE=<your_compute_zone>

gcloud config set project ${PROJECT}
gcloud config set dataproc/region ${DATAPROC_REGION}
gcloud config set compute/region ${COMPUTE_REGION}
gcloud config set compute/zone ${COMPUTE_ZONE}
```

#### Copy files to GCS

3. Create a GCS bucket if you don't already have one:
```shell
export GCS_BUCKET=<your_gcs_bucket_name>

gcloud storage buckets create gs://${GCS_BUCKET}
```

4. Specify the local path to the notebook(s) and copy to the GCS bucket.
As an example for a torch notebook:
```shell
export SPARK_DL_HOME=${GCS_BUCKET}/spark-dl

gcloud storage cp </path/to/notebook_name_torch.ipynb> gs://${SPARK_DL_HOME}/notebooks/
```
Repeat this step for any notebooks you wish to run. All notebooks under `gs://${SPARK_DL_HOME}/notebooks/` will be copied to the master node during initialization.

5. Copy the utils file to the GCS bucket.
```shell
gcloud storage cp </path/to/pytriton_utils.py> gs://${SPARK_DL_HOME}/
```

#### Start cluster and run

6. Specify the framework to use (`torch` or `tf`), which will determine which libraries to install on the cluster. For example:
```shell
export FRAMEWORK=torch
```
Run the cluster startup script. The script will also retrieve and use the [spark-rapids initialization script](https://github.com/GoogleCloudDataproc/initialization-actions/blob/master/spark-rapids/spark-rapids.sh) to set up GPU resources.
```shell
cd setup
chmod +x start_cluster.sh
./start_cluster.sh
```
By default, the script creates a 4 node GPU cluster named `${USER}-spark-dl-inference-${FRAMEWORK}`.
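
For reference, the cluster created by the script is roughly equivalent to a `gcloud dataproc clusters create` invocation along these lines (the accelerator type, metadata key, and init-action path below are illustrative assumptions, not values taken from the script):
```shell
gcloud dataproc clusters create ${USER}-spark-dl-inference-${FRAMEWORK} \
    --num-workers=4 \
    --master-accelerator=type=nvidia-tesla-t4,count=1 \
    --worker-accelerator=type=nvidia-tesla-t4,count=1 \
    --initialization-actions=gs://${SPARK_DL_HOME}/init_spark_dl.sh \
    --metadata=gpu-driver-provider=NVIDIA \
    --optional-components=JUPYTER \
    --enable-component-gateway
```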

7. Browse to the Jupyter web UI:
- Go to `Dataproc` > `Clusters` > `(Cluster Name)` > `Web Interfaces` > `Jupyter/Lab`

Or, get the link by running this command (under httpPorts > Jupyter/Lab):
```shell
gcloud dataproc clusters describe ${CLUSTER_NAME} --region=${COMPUTE_REGION}
```

8. Open and run the notebook interactively with the **Python 3 kernel**.
The notebooks can be found under `Local Disk/spark-dl-notebooks` on the master node (folder icon on the top left > Local Disk).