diff --git a/python/README.md b/python/README.md
index 7607b0430..c8c55b227 100644
--- a/python/README.md
+++ b/python/README.md
@@ -28,11 +28,11 @@
 * [TextToBigQuery](/python/dataproc_templates/gcs#text-to-bigquery)
 
 
-Dataproc Templates (Python - PySpark) submit jobs to Dataproc Serverless using [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark).
+Dataproc Templates (Python - PySpark) support submitting jobs to both Dataproc Serverless, using [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark), and to an existing Dataproc cluster, using [jobs submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/pyspark).
 
 ## Run using PyPi package
 
-In this README, you see instructions on how to submit Dataproc Serverless template jobs.  
+In this README, you will find instructions on how to run the templates.  
 Currently, 3 options are described:
 - Using bin/start.sh
 - Using gcloud CLI
@@ -110,7 +110,33 @@ coverage run \
 coverage report --show-missing
 ```
 
-## Submitting templates to Dataproc Serverless
+
+## Running Templates
+
+The Dataproc Templates (Python - PySpark) support both serverless and cluster modes. Serverless mode is used by default. To run these templates, use either the `gcloud` CLI directly or the provided `bin/start.sh` shell script.
+
+### Serverless Mode (Default)
+
+Submits jobs to Dataproc Serverless using the [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark) command.
+
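+For example, a serverless submission needs only the common environment variables (a minimal sketch mirroring the cluster example below; the project, region, bucket and template names are placeholders):
+
+```sh
+export GCP_PROJECT=my-gcp-project
+export REGION=gcp-region
+export GCS_STAGING_LOCATION=gs://my-bucket/temp
+./bin/start.sh \
+-- --template HIVETOBIGQUERY
+```
+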
+### Cluster Mode
+
+Submits jobs to an existing standard Dataproc cluster using the [jobs submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/pyspark) command.
+
+To run the templates on an existing cluster, you must additionally specify the `JOB_TYPE=CLUSTER` and `CLUSTER=<cluster name>` environment variables. For example:
+
+```sh
+export GCP_PROJECT=my-gcp-project
+export REGION=gcp-region
+export GCS_STAGING_LOCATION=gs://my-bucket/temp
+export JOB_TYPE=CLUSTER
+export CLUSTER=${DATAPROC_CLUSTER_NAME}
+./bin/start.sh \
+-- --template HIVETOBIGQUERY
+```
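+
+Under the hood, `bin/start.sh` assembles a `gcloud dataproc jobs submit pyspark` call roughly like the following (a sketch only; the script fills in the packaged egg file and any optional flags):
+
+```sh
+gcloud dataproc jobs submit pyspark main.py \
+    --project=my-gcp-project \
+    --region=gcp-region \
+    --cluster=${DATAPROC_CLUSTER_NAME} \
+    --py-files=<packaged egg file> \
+    -- --template HIVETOBIGQUERY
+```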
+
+
+## Submitting templates
 
 A shell script is provided to:
 - Build the python package
@@ -126,10 +152,6 @@ When submitting, there are 3 types of properties/parameters for the user to prov
   - The **--log_level** parameter is optional, it defaults to INFO.
     - Possible choices are the Spark log levels: ["ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"].
 
-
-
-
-
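+For example, to run a template with a more verbose log level (an illustrative invocation; the template name is only an example):
+
+```sh
+./bin/start.sh \
+-- --template HIVETOBIGQUERY --log_level DEBUG
+```
+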
 <hr>
 
 **bin/start.sh usage**:
diff --git a/python/bin/start.sh b/python/bin/start.sh
index bef01e3a3..42d38a9de 100755
--- a/python/bin/start.sh
+++ b/python/bin/start.sh
@@ -53,6 +53,9 @@ OPT_PY_FILES="--py-files=${PROJECT_ROOT_DIR}/${PACKAGE_EGG_FILE}"
 if [ -n "${SUBNET}" ]; then
   OPT_SUBNET="--subnet=${SUBNET}"
 fi
+if [ -n "${CLUSTER}" ]; then
+  OPT_CLUSTER="--cluster=${CLUSTER}"
+fi
 if [ -n "${HISTORY_SERVER_CLUSTER}" ]; then
   OPT_HISTORY_SERVER_CLUSTER="--history-server-cluster=${HISTORY_SERVER_CLUSTER}"
 fi
@@ -71,6 +74,10 @@ fi
 if [ -n "${SPARK_PROPERTIES}" ]; then
   OPT_PROPERTIES="--properties=${SPARK_PROPERTIES}"
 fi
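+# Default to serverless submission when JOB_TYPE is not set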
+if [ -z "${JOB_TYPE}" ]; then
+  JOB_TYPE=SERVERLESS
+fi
+
 #if Hbase catalog is passed, then required hbase dependency are copied to staging location and added to jars
 if [ -n "${CATALOG}" ]; then
   echo "Downloading Hbase jar dependency"
@@ -100,23 +107,46 @@ if [ -n "${HBASE_SITE_PATH}" ]; then
   fi
 fi
 
-command=$(cat << EOF
-gcloud beta dataproc batches submit pyspark \
-    ${PROJECT_ROOT_DIR}/main.py \
-    ${OPT_SPARK_VERSION} \
-    ${OPT_PROJECT} \
-    ${OPT_REGION} \
-    ${OPT_JARS} \
-    ${OPT_LABELS} \
-    ${OPT_DEPS_BUCKET} \
-    ${OPT_FILES} \
-    ${OPT_PY_FILES} \
-    ${OPT_PROPERTIES} \
-    ${OPT_SUBNET} \
-    ${OPT_HISTORY_SERVER_CLUSTER} \
-    ${OPT_METASTORE_SERVICE}
+# Construct the command based on JOB_TYPE
+if [ "${JOB_TYPE}" == "CLUSTER" ]; then
+  echo "JOB_TYPE is CLUSTER, so will submit on an existing Dataproc cluster"
+  check_required_envvar CLUSTER
+  command=$(cat << EOF
+  gcloud dataproc jobs submit pyspark \
+      ${PROJECT_ROOT_DIR}/main.py \
+      ${OPT_PROJECT} \
+      ${OPT_REGION} \
+      ${OPT_CLUSTER} \
+      ${OPT_JARS} \
+      ${OPT_LABELS} \
+      ${OPT_FILES} \
+      ${OPT_PY_FILES} \
+      ${OPT_PROPERTIES}
 EOF
 )
+elif [ "${JOB_TYPE}" == "SERVERLESS" ]; then
+  echo "JOB_TYPE is SERVERLESS, so will submit on serverless Spark"
+  command=$(cat << EOF
+  gcloud beta dataproc batches submit pyspark \
+      ${PROJECT_ROOT_DIR}/main.py \
+      ${OPT_SPARK_VERSION} \
+      ${OPT_PROJECT} \
+      ${OPT_REGION} \
+      ${OPT_JARS} \
+      ${OPT_LABELS} \
+      ${OPT_DEPS_BUCKET} \
+      ${OPT_FILES} \
+      ${OPT_PY_FILES} \
+      ${OPT_PROPERTIES} \
+      ${OPT_SUBNET} \
+      ${OPT_HISTORY_SERVER_CLUSTER} \
+      ${OPT_METASTORE_SERVICE}
+EOF
+)
+else
+  echo "Unknown JOB_TYPE \"${JOB_TYPE}\""
+  exit 1
+fi
 
 echo "Triggering Spark Submit job"
 echo ${command} "$@"