diff --git a/pom.xml.elasticsearch-2.0.0 b/pom.xml.elasticsearch-2.0.0
new file mode 100644
index 0000000..830b14a
--- /dev/null
+++ b/pom.xml.elasticsearch-2.0.0
@@ -0,0 +1,124 @@
+
+ 4.0.0
+ com.amazonaws
+ amazon-kinesis-connectors
+ jar
+ Amazon Kinesis Connector Library
+ 1.2.0
+ The Amazon Kinesis Connector Library helps Java developers integrate Amazon Kinesis with other AWS and non-AWS services.
+ https://aws.amazon.com/kinesis
+
+
+ https://github.com/awslabs/amazon-kinesis-connectors.git
+
+
+
+
+ Amazon Software License
+ https://aws.amazon.com/asl
+ repo
+
+
+
+
+ 1.4.0
+ 1.9.37
+ 2.0.0
+ 2.6.4
+
+
+
+
+ com.amazonaws
+ amazon-kinesis-client
+ ${amazon-kinesis-client.version}
+
+
+
+ com.amazonaws
+ aws-java-sdk-core
+ ${aws-java-sdk.version}
+
+
+ com.amazonaws
+ aws-java-sdk-kinesis
+ ${aws-java-sdk.version}
+
+
+ com.amazonaws
+ aws-java-sdk-dynamodb
+ ${aws-java-sdk.version}
+ true
+
+
+ com.amazonaws
+ aws-java-sdk-s3
+ ${aws-java-sdk.version}
+ true
+
+
+ org.elasticsearch
+ elasticsearch
+ ${elasticsearch.version}
+ true
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${fasterxml-jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${fasterxml-jackson.version}
+
+
+
+
+
+ amazonwebservices
+ Amazon Web Services
+ https://aws.amazon.com
+
+ developer
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.2
+
+
+ 1.7
+ UTF-8
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.5
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+
+
+
+
+
diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/elasticsearch/Elasticsearch2Emitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/elasticsearch/Elasticsearch2Emitter.java
new file mode 100644
index 0000000..c39189f
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/connectors/elasticsearch/Elasticsearch2Emitter.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package com.amazonaws.services.kinesis.connectors.elasticsearch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
+import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
+import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
+
+public class ElasticsearchEmitter implements IEmitter {
+ private static final Log LOG = LogFactory.getLog(ElasticsearchEmitter.class);
+
+ /**
+ * The settings key for the cluster name.
+ *
+ * Defaults to elasticsearch.
+ */
+ private static final String ELASTICSEARCH_CLUSTER_NAME_KEY = "cluster.name";
+
+ /**
+ * The settings key for transport client sniffing. If set to true, this instructs the TransportClient to
+ * find all nodes in the cluster, providing robustness if the original node were to become unavailable.
+ *
+ * Defaults to false.
+ */
+ private static final String ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY = "client.transport.sniff";
+
+ /**
+ * The settings key for ignoring the cluster name. Set to true to ignore cluster name validation
+ * of connected nodes.
+ *
+ * Defaults to false.
+ */
+ private static final String ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY =
+ "client.transport.ignore_cluster_name";
+
+ /**
+ * The settings key for ping timeout. The time to wait for a ping response from a node.
+ *
+ * Default to 5s.
+ */
+ private static final String ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY = "client.transport.ping_timeout";
+
+ /**
+ * The settings key for node sampler interval. How often to sample / ping the nodes listed and connected.
+ *
+ * Defaults to 5s
+ */
+ private static final String ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY =
+ "client.transport.nodes_sampler_interval";
+
+ /**
+ * The Elasticsearch client.
+ */
+ private final TransportClient elasticsearchClient;
+
+ /**
+ * The Elasticsearch endpoint.
+ */
+ private final String elasticsearchEndpoint;
+
+ /**
+ * The Elasticsearch port.
+ */
+ private final int elasticsearchPort;
+
+ /**
+ * The amount of time to wait in between unsuccessful index requests (in milliseconds).
+ * 10 seconds = 10 * 1000 = 10000
+ */
+ private long BACKOFF_PERIOD = 10000;
+
+ public ElasticsearchEmitter(KinesisConnectorConfiguration configuration) {
+ Settings settings =
+ Settings.settingsBuilder()
+ .put(ELASTICSEARCH_CLUSTER_NAME_KEY, configuration.ELASTICSEARCH_CLUSTER_NAME)
+ .put(ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY, configuration.ELASTICSEARCH_TRANSPORT_SNIFF)
+ .put(ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY,
+ configuration.ELASTICSEARCH_IGNORE_CLUSTER_NAME)
+ .put(ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY, configuration.ELASTICSEARCH_PING_TIMEOUT)
+ .put(ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY,
+ configuration.ELASTICSEARCH_NODE_SAMPLER_INTERVAL)
+ .build();
+ elasticsearchEndpoint = configuration.ELASTICSEARCH_ENDPOINT;
+ elasticsearchPort = configuration.ELASTICSEARCH_PORT;
+ LOG.info("Elasticsearch2Emitter using elasticsearch endpoint " + elasticsearchEndpoint + ":" + elasticsearchPort);
+ elasticsearchClient = TransportClient.builder().settings(settings).build();
+ elasticsearchClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(elasticsearchEndpoint, elasticsearchPort)));
+ }
+
+ /**
+ * Emits records to elasticsearch.
+ * 1. Adds each record to a bulk index request, conditionally adding version, ttl or create if they were set in the
+ * transformer.
+ * 2. Executes the bulk request, returning any record specific failures to be retried by the connector library
+ * pipeline, unless
+ * outlined below.
+ *
+ * Record specific failures (noted in the failure.getMessage() string)
+ * - DocumentAlreadyExistsException means the record has create set to true, but a document already existed at the
+ * specific index/type/id.
+ * - VersionConflictEngineException means the record has a specific version number that did not match what existed
+ * in elasticsearch.
+ * To guarantee in order processing by the connector, when putting data use the same partition key for objects going
+ * to the same
+ * index/type/id and set sequence number for ordering.
+ * - In either case, the emitter will assume that the record would fail again in the future and thus will not return
+ * the record to
+ * be retried.
+ *
+ * Bulk request failures
+ * - NoNodeAvailableException means the TransportClient could not connect to the cluster.
+ * - A general Exception catches any other unexpected behavior.
+ * - In either case the emitter will continue making attempts until the issue has been resolved. This is to ensure
+ * that no data
+ * loss occurs and simplifies restarting the application once issues have been fixed.
+ */
+ @Override
+ public List emit(UnmodifiableBuffer buffer) throws IOException {
+ List records = buffer.getRecords();
+ if (records.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ BulkRequestBuilder bulkRequest = elasticsearchClient.prepareBulk();
+ for (ElasticsearchObject record : records) {
+ IndexRequestBuilder indexRequestBuilder =
+ elasticsearchClient.prepareIndex(record.getIndex(), record.getType(), record.getId());
+ indexRequestBuilder.setSource(record.getSource());
+ Long version = record.getVersion();
+ if (version != null) {
+ indexRequestBuilder.setVersion(version);
+ }
+ Long ttl = record.getTtl();
+ if (ttl != null) {
+ indexRequestBuilder.setTTL(ttl);
+ }
+ Boolean create = record.getCreate();
+ if (create != null) {
+ indexRequestBuilder.setCreate(create);
+ }
+ bulkRequest.add(indexRequestBuilder);
+ }
+
+ while (true) {
+ try {
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+
+ BulkItemResponse[] responses = bulkResponse.getItems();
+ List failures = new ArrayList();
+ int numberOfSkippedRecords = 0;
+ for (int i = 0; i < responses.length; i++) {
+ if (responses[i].isFailed()) {
+ LOG.error("Record failed with message: " + responses[i].getFailureMessage());
+ Failure failure = responses[i].getFailure();
+ if (failure.getMessage().contains("DocumentAlreadyExistsException")
+ || failure.getMessage().contains("VersionConflictEngineException")) {
+ numberOfSkippedRecords++;
+ } else {
+ failures.add(records.get(i));
+ }
+ }
+ }
+ LOG.info("Emitted " + (records.size() - failures.size() - numberOfSkippedRecords)
+ + " records to Elasticsearch");
+ if (!failures.isEmpty()) {
+ printClusterStatus();
+ LOG.warn("Returning " + failures.size() + " records as failed");
+ }
+ return failures;
+ } catch (NoNodeAvailableException nnae) {
+ LOG.error("No nodes found at " + elasticsearchEndpoint + ":" + elasticsearchPort + ". Retrying in "
+ + BACKOFF_PERIOD + " milliseconds", nnae);
+ sleep(BACKOFF_PERIOD);
+ } catch (Exception e) {
+ LOG.error("ElasticsearchEmitter threw an unexpected exception ", e);
+ sleep(BACKOFF_PERIOD);
+ }
+ }
+
+ }
+
+ @Override
+ public void fail(List records) {
+ for (ElasticsearchObject record : records) {
+ LOG.error("Record failed: " + record);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ elasticsearchClient.close();
+ }
+
+ private void sleep(long sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ private void printClusterStatus() {
+ ClusterHealthRequestBuilder healthRequestBuilder = elasticsearchClient.admin().cluster().prepareHealth();
+ ClusterHealthResponse response = healthRequestBuilder.execute().actionGet();
+ if (response.getStatus().equals(ClusterHealthStatus.RED)) {
+ LOG.error("Cluster health is RED. Indexing ability will be limited");
+ } else if (response.getStatus().equals(ClusterHealthStatus.YELLOW)) {
+ LOG.warn("Cluster health is YELLOW.");
+ } else if (response.getStatus().equals(ClusterHealthStatus.GREEN)) {
+ LOG.info("Cluster health is GREEN.");
+ }
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/elasticsearch/ElasticsearchEmitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/elasticsearch/ElasticsearchEmitter.java
index 16ace70..3a15f4a 100644
--- a/src/main/java/com/amazonaws/services/kinesis/connectors/elasticsearch/ElasticsearchEmitter.java
+++ b/src/main/java/com/amazonaws/services/kinesis/connectors/elasticsearch/ElasticsearchEmitter.java
@@ -44,7 +44,7 @@ public class ElasticsearchEmitter implements IEmitter {
/**
* The settings key for the cluster name.
- *
+ *
* Defaults to elasticsearch.
*/
private static final String ELASTICSEARCH_CLUSTER_NAME_KEY = "cluster.name";
@@ -52,7 +52,7 @@ public class ElasticsearchEmitter implements IEmitter {
/**
* The settings key for transport client sniffing. If set to true, this instructs the TransportClient to
* find all nodes in the cluster, providing robustness if the original node were to become unavailable.
- *
+ *
* Defaults to false.
*/
private static final String ELASTICSEARCH_CLIENT_TRANSPORT_SNIFF_KEY = "client.transport.sniff";
@@ -60,7 +60,7 @@ public class ElasticsearchEmitter implements IEmitter {
/**
* The settings key for ignoring the cluster name. Set to true to ignore cluster name validation
* of connected nodes.
- *
+ *
* Defaults to false.
*/
private static final String ELASTICSEARCH_CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME_KEY =
@@ -68,14 +68,14 @@ public class ElasticsearchEmitter implements IEmitter {
/**
* The settings key for ping timeout. The time to wait for a ping response from a node.
- *
+ *
* Default to 5s.
*/
private static final String ELASTICSEARCH_CLIENT_TRANSPORT_PING_TIMEOUT_KEY = "client.transport.ping_timeout";
/**
* The settings key for node sampler interval. How often to sample / ping the nodes listed and connected.
- *
+ *
* Defaults to 5s
*/
private static final String ELASTICSEARCH_CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL_KEY =
@@ -127,7 +127,7 @@ public ElasticsearchEmitter(KinesisConnectorConfiguration configuration) {
* 2. Executes the bulk request, returning any record specific failures to be retried by the connector library
* pipeline, unless
* outlined below.
- *
+ *
* Record specific failures (noted in the failure.getMessage() string)
* - DocumentAlreadyExistsException means the record has create set to true, but a document already existed at the
* specific index/type/id.
@@ -139,7 +139,7 @@ public ElasticsearchEmitter(KinesisConnectorConfiguration configuration) {
* - In either case, the emitter will assume that the record would fail again in the future and thus will not return
* the record to
* be retried.
- *
+ *
* Bulk request failures
* - NoNodeAvailableException means the TransportClient could not connect to the cluster.
* - A general Exception catches any other unexpected behavior.