feat!: Allow the Elasticsearch Authorization using API Key (#1019)
* Allow authentication using API key

* feat: Allow the Elasticsearch Authorization using API Key

---------

Co-authored-by: Raj Patel <[email protected]>
rohilla-anuj and rajc242 authored Nov 9, 2024
1 parent 6c6e005 commit 4ecbfa6
Showing 7 changed files with 127 additions and 44 deletions.
6 changes: 3 additions & 3 deletions python/README.md
@@ -8,9 +8,9 @@
- [BigQueryToGCS](/python/dataproc_templates/bigquery#bigquery-to-gcs) (blogpost [link](https://medium.com/google-cloud/moving-data-from-bigquery-to-gcs-using-gcp-dataproc-serverless-and-pyspark-f6481b86bcd1))
- [CassandraToBigquery](/python/dataproc_templates/cassandra#cassandra-to-bigquery)
- [CassandraToGCS](/python/dataproc_templates/cassandra#cassandra-to-gcs) (blogpost [link](https://medium.com/google-cloud/export-data-from-cassandra-to-google-cloud-storage-using-dataproc-serverless-2569a00e17fe))
- [ElasticsearchToBigQuery](/python/dataproc_templates/elasticsearch#elasticsearch-to-bq)
- [ElasticsearchToBigtable](/python/dataproc_templates/elasticsearch#elasticsearch-to-bigtable)
- [ElasticsearchToGCS](/python/dataproc_templates/elasticsearch#elasticsearch-to-gcs)
- [ElasticsearchToBigQuery](/python/dataproc_templates/elasticsearch#elasticsearch-to-bq) (blogpost [link](https://medium.com/@anujrohilla197/exporting-data-from-elasticsearch-to-bigquery-using-pyspark-on-dataproc-serverless-47633f620ce3))
- [ElasticsearchToBigtable](/python/dataproc_templates/elasticsearch#elasticsearch-to-bigtable) (blogpost [link](https://medium.com/@anujrohilla197/exporting-data-from-elasticsearch-to-bigtable-using-pyspark-on-dataproc-serverless-d06f4b124b8a))
- [ElasticsearchToGCS](/python/dataproc_templates/elasticsearch#elasticsearch-to-gcs) (blogpost [link](https://medium.com/@anujrohilla197/exporting-data-from-elasticsearch-to-gcs-using-pyspark-on-dataproc-serverless-c6ca5a93e05e))
- [GCSToBigQuery](/python/dataproc_templates/gcs#gcs-to-bigquery) (blogpost [link](https://medium.com/@ppaglilla/getting-started-with-dataproc-serverless-pyspark-templates-e32278a6a06e))
- [GCSToBigTable](/python/dataproc_templates/gcs#gcs-to-bigtable) (blogpost [link](https://medium.com/google-cloud/load-data-from-gcs-to-bigtable-with-gcp-dataproc-serverless-3862399718d2))
- [GCSToGCS](/python/dataproc_templates/gcs#gcs-to-gcs---sql-transformation) (blogpost [link](https://medium.com/@ankuljain/migrate-gcs-to-gcs-using-dataproc-serverless-3b7b0f6ad6b9))
41 changes: 24 additions & 17 deletions python/dataproc_templates/elasticsearch/README.md
@@ -13,6 +13,7 @@ The template can support the Elasticsearch versions >= 7.12.0, using the appropr
- `es.gcs.input.index`: Elasticsearch Input Index Name (format: <index>/<type>)
- `es.gcs.input.user`: Elasticsearch Username
- `es.gcs.input.password`: Elasticsearch Password
- `es.gcs.input.api.key`: API Key for Elasticsearch Authorization
- `es.gcs.output.format`: Cloud Storage Output File Format (one of: avro,parquet,csv,json)
- `es.gcs.output.location`: Cloud Storage Location to put Output Files (format: `gs://BUCKET/...`)
- `es.gcs.output.mode`: Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append)
@@ -45,7 +46,6 @@ The template can support the Elasticsearch versions >= 7.12.0, using the appropr
- `es.gcs.input.es.scroll.size`: Number of results/items/documents returned per scroll request on each executor/worker/task (default 1000)
- `es.gcs.input.es.scroll.limit`: Number of total results/items returned by each individual scroll. A negative value indicates that all documents that match should be returned (default -1)
- `es.gcs.input.es.action.heart.beat.lead`: The lead to task timeout before elasticsearch-hadoop informs Hadoop the task is still running to prevent task restart (default 15s)
- `es.gcs.input.es.net.http.header.Authorization`: API Key for Elasticsearch Authorization
- `es.gcs.input.es.net.ssl`: Enable SSL (default false)
- `es.gcs.input.es.net.ssl.cert.allow.self.signed`: Whether or not to allow self signed certificates (default false)
- `es.gcs.input.es.net.ssl.protocol`: SSL protocol to be used (default TLS)
@@ -84,6 +84,8 @@ The template can support the Elasticsearch versions >= 7.12.0, using the appropr
- `es.gcs.output.timestampformat`: Sets the string that indicates a timestamp with timezone format
- `es.gcs.output.timestampntzformat`: Sets the string that indicates a timestamp without timezone format

**Note:** Provide either ```es.gcs.input.api.key```, or both ```es.gcs.input.user``` and ```es.gcs.input.password```. Supplying all three of these properties, or none of them, results in an error.
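
For example, an API-key run of this template might look like the following sketch. It assumes the repository's `./bin/start.sh` launcher and uses hypothetical values for the project, node, index, key, and bucket; substitute your own submission command and values.

```
# Hypothetical values for illustration only.
export GCP_PROJECT=my-project
export REGION=us-central1
export GCS_STAGING_LOCATION="gs://my-bucket"
export JARS="gs://<your_bucket_to_store_dependencies>/elasticsearch-spark-30_2.12-8.11.4.jar"

# API-key authentication: es.gcs.input.user and es.gcs.input.password are omitted.
./bin/start.sh -- --template=ELASTICSEARCHTOGCS \
    --es.gcs.input.node="https://my-es-cluster:9243" \
    --es.gcs.input.index="my-index" \
    --es.gcs.input.api.key="<api-key>" \
    --es.gcs.output.format="json" \
    --es.gcs.output.location="gs://my-bucket/elasticsearch-export/" \
    --es.gcs.output.mode="overwrite"
```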

## Usage

```
@@ -92,8 +94,9 @@ $ python main.py --template ELASTICSEARCHTOGCS --help
usage: main.py [-h]
--es.gcs.input.node ES.GCS.INPUT.NODE
--es.gcs.input.index ES.GCS.INPUT.INDEX
--es.gcs.input.user es.gcs.input.user
--es.gcs.input.password es.gcs.input.password
--es.gcs.input.user ES.GCS.INPUT.USER
--es.gcs.input.password ES.GCS.INPUT.PASSWORD
--es.gcs.input.api.key ES.GCS.INPUT.API.KEY
--es.gcs.output.format {avro,parquet,csv,json}
--es.gcs.output.location ES.GCS.OUTPUT.LOCATION
[--es.gcs.input.es.nodes.path.prefix ES.GCS.INPUT.ES.NODES.PATH.PREFIX]
@@ -122,7 +125,6 @@ usage: main.py [-h]
[--es.gcs.input.es.scroll.size ES.GCS.INPUT.ES.SCROLL.SIZE]
[--es.gcs.input.es.scroll.limit ES.GCS.INPUT.ES.SCROLL.LIMIT]
[--es.gcs.input.es.action.heart.beat.lead ES.GCS.INPUT.ES.ACTION.HEART.BEAT.LEAD]
[--es.gcs.input.es.net.http.header.Authorization ES.GCS.INPUT.ES.NET.HTTP.HEADER.AUTHORIZATION]
[--es.gcs.input.es.net.ssl ES.GCS.INPUT.ES.NET.SSL]
[--es.gcs.input.es.net.ssl.cert.allow.self.signed ES.GCS.INPUT.ES.NET.SSL.CERT.ALLOW.SELF.SIGNED]
[--es.gcs.input.es.net.ssl.protocol ES.GCS.INPUT.ES.NET.SSL.PROTOCOL]
@@ -171,6 +173,8 @@ options:
Elasticsearch Username
--es.gcs.input.password ES.GCS.INPUT.PASSWORD
Elasticsearch Password
--es.gcs.input.api.key ES.GCS.INPUT.API.KEY
API Key for Elasticsearch Authorization
--es.gcs.input.es.nodes.path.prefix ES.GCS.INPUT.ES.NODES.PATH.PREFIX
Prefix to add to all requests made to Elasticsearch
--es.gcs.input.es.query ES.GCS.INPUT.ES.QUERY
@@ -223,8 +227,6 @@ options:
Number of total results/items returned by each individual scroll. A negative value indicates that all documents that match should be returned
--es.gcs.input.es.action.heart.beat.lead ES.GCS.INPUT.ES.ACTION.HEART.BEAT.LEAD
The lead to task timeout before elasticsearch-hadoop informs Hadoop the task is still running to prevent task restart
--es.gcs.input.es.net.http.header.Authorization ES.GCS.INPUT.ES.NET.HTTP.HEADER.AUTHORIZATION
API Key for Elasticsearch Authorization
--es.gcs.input.es.net.ssl ES.GCS.INPUT.ES.NET.SSL
Enable SSL
--es.gcs.input.es.net.ssl.cert.allow.self.signed ES.GCS.INPUT.ES.NET.SSL.CERT.ALLOW.SELF.SIGNED
@@ -319,7 +321,7 @@ The template can support the Elasticsearch versions >= 7.12.0

```
export GCP_PROJECT=my-project
export JARS="gs://spark-lib/elasticsearch/elasticsearch-spark-30_2.12-8.11.4.jar"
export JARS="gs://<your_bucket_to_store_dependencies>/elasticsearch-spark-30_2.12-8.11.4.jar"
export GCS_STAGING_LOCATION="gs://my-bucket"
export REGION=us-central1
export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet
@@ -358,6 +360,7 @@ This template has been tested with the following versions of the above mentioned
- `es.bq.input.index`: Elasticsearch Input Index Name (format: <index>/<type>)
- `es.bq.input.user`: Elasticsearch Username
- `es.bq.input.password`: Elasticsearch Password
- `es.bq.input.api.key`: API Key for Elasticsearch Authorization
- `es.bq.output.dataset`: BigQuery dataset id (format: Dataset_id)
- `es.bq.output.table`: BigQuery table name (format: Table_name)
- `es.bq.temp.bucket.name`: Temporary bucket for the Spark BigQuery connector
@@ -390,7 +393,6 @@ This template has been tested with the following versions of the above mentioned
- `es.bq.input.es.scroll.size`: Number of results/items/documents returned per scroll request on each executor/worker/task (default 1000)
- `es.bq.input.es.scroll.limit`: Number of total results/items returned by each individual scroll. A negative value indicates that all documents that match should be returned (default -1)
- `es.bq.input.es.action.heart.beat.lead`: The lead to task timeout before elasticsearch-hadoop informs Hadoop the task is still running to prevent task restart (default 15s)
- `es.bq.input.es.net.http.header.Authorization`: API Key for Elasticsearch Authorization
- `es.bq.input.es.net.ssl`: Enable SSL (default false)
- `es.bq.input.es.net.ssl.cert.allow.self.signed`: Whether or not to allow self signed certificates (default false)
- `es.bq.input.es.net.ssl.protocol`: SSL protocol to be used (default TLS)
@@ -413,6 +415,8 @@ This template has been tested with the following versions of the above mentioned
- `es.bq.flatten.array.fields`: Flatten the n-D array fields to 1-D array fields, it needs es.bq.flatten.struct.fields option to be passed
- `es.bq.output.mode`: Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append)

**Note:** Provide either ```es.bq.input.api.key```, or both ```es.bq.input.user``` and ```es.bq.input.password```. Supplying all three of these properties, or none of them, results in an error.
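
As an illustration, a username/password run could look like the sketch below. The template name is assumed here to be `ELASTICSEARCHTOBQ` (matching the `es.bq.*` property prefix; confirm the exact name with `python main.py --help`), the launcher is assumed to be `./bin/start.sh`, and the dataset, table, and bucket values are hypothetical.

```
# Hypothetical values for illustration only.
export GCP_PROJECT=my-project
export REGION=us-central1
export GCS_STAGING_LOCATION="gs://my-bucket"
export JARS="gs://<your_bucket_to_store_dependencies>/elasticsearch-spark-30_2.12-8.11.4.jar,gs://<your_bucket_to_store_dependencies>/spark-3.3-bigquery-0.39.0.jar"

# Username/password authentication: es.bq.input.api.key is omitted.
./bin/start.sh -- --template=ELASTICSEARCHTOBQ \
    --es.bq.input.node="https://my-es-cluster:9243" \
    --es.bq.input.index="my-index" \
    --es.bq.input.user="elastic" \
    --es.bq.input.password="<password>" \
    --es.bq.output.dataset="my_dataset" \
    --es.bq.output.table="my_table" \
    --es.bq.temp.bucket.name="my-temp-bucket" \
    --es.bq.output.mode="append"
```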

## Usage

```
@@ -422,6 +426,7 @@ usage: main.py [-h]
--es.bq.input.index ES.BQ.INPUT.INDEX
--es.bq.input.user ES.BQ.INPUT.USER
--es.bq.input.password ES.BQ.INPUT.PASSWORD
--es.bq.input.api.key ES.BQ.INPUT.API.KEY
--es.bq.output.dataset ES.BQ.OUTPUT.DATASET
--es.bq.output.table ES.BQ.OUTPUT.TABLE
--es.bq.temp.bucket.name ES.BQ.TEMP.BUCKET.NAME
@@ -452,7 +457,6 @@ usage: main.py [-h]
[--es.bq.input.es.scroll.size ES.BQ.INPUT.ES.SCROLL.SIZE]
[--es.bq.input.es.scroll.limit ES.BQ.INPUT.ES.SCROLL.LIMIT]
[--es.bq.input.es.action.heart.beat.lead ES.BQ.INPUT.ES.ACTION.HEART.BEAT.LEAD]
[--es.bq.input.es.net.http.header.Authorization ES.BQ.INPUT.ES.NET.HTTP.HEADER.AUTHORIZATION]
[--es.bq.input.es.net.ssl ES.BQ.INPUT.ES.NET.SSL]
[--es.bq.input.es.net.ssl.cert.allow.self.signed ES.BQ.INPUT.ES.NET.SSL.CERT.ALLOW.SELF.SIGNED]
[--es.bq.input.es.net.ssl.protocol ES.BQ.INPUT.ES.NET.SSL.PROTOCOL]
@@ -484,6 +488,8 @@ options:
Elasticsearch Username
--es.bq.input.password ES.BQ.INPUT.PASSWORD
Elasticsearch Password
--es.bq.input.api.key ES.BQ.INPUT.API.KEY
API Key for Elasticsearch Authorization
--es.bq.input.es.nodes.path.prefix ES.BQ.INPUT.ES.NODES.PATH.PREFIX
Prefix to add to all requests made to Elasticsearch
--es.bq.input.es.query ES.BQ.INPUT.ES.QUERY
@@ -536,8 +542,6 @@ options:
Number of total results/items returned by each individual scroll. A negative value indicates that all documents that match should be returned
--es.bq.input.es.action.heart.beat.lead ES.BQ.INPUT.ES.ACTION.HEART.BEAT.LEAD
The lead to task timeout before elasticsearch-hadoop informs Hadoop the task is still running to prevent task restart
--es.bq.input.es.net.http.header.Authorization ES.BQ.INPUT.ES.NET.HTTP.HEADER.AUTHORIZATION
API Key for Elasticsearch Authorization
--es.bq.input.es.net.ssl ES.BQ.INPUT.ES.NET.SSL
Enable SSL
--es.bq.input.es.net.ssl.cert.allow.self.signed ES.BQ.INPUT.ES.NET.SSL.CERT.ALLOW.SELF.SIGNED
@@ -595,7 +599,8 @@ options:

```
export GCP_PROJECT=my-project
export JARS="gs://spark-lib/elasticsearch/elasticsearch-spark-30_2.12-8.11.4.jar"
export GCS_STAGING_LOCATION="gs://my-bucket"
export JARS="gs://<your_bucket_to_store_dependencies>/elasticsearch-spark-30_2.12-8.11.4.jar,gs://<your_bucket_to_store_dependencies>/spark-3.3-bigquery-0.39.0.jar"
export REGION=us-central1
export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet
@@ -663,6 +668,7 @@ It also requires [DeltaIO dependencies](https://docs.delta.io/latest/releases.ht
- `es.bt.input.index`: Elasticsearch Input Index Name (format: <index>/<type>)
- `es.bt.input.user`: Elasticsearch Username
- `es.bt.input.password`: Elasticsearch Password
- `es.bt.input.api.key`: API Key for Elasticsearch Authorization
- `spark.bigtable.project.id`: GCP project where BigTable instance is running
- `spark.bigtable.instance.id`: BigTable instance id
- `es.bt.catalog.json`: BigTable catalog inline json
@@ -693,7 +699,6 @@ It also requires [DeltaIO dependencies](https://docs.delta.io/latest/releases.ht
- `es.bt.input.es.scroll.size`: Number of results/items/documents returned per scroll request on each executor/worker/task (default 1000)
- `es.bt.input.es.scroll.limit`: Number of total results/items returned by each individual scroll. A negative value indicates that all documents that match should be returned (default -1)
- `es.bt.input.es.action.heart.beat.lead`: The lead to task timeout before elasticsearch-hadoop informs Hadoop the task is still running to prevent task restart (default 15s)
- `es.bt.input.es.net.http.header.Authorization`: API Key for Elasticsearch Authorization
- `es.bt.input.es.net.ssl`: Enable SSL (default false)
- `es.bt.input.es.net.ssl.cert.allow.self.signed`: Whether or not to allow self signed certificates (default false)
- `es.bt.input.es.net.ssl.protocol`: SSL protocol to be used (default TLS)
@@ -717,16 +722,19 @@ It also requires [DeltaIO dependencies](https://docs.delta.io/latest/releases.ht
- `spark.bigtable.create.new.table`: Set to True to create the BigTable table from the catalog. Defaults to False, which means the table must already exist.
- `spark.bigtable.batch.mutate.size`: BigTable batch mutation size. Maximum allowed value is `100000`. Default is `100`. Reference [documentation](https://github.com/GoogleCloudDataproc/spark-bigtable-connector/blob/main/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableSparkConf.scala#L86)

**Note:** Provide either ```es.bt.input.api.key```, or both ```es.bt.input.user``` and ```es.bt.input.password```. Supplying all three of these properties, or none of them, results in an error.
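
For reference, a submission with an inline BigTable catalog might look like the sketch below. The launcher, the node/index/key values, and the catalog itself are hypothetical; the catalog shape (table name, row key, and column-to-column-family mappings) is assumed to follow the HBase-style format used by the spark-bigtable connector, so confirm the exact schema against the connector documentation referenced above.

```
# Hypothetical catalog and values for illustration only; adjust to your own schema.
export ES_BT_CATALOG_JSON='{
  "table": {"name": "my_bigtable_table"},
  "rowkey": "key",
  "columns": {
    "key":  {"cf": "rowkey", "col": "key",  "type": "string"},
    "name": {"cf": "cf",     "col": "name", "type": "string"}
  }
}'

./bin/start.sh -- --template=ELASTICSEARCHTOBIGTABLE \
    --es.bt.input.node="https://my-es-cluster:9243" \
    --es.bt.input.index="my-index" \
    --es.bt.input.api.key="<api-key>" \
    --spark.bigtable.project.id="my-project" \
    --spark.bigtable.instance.id="my-bigtable-instance" \
    --spark.bigtable.create.new.table="true" \
    --es.bt.catalog.json="${ES_BT_CATALOG_JSON}"
```
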
## Usage
```
$ python main.py --template GCSTOBIGTABLE --help
$ python main.py --template ELASTICSEARCHTOBIGTABLE --help

usage: main.py [-h]
--es.bt.input.node ES.BT.INPUT.NODE
--es.bt.input.index ES.BT.INPUT.INDEX
--es.bt.input.user ES.BT.INPUT.USER
--es.bt.input.password ES.BT.INPUT.PASSWORD
--es.bt.input.api.key ES.BT.INPUT.API.KEY
--spark.bigtable.project.id ES.BT.PROJECT.ID
--spark.bigtable.instance.id ES.BT.INSTANCE.ID
--es.bt.catalog.json ES.BT.CATALOG.JSON
@@ -756,7 +764,6 @@ usage: main.py [-h]
[--es.bt.input.es.scroll.size ES.BT.INPUT.ES.SCROLL.SIZE]
[--es.bt.input.es.scroll.limit ES.BT.INPUT.ES.SCROLL.LIMIT]
[--es.bt.input.es.action.heart.beat.lead ES.BT.INPUT.ES.ACTION.HEART.BEAT.LEAD]
[--es.bt.input.es.net.http.header.Authorization ES.BT.INPUT.ES.NET.HTTP.HEADER.AUTHORIZATION]
[--es.bt.input.es.net.ssl ES.BT.INPUT.ES.NET.SSL]
[--es.bt.input.es.net.ssl.cert.allow.self.signed ES.BT.INPUT.ES.NET.SSL.CERT.ALLOW.SELF.SIGNED]
[--es.bt.input.es.net.ssl.protocol ES.BT.INPUT.ES.NET.SSL.PROTOCOL]
@@ -791,6 +798,8 @@ options:
Elasticsearch Username
--es.bt.input.password ES.BT.INPUT.PASSWORD
Elasticsearch Password
--es.bt.input.api.key ES.BT.INPUT.API.KEY
API Key for Elasticsearch Authorization
--es.bt.input.es.nodes.path.prefix ES.BT.INPUT.ES.NODES.PATH.PREFIX
Prefix to add to all requests made to Elasticsearch
--es.bt.input.es.query ES.BT.INPUT.ES.QUERY
@@ -843,8 +852,6 @@ options:
Number of total results/items returned by each individual scroll. A negative value indicates that all documents that match should be returned
--es.bt.input.es.action.heart.beat.lead ES.BT.INPUT.ES.ACTION.HEART.BEAT.LEAD
The lead to task timeout before elasticsearch-hadoop informs Hadoop the task is still running to prevent task restart
--es.bt.input.es.net.http.header.Authorization ES.BT.INPUT.ES.NET.HTTP.HEADER.AUTHORIZATION
API Key for Elasticsearch Authorization
--es.bt.input.es.net.ssl ES.BT.INPUT.ES.NET.SSL
Enable SSL
--es.bt.input.es.net.ssl.cert.allow.self.signed ES.BT.INPUT.ES.NET.SSL.CERT.ALLOW.SELF.SIGNED