Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Spark DL notebooks with PyTriton on Databricks/Dataproc #483

Merged
merged 13 commits into from
Feb 4, 2025

Conversation

rishic3
Copy link
Collaborator

@rishic3 rishic3 commented Jan 16, 2025

Support for running DL Inference notebooks on CSP environments.

  • Refactored Triton sections to use PyTriton, a Python API for the Triton inference server which avoids Docker. Once this PR is merged, Triton sections no longer need to be skipped in the CI pipeline @YanxuanLiu .
  • Updated notebooks with instructions to run on Databricks/Dataproc
  • Updated Torch notebooks with best practices for ahead-of-time TensorRT compilation.
  • Cleaned up README, removing instructions to start Jupyter with PySpark (we need a cell to attach to standalone for CI/CD anyway, so hoping to reduce confusion for user).

Notebook outputs are saved from running locally, but all notebooks were tested on Databricks/Dataproc.

@rishic3 rishic3 marked this pull request as ready for review January 17, 2025 00:36
Copy link
Collaborator

@eordentlich eordentlich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. A few comments.

In a future optimization we can look at something like https://github.com/triton-inference-server/client/blob/main/src/python/examples/simple_http_cudashm_client.py or for regular shm to reduce data copy (if I'm interpreting these correctly).

sudo /databricks/python3/bin/pip3 install --upgrade --force-reinstall -r temp_requirements.txt
rm temp_requirements.txt

set +x
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a carriage return at the end of last line in all files this symbol appears.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted, also merged the tf/torch scripts into one for convenience.

"df = spark.read.parquet(\"imdb_test\").limit(100).cache()"
"def _use_stage_level_scheduling(spark, rdd):\n",
"\n",
" if spark.version < \"3.4.0\":\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check probably not needed since predict_batch_udf is also not in spark < 3.4

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"metadata": {},
"outputs": [],
"source": [
"df = spark.read.parquet(data_path).limit(256).repartition(8)"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is limit and repartition needed? And is this the right order? And why these numbers? A comment might be in order. Propagate any changes to other notebooks.

Copy link
Collaborator Author

@rishic3 rishic3 Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was intended to test the minimal scenario of 1 batch per task—especially with tensorflow, too high of a number can be really slow (>1 min). (In previous versions we were limiting to 100 rows: https://github.com/NVIDIA/spark-rapids-examples/blob/branch-23.06/examples/ML%2BDL-Examples/Spark-DL/dl_inference/huggingface/conditional_generation.ipynb?short_path=d3949f8#L1208)

]
},
{
"cell_type": "code",
"execution_count": 56,
"execution_count": null,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, spark.stop() below might be bad for databricks. It puts the cluster in a bad state. (at least in older versions like 13.3 from what I've seen).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, issue persists on latest runtime - addressed

"def stop_triton(it):\n",
" import docker\n",
" import time\n",
"def stop_triton(pids):\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this along with all the other triton related code that is common across the notebooks be moved to a single python file triton_utils.py that gets shipped via pyfiles with each Spark job and then imported in the notebooks? Would avoid a lot of repetition.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@rishic3
Copy link
Collaborator Author

rishic3 commented Jan 27, 2025

Looks good overall. A few comments.

In a future optimization we can look at something like https://github.com/triton-inference-server/client/blob/main/src/python/examples/simple_http_cudashm_client.py or for regular shm to reduce data copy (if I'm interpreting these correctly).

Good idea, will definitely follow-up with this improvement. Note per pytriton team—with shm, there still will be an additional inter-process data copy (until Triton 3 release):
shm -> python backend -> (copy input) -> pytriton server -> (copy output) -> python backend -> shm
but per their benchmarks this is a few ms of latency (for ~4MB inputs — with larger inputs it might be more significant but still likely within the range of noise).

Copy link
Collaborator

@eordentlich eordentlich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more comments.

fi

sudo /databricks/python3/bin/pip3 install --upgrade --force-reinstall -r temp_requirements.txt
rm temp_requirements.txt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like carriage returns still needed at end of last lines in some files.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

" return [True]\n",
"\n",
"nodeRDD.barrier().mapPartitions(stop_triton).collect()"
"shutdownRDD = sc.parallelize(list(range(num_nodes)), num_nodes)\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, benefit of leaving shutdownRDD ... out of stop_triton utility fn?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will address in separate PR.

"%%time\n",
"preds = df1.withColumn(\"preds\", generate(\"input\"))\n",
"results = preds.collect()"
"pids = nodeRDD.barrier().mapPartitions(lambda _: start_triton(triton_server_fn=triton_server,\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any benefit to not having nodeRdd.barrier... as part of start_triton utility?

Copy link
Collaborator Author

@rishic3 rishic3 Feb 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I've implemented this along with organizing the utils into a "ServerManager" class (since we are passing around a bunch of redundant parameters), but I think it warrants a separate PR. Will follow-up with it.

"rm -rf models\n",
"mkdir -p models\n",
"cp -r models_config/hf_generation_tf models\n",
"sc.addPyFile(\"https://raw.githubusercontent.com/NVIDIA/spark-rapids-examples/branch-25.02/examples/ML%2BDL-Examples/Spark-DL/dl_inference/pytriton_utils.py\")\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better to upload this file from checkout repo in setup instructions vs hard coding a version/link here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, does it work to just have the file in the notebooks directory? Only needed on the driver.
Should work on databricks and dataproc but not on EMR (since the latter runs driver in cluster mode for notebooks).

Copy link
Collaborator

@eordentlich eordentlich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a few more.

return [True]
time.sleep(5)

return [False]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

carriage return

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah didn't know git shows missing returns in the file diff; will check there next time

ipykernel
urllib3<2
nvidia-pytriton
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more

"metadata": {},
"outputs": [],
"source": [
"if on_standalone:\n",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to avoid this special case, possible to add symbolic link in each notebook directory to the pytriton_utils.py file? these can be checked into git

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool didn't know you could do that; done

task_gpus = 1.0
treqs = TaskResourceRequests().cpus(task_cores).resource("gpu", task_gpus)
rp = ResourceProfileBuilder().require(treqs).build
print(f"Reqesting stage-level resources: (cores={task_cores}, gpu={task_gpus})")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reqesting -> Requesting throughout several places.

Copy link
Collaborator

@eordentlich eordentlich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@rishic3 rishic3 merged commit ae292de into NVIDIA:branch-25.02 Feb 4, 2025
3 checks passed
@rishic3 rishic3 deleted the dl-pytriton branch February 4, 2025 17:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants