From bed0d622ec17cebd6af64d774e266a2a164afc3a Mon Sep 17 00:00:00 2001 From: Vanshaj Bhatia Date: Tue, 17 Oct 2023 08:41:28 +0000 Subject: [PATCH 1/2] feat: added support for running python templates on an existing dataproc cluster --- python/bin/start.sh | 60 +++++++++++++++++++++++++++++++++------------ 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/python/bin/start.sh b/python/bin/start.sh index bef01e3a3..42d38a9de 100755 --- a/python/bin/start.sh +++ b/python/bin/start.sh @@ -53,6 +53,9 @@ OPT_PY_FILES="--py-files=${PROJECT_ROOT_DIR}/${PACKAGE_EGG_FILE}" if [ -n "${SUBNET}" ]; then OPT_SUBNET="--subnet=${SUBNET}" fi +if [ -n "${CLUSTER}" ]; then + OPT_CLUSTER="--cluster=${CLUSTER}" +fi if [ -n "${HISTORY_SERVER_CLUSTER}" ]; then OPT_HISTORY_SERVER_CLUSTER="--history-server-cluster=${HISTORY_SERVER_CLUSTER}" fi @@ -71,6 +74,10 @@ fi if [ -n "${SPARK_PROPERTIES}" ]; then OPT_PROPERTIES="--properties=${SPARK_PROPERTIES}" fi +if [ -z "${JOB_TYPE}" ]; then + JOB_TYPE=SERVERLESS +fi + #if Hbase catalog is passed, then required hbase dependency are copied to staging location and added to jars if [ -n "${CATALOG}" ]; then echo "Downloading Hbase jar dependency" @@ -100,23 +107,46 @@ if [ -n "${HBASE_SITE_PATH}" ]; then fi fi -command=$(cat << EOF -gcloud beta dataproc batches submit pyspark \ - ${PROJECT_ROOT_DIR}/main.py \ - ${OPT_SPARK_VERSION} \ - ${OPT_PROJECT} \ - ${OPT_REGION} \ - ${OPT_JARS} \ - ${OPT_LABELS} \ - ${OPT_DEPS_BUCKET} \ - ${OPT_FILES} \ - ${OPT_PY_FILES} \ - ${OPT_PROPERTIES} \ - ${OPT_SUBNET} \ - ${OPT_HISTORY_SERVER_CLUSTER} \ - ${OPT_METASTORE_SERVICE} +# Construct the command based on JOB_TYPE +if [ "${JOB_TYPE}" == "CLUSTER" ]; then + echo "JOB_TYPE is CLUSTER, so will submit on an existing Dataproc cluster" + check_required_envvar CLUSTER + command=$(cat << EOF + gcloud dataproc jobs submit pyspark \ + ${PROJECT_ROOT_DIR}/main.py \ + ${OPT_PROJECT} \ + ${OPT_REGION} \ + ${OPT_CLUSTER} \ + ${OPT_JARS} \ + ${OPT_LABELS} \ + ${OPT_FILES} \ + ${OPT_PY_FILES} \ + ${OPT_PROPERTIES} EOF ) +elif [ "${JOB_TYPE}" == "SERVERLESS" ]; then + echo "JOB_TYPE is SERVERLESS, so will submit on serverless Spark" + command=$(cat << EOF + gcloud beta dataproc batches submit pyspark \ + ${PROJECT_ROOT_DIR}/main.py \ + ${OPT_SPARK_VERSION} \ + ${OPT_PROJECT} \ + ${OPT_REGION} \ + ${OPT_JARS} \ + ${OPT_LABELS} \ + ${OPT_DEPS_BUCKET} \ + ${OPT_FILES} \ + ${OPT_PY_FILES} \ + ${OPT_PROPERTIES} \ + ${OPT_SUBNET} \ + ${OPT_HISTORY_SERVER_CLUSTER} \ + ${OPT_METASTORE_SERVICE} +EOF +) +else + echo "Unknown JOB_TYPE \"${JOB_TYPE}\"" + exit 1 +fi echo "Triggering Spark Submit job" echo ${command} "$@" From 5ae1999b486f1736f1cb4993dcac1505d0e622b0 Mon Sep 17 00:00:00 2001 From: Vanshaj Bhatia Date: Tue, 17 Oct 2023 08:42:22 +0000 Subject: [PATCH 2/2] docs: enhanced instructions for better clarity on serverless and cluster modes --- python/README.md | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/python/README.md b/python/README.md index 7607b0430..c8c55b227 100644 --- a/python/README.md +++ b/python/README.md @@ -28,11 +28,11 @@ * [TextToBigQuery](/python/dataproc_templates/gcs#text-to-bigquery) -Dataproc Templates (Python - PySpark) submit jobs to Dataproc Serverless using [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark). 
+Dataproc Templates (Python - PySpark) support submitting jobs both to Dataproc Serverless, using [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark), and to an existing Dataproc cluster, using [jobs submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/pyspark).

## Run using PyPi package

-In this README, you see instructions on how to submit Dataproc Serverless template jobs.
+This README provides instructions on how to run the templates.

Currently, 3 options are described:
- Using bin/start.sh
- Using gcloud CLI

...

coverage run \
...
coverage report --show-missing
```

-## Submitting templates to Dataproc Serverless

## Running Templates

The Dataproc Templates (Python - PySpark) support both serverless and cluster modes; serverless mode is the default. To run these templates, use the `gcloud` CLI directly or the provided `start.sh` shell script.

### Serverless Mode (Default)

Submits the job to Dataproc Serverless using the [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark) command.

### Cluster Mode

Submits the job to an existing Dataproc cluster using the [jobs submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/pyspark) command.

To run the templates on an existing cluster, you must additionally set the `JOB_TYPE=CLUSTER` and `CLUSTER=<cluster-name>` environment variables. For example:

```sh
export GCP_PROJECT=my-gcp-project
export REGION=gcp-region
export GCS_STAGING_LOCATION=gs://my-bucket/temp
export JOB_TYPE=CLUSTER
export CLUSTER=${DATAPROC_CLUSTER_NAME}
./bin/start.sh \
-- --template HIVETOBIGQUERY
```

## Submitting templates

A shell script is provided to:
- Build the python package
...

When submitting, there are 3 types of properties/parameters for the user to provide:
- The **--log_level** parameter is optional; it defaults to INFO.
  - Possible choices are the Spark log levels: ["ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"].
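For example, a minimal sketch of a default (serverless) submission that overrides the log level; the template name and variable values mirror the cluster-mode example above and are illustrative only:

```sh
# Sketch only: JOB_TYPE is unset, so start.sh falls back to the SERVERLESS default.
export GCP_PROJECT=my-gcp-project
export REGION=gcp-region
export GCS_STAGING_LOCATION=gs://my-bucket/temp
# Arguments after "--" are passed to the template; --log_level overrides the INFO default.
./bin/start.sh \
-- --template HIVETOBIGQUERY --log_level=DEBUG
```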
**bin/start.sh usage**:
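A hedged sketch of the general invocation shape, inferred from the examples above (the exact optional flags vary by template):

```sh
# General shape (sketch): environment variables configure the submission
# (GCP_PROJECT, REGION, GCS_STAGING_LOCATION, and optionally JOB_TYPE/CLUSTER);
# everything after "--" is passed through to the selected template.
./bin/start.sh \
-- --template <TEMPLATE_NAME> [--log_level=<LEVEL>] [<template arguments>...]
```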