From 2d487362d5425e7e0d94473257827f44525bf3db Mon Sep 17 00:00:00 2001 From: XiaoYou201 <58425449+XiaoYou201@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:19:28 +0800 Subject: [PATCH] [INLONG-10644][Sort] Add the elasticsearch-base module to implement elasticsearch connector 6&7 (#10647) --- .../elasticsearch-base/pom.xml | 140 +++++ .../ActionRequestFailureHandler.java | 79 +++ .../BufferingNoOpRequestIndexer.java | 73 +++ .../ElasticsearchApiCallBridge.java | 118 ++++ .../elasticsearch/ElasticsearchSinkBase.java | 515 ++++++++++++++++++ .../ElasticsearchSinkFunction.java | 89 +++ .../sort/elasticsearch/RequestIndexer.java | 79 +++ .../table/AbstractTimeIndexGenerator.java | 40 ++ .../table/ElasticsearchConfiguration.java | 162 ++++++ .../table/ElasticsearchConnectorOptions.java | 169 ++++++ .../table/ElasticsearchValidationUtils.java | 94 ++++ .../elasticsearch/table/IndexGenerator.java | 39 ++ .../table/IndexGeneratorBase.java | 51 ++ .../table/IndexGeneratorFactory.java | 319 +++++++++++ .../elasticsearch/table/KeyExtractor.java | 129 +++++ .../elasticsearch/table/RequestFactory.java | 54 ++ .../table/RowElasticsearchSinkFunction.java | 142 +++++ .../table/StaticIndexGenerator.java | 34 ++ .../util/IgnoringFailureHandler.java | 37 ++ .../util/NoOpFailureHandler.java | 54 ++ .../RetryRejectedExecutionFailureHandler.java | 56 ++ .../sort-flink-v1.18/sort-connectors/pom.xml | 1 + licenses/inlong-sort-connectors/LICENSE | 21 + 23 files changed, 2495 insertions(+) create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java create mode 100644 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java create mode 100644 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml new file mode 100644 index 00000000000..709eefdb065 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml @@ -0,0 +1,140 @@ + + + + 4.0.0 + + org.apache.inlong + sort-connectors-v1.18 + 1.14.0-SNAPSHOT + + sort-connector-elasticsearch-base-v1.18 + jar + Apache InLong - Sort-connector-elasticsearch-base + + + ${project.parent.parent.parent.parent.parent.basedir} + 7.10.2 + + + + org.apache.inlong + sort-flink-dependencies-v1.18 + ${project.version} + provided + + + + org.apache.flink + flink-json + ${flink.version} + provided + + + + 
org.elasticsearch + elasticsearch + ${elasticsearch.version} + + + + org.ow2.asm + * + + + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${elasticsearch.version} + + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + + + org.apache.logging.log4j + log4j-api + provided + + + + org.apache.logging.log4j + log4j-core + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + + shade + + package + + true + + + org.apache.inlong:sort-connector-* + + org/apache/inlong/** + META-INF/services/org.apache.flink.table.factories.Factory + + + + *:* + + log4j.properties + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + + + diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java new file mode 100644 index 00000000000..e014feb86af --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch; + +import org.apache.flink.annotation.PublicEvolving; +import org.elasticsearch.action.ActionRequest; + +import java.io.Serializable; + +/** + * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how + * failed {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing + * malformed documents, or simply requesting them to be sent to Elasticsearch again if the failure + * is only temporary. + * + *

Example: + * + *

{@code
+ * private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
+ *
+ * 	@Override
+ * 	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ * 		if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
+ * 			// full queue; re-add document for indexing
+ * 			indexer.add(action);
+ * 		} else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
+ * 			// malformed document; simply drop request without failing sink
+ * 		} else {
+ * 			// for all other failures, fail the sink;
+ * 			// here the failure is simply rethrown, but users can also choose to throw custom exceptions
+ * 			throw failure;
+ * 		}
+ * 	}
+ * }
+ *
+ * }
+ * + *

The above example will let the sink re-add requests that failed due to queue capacity + * saturation and drop requests with malformed documents, without failing the sink. For all other + * failures, the sink will fail. + * + *

Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the + * exact type could not be retrieved through the older version Java client APIs (thus, the types + * will be general {@link Exception}s and only differ in the failure message). In this case, it is + * recommended to match on the provided REST status code. + * + */ +@PublicEvolving +public interface ActionRequestFailureHandler extends Serializable { + + /** + * Handle a failed {@link ActionRequest}. + * + * @param action the {@link ActionRequest} that failed due to the failure + * @param failure the cause of failure + * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved) + * @param indexer request indexer to re-add the failed action, if intended to do so + * @throws Throwable if the sink should fail on this failure, the implementation should rethrow + * the exception or a custom one + */ + void onFailure( + ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) + throws Throwable; +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java new file mode 100644 index 00000000000..00ceba4ff41 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch; + +import org.apache.flink.annotation.Internal; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Collections; +import java.util.concurrent.ConcurrentLinkedQueue; +
+/**
+ * Implementation of a {@link RequestIndexer} that buffers {@link ActionRequest ActionRequests}
+ * before re-sending them to the Elasticsearch cluster upon request.
+ *
+ * <p>The {@code add} methods only collect requests; nothing is sent until {@link
+ * #processBufferedRequests(RequestIndexer)} hands the collected requests to another indexer.
+ */
+@Internal
+@NotThreadSafe
+class BufferingNoOpRequestIndexer implements RequestIndexer {
+
+    /** Requests waiting to be handed over, in the order in which they were added. */
+    private final ConcurrentLinkedQueue<ActionRequest> bufferedRequests;
+
+    BufferingNoOpRequestIndexer() {
+        this.bufferedRequests = new ConcurrentLinkedQueue<>();
+    }
+
+    /** Buffers the given delete requests; they are not sent by this call. */
+    @Override
+    public void add(DeleteRequest... deleteRequests) {
+        Collections.addAll(bufferedRequests, deleteRequests);
+    }
+
+    /** Buffers the given index requests; they are not sent by this call. */
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        Collections.addAll(bufferedRequests, indexRequests);
+    }
+
+    /** Buffers the given update requests; they are not sent by this call. */
+    @Override
+    public void add(UpdateRequest... updateRequests) {
+        Collections.addAll(bufferedRequests, updateRequests);
+    }
+
+    /**
+     * Hands every buffered request to the given indexer and removes it from the buffer.
+     *
+     * <p>Requests of a type other than index, delete or update are removed without being
+     * forwarded.
+     *
+     * @param actualIndexer the indexer that receives the buffered requests
+     */
+    void processBufferedRequests(RequestIndexer actualIndexer) {
+        // Drain with poll() rather than iterate-then-clear(): a request that is added while
+        // this loop runs is either forwarded now or stays buffered for the next call, whereas
+        // a trailing clear() would discard any request the iteration had not seen.
+        ActionRequest request;
+        while ((request = bufferedRequests.poll()) != null) {
+            if (request instanceof IndexRequest) {
+                actualIndexer.add((IndexRequest) request);
+            } else if (request instanceof DeleteRequest) {
+                actualIndexer.add((DeleteRequest) request);
+            } else if (request instanceof UpdateRequest) {
+                actualIndexer.add((UpdateRequest) request);
+            }
+        }
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java new file mode 100644 index 00000000000..88742cb4985 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.inlong.sort.elasticsearch; + +import org.apache.flink.annotation.Internal; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; +
+/**
+ * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls
+ * across different versions. This includes calls to create Elasticsearch clients, handle failed
+ * item responses, etc. Any incompatible Elasticsearch Java APIs should be bridged using this
+ * interface.
+ *
+ * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since
+ * connecting via an embedded node is allowed, the call bridge will hold reference to the created
+ * embedded node. Each instance of the sink will hold exactly one instance of the call bridge, and
+ * state cleanup is performed when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
+ */
+@Internal
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
+
+    /**
+     * Creates an Elasticsearch client implementing {@link AutoCloseable}.
+     *
+     * @return The created client.
+     */
+    C createClient();
+
+    /**
+     * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+     *
+     * @param client the Elasticsearch client.
+     * @param listener the bulk processor listener.
+     * @return the bulk processor builder.
+     */
+    BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
+
+    /**
+     * Extracts the cause of failure of a bulk item action.
+     *
+     * @param bulkItemResponse the bulk item response to extract cause of failure
+     * @return the extracted {@link Throwable} from the response ({@code null} if the response is
+     *     successful).
+     */
+    @Nullable
+    Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+
+    /**
+     * Sets the bulk flush interval, in milliseconds on the provided {@link BulkProcessor.Builder}.
+     * The builder will be later on used to instantiate the actual {@link BulkProcessor}.
+     *
+     * @param builder the {@link BulkProcessor.Builder} to configure.
+     * @param flushIntervalMillis the flush interval in milliseconds.
+     */
+    void configureBulkProcessorFlushInterval(
+            BulkProcessor.Builder builder, long flushIntervalMillis);
+
+    /**
+     * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}. The builder
+     * will be later on used to instantiate the actual {@link BulkProcessor}.
+     *
+     * @param builder the {@link BulkProcessor.Builder} to configure.
+     * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user
+     *     disabled backoff retries).
+     */
+    void configureBulkProcessorBackoff(
+            BulkProcessor.Builder builder,
+            @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+
+    /**
+     * Verify the client connection by making a test request/ping to the Elasticsearch cluster.
+     *
+     * <p>Called by {@link ElasticsearchSinkBase#open(org.apache.flink.configuration.Configuration)}
+     * after creating the client. This makes sure the underlying client is closed if the connection
+     * is not successful, preventing thread leaks.
+     *
+     * @param client the Elasticsearch client.
+     * @throws IOException if the test request fails with an I/O error.
+     */
+    void verifyClientConnection(C client) throws IOException;
+
+    /**
+     * Creates a {@link RequestIndexer} that adds requests to the given {@link BulkProcessor} in a
+     * way that is binary compatible with the bridged Elasticsearch version.
+     *
+     * @param bulkProcessor the bulk processor that the returned indexer works with.
+     * @param flushOnCheckpoint the sink's flush-on-checkpoint setting.
+     * @param numPendingRequestsRef the sink's counter of pending action requests.
+     * @return the request indexer.
+     */
+    RequestIndexer createBulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef);
+
+    /** Perform any necessary state cleanup. */
+    default void cleanup() {
+        // nothing to clean up by default
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java new file mode 100644 index 00000000000..43754293327 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.InstantiationUtil; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.rest.RestStatus; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all Flink Elasticsearch Sinks. + * + *

This class implements the common behaviour across Elasticsearch versions, such as the use of + * an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before sending the + * requests to the cluster, as well as passing input records to the user provided {@link + * ElasticsearchSinkFunction} for processing. + * + *

The version specific API calls for different Elasticsearch versions should be defined by a + * concrete implementation of a {@link ElasticsearchApiCallBridge}, which is provided to the + * constructor of this class. This call bridge is used, for example, to create a Elasticsearch + * {@link Client}, handle failed item responses, etc. + * + * @param Type of the elements handled by this sink + * @param Type of the Elasticsearch client, which implements {@link AutoCloseable} + */ +@Internal +public abstract class ElasticsearchSinkBase extends RichSinkFunction + implements + CheckpointedFunction { + + private static final long serialVersionUID = -1007596293618451942L; + + // ------------------------------------------------------------------------ + // Internal bulk processor configuration + // ------------------------------------------------------------------------ + + public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; + public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable"; + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type"; + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries"; + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay"; + + /** Used to control whether the retry delay should increase exponentially or remain constant. */ + @PublicEvolving + public enum FlushBackoffType { + CONSTANT, + EXPONENTIAL + } + + /** + * Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to + * resource constraints (i.e. 
the client's internal thread pool is full), the backoff policy + * decides how long the bulk processor will wait before the operation is retried internally. + * + *

This is a proxy for version specific backoff policies. + */ + public static class BulkFlushBackoffPolicy implements Serializable { + + private static final long serialVersionUID = -6022851996101826049L; + + // the default values follow the Elasticsearch default settings for BulkProcessor + private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL; + private int maxRetryCount = 8; + private long delayMillis = 50; + + public FlushBackoffType getBackoffType() { + return backoffType; + } + + public int getMaxRetryCount() { + return maxRetryCount; + } + + public long getDelayMillis() { + return delayMillis; + } + + public void setBackoffType(FlushBackoffType backoffType) { + this.backoffType = checkNotNull(backoffType); + } + + public void setMaxRetryCount(int maxRetryCount) { + checkArgument(maxRetryCount >= 0); + this.maxRetryCount = maxRetryCount; + } + + public void setDelayMillis(long delayMillis) { + checkArgument(delayMillis >= 0); + this.delayMillis = delayMillis; + } + } + + private final Integer bulkProcessorFlushMaxActions; + private final Integer bulkProcessorFlushMaxSizeMb; + private final Long bulkProcessorFlushIntervalMillis; + private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy; + + // ------------------------------------------------------------------------ + // User-facing API and configuration + // ------------------------------------------------------------------------ + + /** + * The function that is used to construct multiple {@link ActionRequest ActionRequests} from + * each incoming element. + */ + private final ElasticsearchSinkFunction elasticsearchSinkFunction; + + /** User-provided handler for failed {@link ActionRequest ActionRequests}. */ + private final ActionRequestFailureHandler failureHandler; + + /** + * If true, the producer will wait until all outstanding action requests have been sent to + * Elasticsearch. 
+ */ + private boolean flushOnCheckpoint = true; + + /** + * Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest + * ActionRequests}. + */ + private transient RequestIndexer requestIndexer; + + /** + * Provided to the {@link ActionRequestFailureHandler} to allow users to re-index failed + * requests. + */ + private transient BufferingNoOpRequestIndexer failureRequestIndexer; + + // ------------------------------------------------------------------------ + // Internals for the Flink Elasticsearch Sink + // ------------------------------------------------------------------------ + + /** Call bridge for different version-specific. */ + private final ElasticsearchApiCallBridge callBridge; + + /** + * Number of pending action requests not yet acknowledged by Elasticsearch. This value is + * maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}. + * + *

This is incremented whenever the user adds (or re-adds through the {@link + * ActionRequestFailureHandler}) requests to the {@link RequestIndexer}. It is decremented for + * each completed request of a bulk request, in {@link BulkProcessor.Listener#afterBulk(long, + * BulkRequest, BulkResponse)} and {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, + * Throwable)}. + */ + private AtomicLong numPendingRequests = new AtomicLong(0); + + /** Elasticsearch client created using the call bridge. */ + private transient C client; + + /** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */ + private transient BulkProcessor bulkProcessor; + + /** + * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown + * in callbacks and the user considered it should fail the sink via the {@link + * ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method. + * + *

Errors will be checked and rethrown before processing each input element, and when the + * sink is closed. + */ + private final AtomicReference failureThrowable = new AtomicReference<>(); + + public ElasticsearchSinkBase( + ElasticsearchApiCallBridge callBridge, + Map userConfig, + ElasticsearchSinkFunction elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler) { + + this.callBridge = checkNotNull(callBridge); + this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction); + this.failureHandler = checkNotNull(failureHandler); + // we eagerly check if the user-provided sink function and failure handler is serializable; + // otherwise, if they aren't serializable, users will merely get a non-informative error + // message + // "ElasticsearchSinkBase is not serializable" + + checkArgument( + InstantiationUtil.isSerializable(elasticsearchSinkFunction), + "The implementation of the provided ElasticsearchSinkFunction is not serializable. " + + "The object probably contains or references non-serializable fields."); + + checkArgument( + InstantiationUtil.isSerializable(failureHandler), + "The implementation of the provided ActionRequestFailureHandler is not serializable. " + + "The object probably contains or references non-serializable fields."); + + // extract and remove bulk processor related configuration from the user-provided config, + // so that the resulting user config only contains configuration related to the + // Elasticsearch client. 
+ + checkNotNull(userConfig); + + // copy config so we can remove entries without side-effects + userConfig = new HashMap<>(userConfig); + + ParameterTool params = ParameterTool.fromMap(userConfig); + + if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { + bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS); + } else { + bulkProcessorFlushMaxActions = null; + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { + bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB); + } else { + bulkProcessorFlushMaxSizeMb = null; + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { + bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS); + } else { + bulkProcessorFlushIntervalMillis = null; + } + + boolean bulkProcessorFlushBackoffEnable = + params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE); + + if (bulkProcessorFlushBackoffEnable) { + this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy(); + + if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) { + bulkProcessorFlushBackoffPolicy.setBackoffType( + FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE))); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) { + bulkProcessorFlushBackoffPolicy.setMaxRetryCount( + params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES); + } + + if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) { + bulkProcessorFlushBackoffPolicy.setDelayMillis( + params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)); + userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY); + } + + } else { + bulkProcessorFlushBackoffPolicy = 
null; + } + } + + /** + * Disable flushing on checkpoint. When disabled, the sink will not wait for all pending action + * requests to be acknowledged by Elasticsearch on checkpoints. + * + *

NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT provide + * any strong guarantees for at-least-once delivery of action requests. + */ + public void disableFlushOnCheckpoint() { + this.flushOnCheckpoint = false; + } + + @Override + public void open(Configuration parameters) throws Exception { + client = callBridge.createClient(); + callBridge.verifyClientConnection(client); + bulkProcessor = buildBulkProcessor(new BulkProcessorListener()); + requestIndexer = + callBridge.createBulkProcessorIndexer( + bulkProcessor, flushOnCheckpoint, numPendingRequests); + failureRequestIndexer = new BufferingNoOpRequestIndexer(); + elasticsearchSinkFunction.open(getRuntimeContext()); + } + + @Override + public void invoke(T value, Context context) throws Exception { + checkAsyncErrorsAndRequests(); + elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + // no initialization needed + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + checkAsyncErrorsAndRequests(); + + if (flushOnCheckpoint) { + while (numPendingRequests.get() != 0) { + bulkProcessor.flush(); + checkAsyncErrorsAndRequests(); + } + } + } + + @Override + public void close() throws Exception { + elasticsearchSinkFunction.close(); + if (bulkProcessor != null) { + bulkProcessor.close(); + bulkProcessor = null; + } + + if (client != null) { + client.close(); + client = null; + } + + callBridge.cleanup(); + + // make sure any errors from callbacks are rethrown + checkErrorAndRethrow(); + } + + /** + * Build the {@link BulkProcessor}. + * + *

Note: this is exposed for testing purposes. + */ + @VisibleForTesting + protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) { + checkNotNull(listener); + + BulkProcessor.Builder bulkProcessorBuilder = + callBridge.createBulkProcessorBuilder(client, listener); + + // This makes flush() blocking + bulkProcessorBuilder.setConcurrentRequests(0); + + if (bulkProcessorFlushMaxActions != null) { + bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions); + } + + if (bulkProcessorFlushMaxSizeMb != null) { + configureBulkSize(bulkProcessorBuilder); + } + + if (bulkProcessorFlushIntervalMillis != null) { + configureFlushInterval(bulkProcessorBuilder); + } + + // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null + callBridge.configureBulkProcessorBackoff( + bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy); + + return bulkProcessorBuilder.build(); + } + + private void configureBulkSize(BulkProcessor.Builder bulkProcessorBuilder) { + final ByteSizeUnit sizeUnit; + if (bulkProcessorFlushMaxSizeMb == -1) { + // bulk size can be disabled with -1, however the ByteSizeValue constructor accepts -1 + // only with BYTES as the size unit + sizeUnit = ByteSizeUnit.BYTES; + } else { + sizeUnit = ByteSizeUnit.MB; + } + bulkProcessorBuilder.setBulkSize(new ByteSizeValue(bulkProcessorFlushMaxSizeMb, sizeUnit)); + } + + private void configureFlushInterval(BulkProcessor.Builder bulkProcessorBuilder) { + if (bulkProcessorFlushIntervalMillis == -1) { + bulkProcessorBuilder.setFlushInterval(null); + } else { + callBridge.configureBulkProcessorFlushInterval( + bulkProcessorBuilder, bulkProcessorFlushIntervalMillis); + } + } + + private void checkErrorAndRethrow() { + Throwable cause = failureThrowable.get(); + if (cause != null) { + throw new RuntimeException("An error occurred in ElasticsearchSink.", cause); + } + } + + private void checkAsyncErrorsAndRequests() { + checkErrorAndRethrow(); + 
failureRequestIndexer.processBufferedRequests(requestIndexer); + } + + private class BulkProcessorListener implements BulkProcessor.Listener { + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + BulkItemResponse itemResponse; + Throwable failure; + RestStatus restStatus; + DocWriteRequest actionRequest; + + try { + for (int i = 0; i < response.getItems().length; i++) { + itemResponse = response.getItems()[i]; + failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse); + if (failure != null) { + restStatus = itemResponse.getFailure().getStatus(); + actionRequest = request.requests().get(i); + if (restStatus == null) { + if (actionRequest instanceof ActionRequest) { + failureHandler.onFailure( + (ActionRequest) actionRequest, + failure, + -1, + failureRequestIndexer); + } else { + throw new UnsupportedOperationException( + "The sink currently only supports ActionRequests"); + } + } else { + if (actionRequest instanceof ActionRequest) { + failureHandler.onFailure( + (ActionRequest) actionRequest, + failure, + restStatus.getStatus(), + failureRequestIndexer); + } else { + throw new UnsupportedOperationException( + "The sink currently only supports ActionRequests"); + } + } + } + } + } catch (Throwable t) { + // fail the sink and skip the rest of the items + // if the failure handler decides to throw an exception + failureThrowable.compareAndSet(null, t); + } + } + + if (flushOnCheckpoint) { + numPendingRequests.getAndAdd(-request.numberOfActions()); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + try { + for (DocWriteRequest writeRequest : request.requests()) { + if (writeRequest instanceof ActionRequest) { + failureHandler.onFailure( + (ActionRequest) writeRequest, failure, -1, failureRequestIndexer); + } else { + throw new 
UnsupportedOperationException( + "The sink currently only supports ActionRequests"); + } + } + } catch (Throwable t) { + // fail the sink and skip the rest of the items + // if the failure handler decides to throw an exception + failureThrowable.compareAndSet(null, t); + } + + if (flushOnCheckpoint) { + numPendingRequests.getAndAdd(-request.numberOfActions()); + } + } + } + + @VisibleForTesting + long getNumPendingRequests() { + if (flushOnCheckpoint) { + return numPendingRequests.get(); + } else { + throw new UnsupportedOperationException( + "The number of pending requests is not maintained when flushing on checkpoint is disabled."); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java new file mode 100644 index 00000000000..87f334693e4 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.elasticsearch.action.ActionRequest; + +import java.io.Serializable; + +/** + * Creates multiple {@link ActionRequest ActionRequests} from an element in a stream. + * + *

<p>This is used by sinks to prepare elements for sending them to Elasticsearch. + * + *

<p>Example: + * + *

<pre>{@code
+ * 				private static class TestElasticSearchSinkFunction implements
+ * 					ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+ *
+ * 				public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+ * 					Map<String, Object> json = new HashMap<>();
+ * 					json.put("data", element.f1);
+ *
+ * 					return Requests.indexRequest()
+ * 						.index("my-index")
+ * 						.type("my-type")
+ * 						.id(element.f0.toString())
+ * 						.source(json);
+ * 					}
+ *
+ * 			public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+ * 				indexer.add(createIndexRequest(element));
+ * 			}
+ * 	}
+ *
+ * }</pre>
+ * + * @param The type of the element handled by this {@code ElasticsearchSinkFunction} + */ +@PublicEvolving +public interface ElasticsearchSinkFunction extends Serializable, Function { + + /** + * Initialization method for the function. It is called once before the actual working process + * methods, if {@link #open(RuntimeContext)} is not overridden. + */ + default void open() throws Exception { + } + + /** + * Initialization method for the function. It is called once before the actual working process + * methods. + */ + default void open(RuntimeContext ctx) throws Exception { + open(); + } + + /** Tear-down method for the function. It is called when the sink closes. */ + default void close() throws Exception { + } + + /** + * Process the incoming element to produce multiple {@link ActionRequest ActionsRequests}. The + * produced requests should be added to the provided {@link RequestIndexer}. + * + * @param element incoming element to process + * @param ctx runtime context containing information about the sink instance + * @param indexer request indexer that {@code ActionRequest} should be added to + */ + void process(T element, RuntimeContext ctx, RequestIndexer indexer); +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java new file mode 100644 index 00000000000..ff9ab406951 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch; + +import org.apache.flink.annotation.PublicEvolving; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; + +/** + * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare them + * for sending to an Elasticsearch cluster. + */ +@PublicEvolving +public interface RequestIndexer { + + /** + * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to + * Elasticsearch. + * + * @param actionRequests The multiple {@link ActionRequest} to add. + * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@link UpdateRequest} + */ + @Deprecated + default void add(ActionRequest... 
actionRequests) { + for (ActionRequest actionRequest : actionRequests) { + if (actionRequest instanceof IndexRequest) { + add((IndexRequest) actionRequest); + } else if (actionRequest instanceof DeleteRequest) { + add((DeleteRequest) actionRequest); + } else if (actionRequest instanceof UpdateRequest) { + add((UpdateRequest) actionRequest); + } else { + throw new IllegalArgumentException( + "RequestIndexer only supports Index, Delete and Update requests"); + } + } + } + + /** + * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to + * Elasticsearch. + * + * @param deleteRequests The multiple {@link DeleteRequest} to add. + */ + void add(DeleteRequest... deleteRequests); + + /** + * Add multiple {@link IndexRequest} to the indexer to prepare for sending requests to + * Elasticsearch. + * + * @param indexRequests The multiple {@link IndexRequest} to add. + */ + void add(IndexRequest... indexRequests); + + /** + * Add multiple {@link UpdateRequest} to the indexer to prepare for sending requests to + * Elasticsearch. + * + * @param updateRequests The multiple {@link UpdateRequest} to add. + */ + void add(UpdateRequest... updateRequests); +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java new file mode 100644 index 00000000000..b7601f5241a --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch.table; + +import org.apache.flink.annotation.Internal; + +import java.time.format.DateTimeFormatter; + +/** Abstract class for time related {@link IndexGenerator}. */ +@Internal +abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase { + + private final String dateTimeFormat; + protected transient DateTimeFormatter dateTimeFormatter; + + public AbstractTimeIndexGenerator(String index, String dateTimeFormat) { + super(index); + this.dateTimeFormat = dateTimeFormat; + } + + @Override + public void open() { + this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java new file mode 100644 index 00000000000..ba9716d4ac5 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch.table; + +import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler; +import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase; +import org.apache.inlong.sort.elasticsearch.util.IgnoringFailureHandler; +import org.apache.inlong.sort.elasticsearch.util.NoOpFailureHandler; +import org.apache.inlong.sort.elasticsearch.util.RetryRejectedExecutionFailureHandler; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.InstantiationUtil; + +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; + +/** Accessor methods to elasticsearch options. 
*/ +@Internal +public class ElasticsearchConfiguration { + + protected final ReadableConfig config; + private final ClassLoader classLoader; + + public ElasticsearchConfiguration(ReadableConfig config, ClassLoader classLoader) { + this.config = config; + this.classLoader = classLoader; + } + + public ActionRequestFailureHandler getFailureHandler() { + final ActionRequestFailureHandler failureHandler; + String value = config.get(ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION); + switch (value.toUpperCase()) { + case "FAIL": + failureHandler = new NoOpFailureHandler(); + break; + case "IGNORE": + failureHandler = new IgnoringFailureHandler(); + break; + case "RETRY-REJECTED": + failureHandler = new RetryRejectedExecutionFailureHandler(); + break; + default: + try { + Class failureHandlerClass = Class.forName(value, false, classLoader); + failureHandler = + (ActionRequestFailureHandler) InstantiationUtil.instantiate(failureHandlerClass); + } catch (ClassNotFoundException e) { + throw new ValidationException( + "Could not instantiate the failure handler class: " + value, e); + } + break; + } + return failureHandler; + } + + public String getDocumentType() { + return config.get(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION); + } + + public int getBulkFlushMaxActions() { + int maxActions = config.get(ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION); + // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration. + return maxActions == 0 ? -1 : maxActions; + } + + public long getBulkFlushMaxByteSize() { + long maxSize = + config.get(ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes(); + // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration. + return maxSize == 0 ? 
-1 : maxSize; + } + + public long getBulkFlushInterval() { + long interval = config.get(ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION).toMillis(); + // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration. + return interval == 0 ? -1 : interval; + } + + public Optional getUsername() { + return config.getOptional(ElasticsearchConnectorOptions.USERNAME_OPTION); + } + + public Optional getPassword() { + return config.getOptional(ElasticsearchConnectorOptions.PASSWORD_OPTION); + } + + public boolean isBulkFlushBackoffEnabled() { + return config.get( + ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION) != ElasticsearchConnectorOptions.BackOffType.DISABLED; + } + + public Optional getBulkFlushBackoffType() { + switch (config.get(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION)) { + case CONSTANT: + return Optional.of(ElasticsearchSinkBase.FlushBackoffType.CONSTANT); + case EXPONENTIAL: + return Optional.of(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL); + default: + return Optional.empty(); + } + } + + public Optional getBulkFlushBackoffRetries() { + return config.getOptional(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION); + } + + public Optional getBulkFlushBackoffDelay() { + return config.getOptional(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION) + .map(Duration::toMillis); + } + + public boolean isDisableFlushOnCheckpoint() { + return !config.get(ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION); + } + + public String getIndex() { + return config.get(ElasticsearchConnectorOptions.INDEX_OPTION); + } + + public String getKeyDelimiter() { + return config.get(ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION); + } + + public Optional getPathPrefix() { + return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != 
o.getClass()) { + return false; + } + ElasticsearchConfiguration that = (ElasticsearchConfiguration) o; + return Objects.equals(config, that.config) && Objects.equals(classLoader, that.classLoader); + } + + @Override + public int hashCode() { + return Objects.hash(config, classLoader); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java new file mode 100644 index 00000000000..27c8e390fdb --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.elasticsearch.table; + +import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.description.Description; + +import java.time.Duration; +import java.util.List; + +import static org.apache.flink.configuration.description.TextElement.text; + +/** + * Options for the Elasticsearch connector. + * + */ +@PublicEvolving +public class ElasticsearchConnectorOptions { + + public static final ConfigOption> HOSTS_OPTION = + ConfigOptions.key("hosts") + .stringType() + .asList() + .noDefaultValue() + .withDescription("Elasticsearch hosts to connect to."); + + public static final ConfigOption INDEX_OPTION = + ConfigOptions.key("index") + .stringType() + .noDefaultValue() + .withDescription("Elasticsearch index for every record."); + + public static final ConfigOption DOCUMENT_TYPE_OPTION = + ConfigOptions.key("document-type") + .stringType() + .noDefaultValue() + .withDescription("Elasticsearch document type."); + + public static final ConfigOption PASSWORD_OPTION = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription("Password used to connect to Elasticsearch instance."); + + public static final ConfigOption USERNAME_OPTION = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription("Username used to connect to Elasticsearch instance."); + + public static final ConfigOption KEY_DELIMITER_OPTION = + ConfigOptions.key("document-id.key-delimiter") + .stringType() + .defaultValue("_") + .withDescription( + "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\"."); + + public static final ConfigOption FAILURE_HANDLER_OPTION = + ConfigOptions.key("failure-handler") + .stringType() + 
.defaultValue("fail") + .withDescription( + Description.builder() + .text( + "Failure handling strategy in case a request to Elasticsearch fails") + .list( + text( + "\"fail\" (throws an exception if a request fails and thus causes a job failure)"), + text( + "\"ignore\" (ignores failures and drops the request)"), + text( + "\"retry-rejected\" (re-adds requests that have failed due to queue capacity saturation)"), + text( + "\"class name\" for failure handling with a ActionRequestFailureHandler subclass")) + .build()); + + public static final ConfigOption FLUSH_ON_CHECKPOINT_OPTION = + ConfigOptions.key("sink.flush-on-checkpoint") + .booleanType() + .defaultValue(true) + .withDescription("Disables flushing on checkpoint"); + + public static final ConfigOption BULK_FLUSH_MAX_ACTIONS_OPTION = + ConfigOptions.key("sink.bulk-flush.max-actions") + .intType() + .defaultValue(1000) + .withDescription("Maximum number of actions to buffer for each bulk request."); + + public static final ConfigOption BULK_FLASH_MAX_SIZE_OPTION = + ConfigOptions.key("sink.bulk-flush.max-size") + .memoryType() + .defaultValue(MemorySize.parse("2mb")) + .withDescription("Maximum size of buffered actions per bulk request"); + + public static final ConfigOption BULK_FLUSH_INTERVAL_OPTION = + ConfigOptions.key("sink.bulk-flush.interval") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("Bulk flush interval"); + + public static final ConfigOption BULK_FLUSH_BACKOFF_TYPE_OPTION = + ConfigOptions.key("sink.bulk-flush.backoff.strategy") + .enumType(BackOffType.class) + .defaultValue(BackOffType.DISABLED) + .withDescription("Backoff strategy"); + + public static final ConfigOption BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION = + ConfigOptions.key("sink.bulk-flush.backoff.max-retries") + .intType() + .noDefaultValue() + .withDescription("Maximum number of retries."); + + public static final ConfigOption BULK_FLUSH_BACKOFF_DELAY_OPTION = + 
ConfigOptions.key("sink.bulk-flush.backoff.delay") + .durationType() + .noDefaultValue() + .withDescription("Delay between each backoff attempt."); + + public static final ConfigOption CONNECTION_PATH_PREFIX = + ConfigOptions.key("connection.path-prefix") + .stringType() + .noDefaultValue() + .withDescription("Prefix string to be added to every REST communication."); + + public static final ConfigOption FORMAT_OPTION = + ConfigOptions.key("format") + .stringType() + .defaultValue("json") + .withDescription( + "The format must produce a valid JSON document. " + + "Please refer to the documentation on formats for more details."); + + // -------------------------------------------------------------------------------------------- + // Enums + // -------------------------------------------------------------------------------------------- + + /** + * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code + * DISABLED} option. + */ + public enum BackOffType { + DISABLED, + CONSTANT, + EXPONENTIAL + } + + private ElasticsearchConnectorOptions() { + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java new file mode 100644 index 00000000000..0b04440a5ac --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** Utility methods for validating Elasticsearch properties. */ +@Internal +public class ElasticsearchValidationUtils { + + private static final Set ILLEGAL_PRIMARY_KEY_TYPES = new LinkedHashSet<>(); + + static { + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY); + } + + /** + * Checks that the table does not have primary key defined on illegal types. 
In Elasticsearch + * the primary key is used to calculate the Elasticsearch document id, which is a string of up + * to 512 bytes. It cannot have whitespaces. As of now it is calculated by concatenating the + * fields. Certain types do not have a good string representation to be used in this scenario. + * The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and {@link + * LogicalTypeRoot#RAW} type. + */ + public static void validatePrimaryKey(TableSchema schema) { + schema.getPrimaryKey() + .ifPresent( + key -> { + List illegalTypes = + key.getColumns().stream() + .map( + fieldName -> { + LogicalType logicalType = + schema.getFieldDataType(fieldName) + .get() + .getLogicalType(); + if (logicalType.is( + LogicalTypeRoot.DISTINCT_TYPE)) { + return ((DistinctType) logicalType) + .getSourceType() + .getTypeRoot(); + } else { + return logicalType.getTypeRoot(); + } + }) + .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains) + .collect(Collectors.toList()); + + if (!illegalTypes.isEmpty()) { + throw new ValidationException( + String.format( + "The table has a primary key on columns of illegal types: %s.\n" + + " Elasticsearch sink does not support primary keys on columns of types: %s.", + illegalTypes, ILLEGAL_PRIMARY_KEY_TYPES)); + } + }); + } + + private ElasticsearchValidationUtils() { + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java new file mode 100644 index 00000000000..38ff5e1fba5 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+
+/**
+ * This interface is responsible for generating the Elasticsearch index name for a record.
+ *
+ * Note that the record is handed over as {@link RowData}, not as a {@link Row}; see
+ * {@link #generate(RowData)}. Implementations must be {@link Serializable}.
+ */
+@Internal
+interface IndexGenerator extends Serializable {
+
+    /**
+     * Initialize the index generator. This will be called only once before {@link
+     * #generate(RowData)} is called. The default implementation does nothing.
+     */
+    default void open() {
+    }
+
+    /**
+     * Generate the index name according to the given row.
+     *
+     * @param row the record that is about to be written
+     * @return the name of the index the record should be written to
+     */
+    String generate(RowData row);
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
new file mode 100644
index 00000000000..3b43b636f2f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/** Base class for {@link IndexGenerator}.
*/
+@Internal
+public abstract class IndexGeneratorBase implements IndexGenerator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The index option value this generator was created from (literal name or pattern). */
+    protected final String index;
+
+    /**
+     * Creates a generator for the given index option value.
+     *
+     * @param index the configured index name or index pattern
+     */
+    public IndexGeneratorBase(String index) {
+        this.index = index;
+    }
+
+    /**
+     * Two generators are equal when they were created from the same index option value,
+     * regardless of their concrete subclass.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IndexGeneratorBase)) {
+            return false;
+        }
+        IndexGeneratorBase that = (IndexGeneratorBase) o;
+        // Null-safe on purpose: hashCode() already tolerates a null index via Objects.hash,
+        // so equals() must not throw a NullPointerException for the same instance.
+        return Objects.equals(index, that.index);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index);
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
new file mode 100644
index 00000000000..7f25c8f8a51
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.inlong.sort.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Factory of {@link IndexGenerator}. + * + *

Flink supports both static index and dynamic index. + * + *

If you want to have a static index, this option value should be a plain string, e.g. + * 'myusers', all the records will be consistently written into "myusers" index. + * + *

If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ *
+ * Instead of a field name, 'now()' or 'current_timestamp' (also in upper case) can be used in
+ * front of the '|' to derive the index from the current system time in the given time zone,
+ * e.g. 'myusers_{now()|yyyy-MM-dd}'.
+ */
+@Internal
+public final class IndexGeneratorFactory {
+
+    private IndexGeneratorFactory() {
+    }
+
+    /**
+     * Creates an index generator that uses the JVM default time zone for time based patterns.
+     *
+     * @param index the configured index name or index pattern
+     * @param schema the schema of the table being written
+     * @return a static or dynamic {@link IndexGenerator}, depending on the pattern
+     */
+    public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {
+        return createIndexGenerator(index, schema, ZoneId.systemDefault());
+    }
+
+    /**
+     * Creates an index generator for the given index option value.
+     *
+     * @param index the configured index name or index pattern
+     * @param schema the schema of the table being written
+     * @param localTimeZoneId zone used for system-time patterns and for fields of type
+     *     TIMESTAMP_WITH_LOCAL_TIME_ZONE
+     * @return a {@link StaticIndexGenerator} when the value contains no '{...}' placeholder,
+     *     otherwise a generator that evaluates the placeholder per record
+     * @throws TableException if the pattern contains more than one placeholder or references
+     *     an unknown field
+     */
+    public static IndexGenerator createIndexGenerator(
+            String index, TableSchema schema, ZoneId localTimeZoneId) {
+        final IndexHelper indexHelper = new IndexHelper();
+        if (indexHelper.checkIsDynamicIndex(index)) {
+            return createRuntimeIndexGenerator(
+                    index,
+                    schema.getFieldNames(),
+                    schema.getFieldDataTypes(),
+                    indexHelper,
+                    localTimeZoneId);
+        } else {
+            return new StaticIndexGenerator(index);
+        }
+    }
+
+    /** Serializable function that renders a non-null field value with the given formatter. */
+    interface DynamicFormatter extends Serializable {
+
+        String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+    }
+
+    /**
+     * Builds the generator for a dynamic index pattern. The text before and after the single
+     * '{...}' placeholder is kept as a fixed prefix/suffix; only the placeholder is evaluated
+     * per record.
+     */
+    private static IndexGenerator createRuntimeIndexGenerator(
+            String index,
+            String[] fieldNames,
+            DataType[] fieldTypes,
+            IndexHelper indexHelper,
+            ZoneId localTimeZoneId) {
+        final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index);
+        final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr));
+        final String indexSuffix =
+                index.substring(indexPrefix.length() + dynamicIndexPatternStr.length());
+
+        // 1) '{now()|format}' style pattern: the index is derived from the system clock
+        if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(
+                            index, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+
+                @Override
+                public String generate(RowData row) {
+                    // A zone-aware value is required here: the default format for
+                    // TIMESTAMP_WITH_LOCAL_TIME_ZONE ends with the offset letter 'X', which a
+                    // plain LocalDateTime cannot render (it fails with
+                    // UnsupportedTemporalTypeException). ZonedDateTime supports every field
+                    // LocalDateTime supports, so existing patterns render unchanged.
+                    return indexPrefix
+                            .concat(
+                                    java.time.ZonedDateTime.now(localTimeZoneId)
+                                            .format(dateTimeFormatter))
+                            .concat(indexSuffix);
+                }
+            };
+        }
+
+        final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
+        final int indexFieldPos =
+                indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat);
+        final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType();
+        final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot();
+
+        // validate index field type
+        indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot);
+
+        // time extract dynamic index pattern
+        final RowData.FieldGetter fieldGetter =
+                RowData.createFieldGetter(indexFieldType, indexFieldPos);
+
+        // 2) '{field|format}' style pattern: the field value is rendered as date/time
+        if (isDynamicIndexWithFormat) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot);
+            DynamicFormatter formatFunction =
+                    createFormatFunction(
+                            indexFieldType, indexFieldLogicalTypeRoot, localTimeZoneId);
+
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+
+                @Override
+                public String generate(RowData row) {
+                    Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+                    final String formattedField;
+                    // TODO we can possibly optimize it to use the nullability of the field
+                    if (fieldOrNull != null) {
+                        formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter);
+                    } else {
+                        // a null field value is rendered as the literal text "null"
+                        formattedField = "null";
+                    }
+                    return indexPrefix.concat(formattedField).concat(indexSuffix);
+                }
+            };
+        }
+        // 3) general dynamic index pattern '{field}': the field's string form is used as is
+        return new IndexGeneratorBase(index) {
+
+            @Override
+            public String generate(RowData row) {
+                Object indexField = fieldGetter.getFieldOrNull(row);
+                return indexPrefix
+                        .concat(indexField == null ? "null" : indexField.toString())
+                        .concat(indexSuffix);
+            }
+        };
+    }
+
+    /**
+     * Returns the function that converts the internal value of a time-related field into text.
+     *
+     * @throws UnsupportedOperationException for TIMESTAMP_WITH_TIME_ZONE fields
+     * @throws TableException for any type that is not time-related
+     */
+    private static DynamicFormatter createFormatFunction(
+            LogicalType indexFieldType,
+            LogicalTypeRoot indexFieldLogicalTypeRoot,
+            ZoneId localTimeZoneId) {
+        switch (indexFieldLogicalTypeRoot) {
+            case DATE:
+                // the value is read as an Integer holding the epoch day
+                return (value, dateTimeFormatter) -> {
+                    Integer indexField = (Integer) value;
+                    return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter);
+                };
+            case TIME_WITHOUT_TIME_ZONE:
+                // the value is read as an Integer and scaled by 1_000_000 to nanos of the day
+                return (value, dateTimeFormatter) -> {
+                    Integer indexField = (Integer) value;
+                    return LocalTime.ofNanoOfDay(indexField * 1_000_000L).format(dateTimeFormatter);
+                };
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    TimestampData indexField = (TimestampData) value;
+                    return indexField.toLocalDateTime().format(dateTimeFormatter);
+                };
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException(
+                        "TIMESTAMP_WITH_TIME_ZONE is not supported yet");
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return (value, dateTimeFormatter) -> {
+                    TimestampData indexField = (TimestampData) value;
+                    return indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter);
+                };
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported type '%s' found in Elasticsearch dynamic index field, "
+                                        + "time-related pattern only support types are: DATE,TIME,TIMESTAMP.",
+                                indexFieldType));
+        }
+    }
+
+    /**
+     * Helper class for {@link IndexGeneratorFactory}, this helper can be used to validate the
+     * index field type and parse the index format from the pattern.
+     */
+    public static class IndexHelper {
+
+        /** Matches one '{...}' placeholder; the closing brace is optional in this pattern. */
+        private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+        /** Matches an index whose placeholder has the form '{something|format}'. */
+        private static final Pattern dynamicIndexTimeExtractPattern =
+                Pattern.compile(".*\\{.+\\|.*\\}.*");
+        /** Matches an index whose placeholder is '{now()|format}' or '{current_timestamp|format}'. */
+        private static final Pattern dynamicIndexSystemTimeExtractPattern =
+                Pattern.compile(
+                        ".*\\{\\s*(now\\(\\s*\\)|NOW\\(\\s*\\)|current_timestamp|CURRENT_TIMESTAMP)\\s*\\|.*\\}.*");
+        /** Type roots that may be referenced by a dynamic index placeholder. */
+        private static final List<LogicalTypeRoot> supportedTypes = new ArrayList<>();
+        /** Date format used when the placeholder gives an empty format, keyed by type root. */
+        private static final Map<LogicalTypeRoot, String> defaultFormats = new HashMap<>();
+
+        static {
+            // time related types
+            supportedTypes.add(LogicalTypeRoot.DATE);
+            supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            // general types
+            supportedTypes.add(LogicalTypeRoot.VARCHAR);
+            supportedTypes.add(LogicalTypeRoot.CHAR);
+            supportedTypes.add(LogicalTypeRoot.TINYINT);
+            supportedTypes.add(LogicalTypeRoot.INTEGER);
+            supportedTypes.add(LogicalTypeRoot.BIGINT);
+        }
+
+        static {
+            defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+            defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(
+                    LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+        }
+
+        /**
+         * Validate the index field type.
+         *
+         * @throws IllegalArgumentException if the type root is not one of the supported types
+         */
+        void validateIndexFieldType(LogicalTypeRoot logicalType) {
+            if (!supportedTypes.contains(logicalType)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unsupported type %s of index field, " + "Supported types are: %s",
+                                logicalType, supportedTypes));
+            }
+        }
+
+        /** Get the default date format, or {@code null} if none is registered for the type. */
+        String getDefaultFormat(LogicalTypeRoot logicalType) {
+            return defaultFormats.get(logicalType);
+        }
+
+        /**
+         * Check general dynamic index is enabled or not by index pattern.
+         *
+         * @return {@code true} if the index contains exactly one placeholder
+         * @throws TableException if the index contains more than one placeholder
+         */
+        boolean checkIsDynamicIndex(String index) {
+            final Matcher matcher = dynamicIndexPattern.matcher(index);
+            int count = 0;
+            while (matcher.find()) {
+                count++;
+            }
+            if (count > 1) {
+                throw new TableException(
+                        String.format(
+                                "Chaining dynamic index pattern %s is not supported,"
+                                        + " only support single dynamic index pattern.",
+                                index));
+            }
+            return count == 1;
+        }
+
+        /** Check time extract dynamic index is enabled or not by index pattern. */
+        boolean checkIsDynamicIndexWithFormat(String index) {
+            return dynamicIndexTimeExtractPattern.matcher(index).matches();
+        }
+
+        /** Check generate dynamic index is from system time or not. */
+        public boolean checkIsDynamicIndexWithSystemTimeFormat(String index) {
+            return dynamicIndexSystemTimeExtractPattern.matcher(index).matches();
+        }
+
+        /**
+         * Extract dynamic index pattern string from index pattern string, i.e. the text from
+         * the first '{' up to and including the last '}'.
+         *
+         * @throws TableException if the braces are missing or not balanced
+         */
+        String extractDynamicIndexPatternStr(String index) {
+            int start = index.indexOf("{");
+            int end = index.lastIndexOf("}");
+            // dynamicIndexPattern accepts an unterminated '{field'; without this guard such
+            // input only failed later with an unhelpful StringIndexOutOfBoundsException.
+            if (start < 0 || end < start) {
+                throw new TableException(
+                        String.format(
+                                "Malformed dynamic index pattern '%s', expected a placeholder of the"
+                                        + " form '{field_name}' or '{field_name|date_format_string}'.",
+                                index));
+            }
+            return index.substring(start, end + 1);
+        }
+
+        /**
+         * Extract index field position in a fieldNames, return the field position.
+         *
+         * @throws TableException if the referenced field is not part of {@code fieldNames}
+         */
+        int extractIndexFieldPos(
+                String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+            // the field name ends at '|' when a format is given, otherwise at '}'
+            final int nameEnd = isDynamicIndexWithFormat ? index.indexOf("|") : index.indexOf("}");
+            final String indexFieldName = index.substring(index.indexOf("{") + 1, nameEnd);
+            final int fieldPos = Arrays.asList(fieldNames).indexOf(indexFieldName);
+            if (fieldPos < 0) {
+                throw new TableException(
+                        String.format(
+                                "Unknown field '%s' in index pattern '%s', please check the field name.",
+                                indexFieldName, index));
+            }
+            return fieldPos;
+        }
+
+        /**
+         * Extract dateTime format by the date format that extracted from index pattern string.
+         * An empty format falls back to the default format of the given type.
+         */
+        private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+            String format = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+            if ("".equals(format)) {
+                format = getDefaultFormat(logicalType);
+            }
+            return format;
+        }
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
new file mode 100644
index 00000000000..e4808c0e6b0
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.io.Serializable; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.Period; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** An extractor for a Elasticsearch key from a {@link RowData}. 
*/
+@Internal
+public class KeyExtractor implements Function<RowData, String>, Serializable {
+
+    /** One formatter per primary key column, in primary key order. */
+    private final FieldFormatter[] fieldFormatters;
+    /** Text inserted between the formatted key columns. */
+    private final String keyDelimiter;
+
+    /** Renders a single key column of a row as text. */
+    private interface FieldFormatter extends Serializable {
+
+        String format(RowData rowData);
+    }
+
+    private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+        this.fieldFormatters = fieldFormatters;
+        this.keyDelimiter = keyDelimiter;
+    }
+
+    /**
+     * Builds the key of the given row by joining the formatted key columns with the
+     * configured delimiter.
+     *
+     * @param rowData the row to extract the key from
+     * @return the concatenated key
+     */
+    @Override
+    public String apply(RowData rowData) {
+        final StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < fieldFormatters.length; i++) {
+            if (i > 0) {
+                builder.append(keyDelimiter);
+            }
+            final String value = fieldFormatters[i].format(rowData);
+            builder.append(value);
+        }
+        return builder.toString();
+    }
+
+    /** A table column together with its position in the schema. */
+    private static class ColumnWithIndex {
+
+        public final TableColumn column;
+        public final int index;
+
+        public ColumnWithIndex(TableColumn column, int index) {
+            this.column = column;
+            this.index = index;
+        }
+
+        public LogicalType getType() {
+            return column.getType().getLogicalType();
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+
+    /**
+     * Creates the key extraction function for a table.
+     *
+     * @param schema the schema of the table being written
+     * @param keyDelimiter text placed between the values of a composite primary key
+     * @return a {@link KeyExtractor} when the schema declares a primary key, otherwise a
+     *     serializable function that always returns {@code null}
+     */
+    public static Function<RowData, String> createKeyExtractor(
+            TableSchema schema, String keyDelimiter) {
+        return schema.getPrimaryKey()
+                .map(
+                        key -> {
+                            // remember the schema position of every column by name
+                            Map<String, ColumnWithIndex> namesToColumns = new HashMap<>();
+                            List<TableColumn> tableColumns = schema.getTableColumns();
+                            for (int i = 0; i < schema.getFieldCount(); i++) {
+                                TableColumn column = tableColumns.get(i);
+                                namesToColumns.put(
+                                        column.getName(), new ColumnWithIndex(column, i));
+                            }
+
+                            FieldFormatter[] fieldFormatters =
+                                    key.getColumns().stream()
+                                            .map(namesToColumns::get)
+                                            .map(
+                                                    column -> toFormatter(
+                                                            column.getIndex(), column.getType()))
+                                            .toArray(FieldFormatter[]::new);
+
+                            return (Function<RowData, String>) new KeyExtractor(fieldFormatters, keyDelimiter);
+                        })
+                .orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
+    }
+
+    /** Chooses the text representation of a key column based on its logical type. */
+    private static FieldFormatter toFormatter(int index, LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case DATE:
+                return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+            case TIME_WITHOUT_TIME_ZONE:
+                return (row) -> LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+            case INTERVAL_YEAR_MONTH:
+                // NOTE(review): the int is rendered as a number of days although the type is
+                // a year-month interval - presumably it holds months. Left unchanged because
+                // altering it would change the generated document ids; confirm before touching.
+                return (row) -> Period.ofDays(row.getInt(index)).toString();
+            case INTERVAL_DAY_TIME:
+                return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+            case DISTINCT_TYPE:
+                // a distinct type is formatted like the type it was derived from
+                return toFormatter(index, ((DistinctType) type).getSourceType());
+            default:
+                RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, index);
+                // no null check: this throws a NullPointerException for a null key field
+                return (row) -> fieldGetter.getFieldOrNull(row).toString();
+        }
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
new file mode 100644
index 00000000000..24815296df5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+
+/**
+ * For version-agnostic creating of {@link ActionRequest}s.
+ *
+ * Implementations must be {@link Serializable}.
+ */
+@Internal
+public interface RequestFactory extends Serializable {
+
+    /**
+     * Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and it would not take any effect.
+     *
+     * @param index name of the target index
+     * @param docType document type (see the note above)
+     * @param key key of the document to update
+     * @param contentType content type of {@code document}
+     * @param document the serialized document
+     * @return the update request
+     */
+    UpdateRequest createUpdateRequest(
+            String index, String docType, String key, XContentType contentType, byte[] document);
+
+    /**
+     * Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and it would not take any effect.
+     *
+     * @param index name of the target index
+     * @param docType document type (see the note above)
+     * @param key key of the document; NOTE(review): callers may pass {@code null} when the
+     *     table has no primary key - confirm implementations accept that
+     * @param contentType content type of {@code document}
+     * @param document the serialized document
+     * @return the index request
+     */
+    IndexRequest createIndexRequest(
+            String index, String docType, String key, XContentType contentType, byte[] document);
+
+    /**
+     * Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has
+     * been deprecated since Elasticsearch 7.x and it would not take any effect.
+     *
+     * @param index name of the target index
+     * @param docType document type (see the note above)
+     * @param key key of the document to delete
+     * @return the delete request
+     */
+    DeleteRequest createDeleteRequest(String index, String docType, String key);
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
new file mode 100644
index 00000000000..260ed576d93
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.inlong.sort.elasticsearch.table; + +import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction; +import org.apache.inlong.sort.elasticsearch.RequestIndexer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.xcontent.XContentType; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.function.Function; + +/** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. 
*/
+@Internal
+public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IndexGenerator indexGenerator;
+    private final String docType;
+    private final SerializationSchema<RowData> serializationSchema;
+    private final XContentType contentType;
+    private final RequestFactory requestFactory;
+    private final Function<RowData, String> createKey;
+
+    /**
+     * Creates the sink function.
+     *
+     * @param indexGenerator computes the target index of every row
+     * @param docType document type, may be {@code null}
+     * @param serializationSchema converts a row into the document bytes
+     * @param contentType content type of the serialized document
+     * @param requestFactory creates the version specific requests
+     * @param createKey computes the document key of a row; a {@code null} result makes the
+     *     row be written with an index request instead of an update request
+     */
+    public RowElasticsearchSinkFunction(
+            IndexGenerator indexGenerator,
+            @Nullable String docType, // this is deprecated in es 7+
+            SerializationSchema<RowData> serializationSchema,
+            XContentType contentType,
+            RequestFactory requestFactory,
+            Function<RowData, String> createKey) {
+        this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+        this.docType = docType;
+        this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+        this.contentType = Preconditions.checkNotNull(contentType);
+        this.requestFactory = Preconditions.checkNotNull(requestFactory);
+        this.createKey = Preconditions.checkNotNull(createKey);
+    }
+
+    /** Opens the serialization schema first and the index generator afterwards. */
+    @Override
+    public void open(RuntimeContext ctx) throws Exception {
+        serializationSchema.open(
+                RuntimeContextInitializationContextAdapters.serializationAdapter(ctx));
+        indexGenerator.open();
+    }
+
+    /**
+     * Turns a changelog row into a request: INSERT and UPDATE_AFTER are upserted, UPDATE_BEFORE
+     * and DELETE are deleted.
+     *
+     * @throws TableException for any other row kind
+     */
+    @Override
+    public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                processUpsert(element, indexer);
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                processDelete(element, indexer);
+                break;
+            default:
+                throw new TableException("Unsupported message kind: " + element.getRowKind());
+        }
+    }
+
+    /** Adds an update request when the row has a key, otherwise an index request. */
+    private void processUpsert(RowData row, RequestIndexer indexer) {
+        final byte[] document = serializationSchema.serialize(row);
+        final String key = createKey.apply(row);
+        final String index = indexGenerator.generate(row);
+        if (key != null) {
+            final UpdateRequest updateRequest =
+                    requestFactory.createUpdateRequest(
+                            index, docType, key, contentType, document);
+            indexer.add(updateRequest);
+        } else {
+            // no key available: the (null) key is passed on unchanged
+            final IndexRequest indexRequest =
+                    requestFactory.createIndexRequest(
+                            index, docType, key, contentType, document);
+            indexer.add(indexRequest);
+        }
+    }
+
+    /** Adds a delete request for the key of the given row. */
+    private void processDelete(RowData row, RequestIndexer indexer) {
+        final String key = createKey.apply(row);
+        final DeleteRequest deleteRequest =
+                requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
+        indexer.add(deleteRequest);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RowElasticsearchSinkFunction that = (RowElasticsearchSinkFunction) o;
+        return Objects.equals(indexGenerator, that.indexGenerator)
+                && Objects.equals(docType, that.docType)
+                && Objects.equals(serializationSchema, that.serializationSchema)
+                && contentType == that.contentType
+                && Objects.equals(requestFactory, that.requestFactory)
+                && Objects.equals(createKey, that.createKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                indexGenerator,
+                docType,
+                serializationSchema,
+                contentType,
+                requestFactory,
+                createKey);
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
new file mode 100644
index 00000000000..0f59ce0969e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+
+/** A static {@link IndexGenerator} which generates a fixed index name. */
+@Internal
+final class StaticIndexGenerator extends IndexGeneratorBase {
+
+    /**
+     * @param index the index name that is returned for every row
+     */
+    public StaticIndexGenerator(String index) {
+        super(index);
+    }
+
+    /**
+     * Returns the configured index name; the row is not inspected.
+     *
+     * @param row ignored
+     * @return the index name given at construction time
+     */
+    @Override
+    public String generate(RowData row) {
+        return index;
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
new file mode 100644
index 00000000000..cdbfa731a91
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.util;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+
+/** Ignores all kinds of failures and drops the affected {@link ActionRequest}. */
+@Internal
+public class IgnoringFailureHandler implements ActionRequestFailureHandler {
+
+    private static final long serialVersionUID = 1662846593501L;
+
+    /**
+     * Does nothing: the failure is neither rethrown nor is the request re-added to the
+     * indexer.
+     */
+    @Override
+    public void onFailure(
+            ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+        // ignore failure
+    }
+
+    /**
+     * The handler is stateless, so every instance is equal to every other instance. This
+     * mirrors {@code NoOpFailureHandler}, so that objects holding a failure handler compare
+     * the same way whichever handler is configured.
+     */
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof IgnoringFailureHandler;
+    }
+
+    @Override
+    public int hashCode() {
+        return IgnoringFailureHandler.class.hashCode();
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
new file mode 100644
index 00000000000..477b2b6b6fb
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch.util; + +import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler; +import org.apache.inlong.sort.elasticsearch.RequestIndexer; + +import org.apache.flink.annotation.Internal; +import org.elasticsearch.action.ActionRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** An {@link ActionRequestFailureHandler} that logs and rethrows any failure, failing the sink.
*/ +@Internal +public class NoOpFailureHandler implements ActionRequestFailureHandler { + + private static final long serialVersionUID = 737941343410827885L; + + private static final Logger LOG = LoggerFactory.getLogger(NoOpFailureHandler.class); + + @Override + public void onFailure( + ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) + throws Throwable { + LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure); + // rethrow the original failure unchanged so that the sink fails + throw failure; + } + + @Override + public boolean equals(Object o) { + return o instanceof NoOpFailureHandler; // no instance state, so all instances are equal + } + + @Override + public int hashCode() { + return NoOpFailureHandler.class.hashCode(); // identical for all instances, matching equals + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java new file mode 100644 index 00000000000..1afb7b0ee61 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch.util; + +import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler; +import org.apache.inlong.sort.elasticsearch.RequestIndexer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.ExceptionUtils; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary {@link + * EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full), + * and fails for all other failures. Every failure is logged at error level first.
+ * + */ +@PublicEvolving +public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler { + + private static final long serialVersionUID = -7423562912824511906L; + + private static final Logger LOG = + LoggerFactory.getLogger(RetryRejectedExecutionFailureHandler.class); + + @Override + public void onFailure( + ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) + throws Throwable { + LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure); + if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) { + indexer.add(action); + } else { + // no EsRejectedExecutionException found in the failure: rethrow it unchanged + throw failure; + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml index 72132b4de61..f21472326a9 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml @@ -33,6 +33,7 @@ pulsar jdbc + elasticsearch-base diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 8d1fd1ede92..d5b47b354e5 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -938,6 +938,27 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE Source : org.apache.flink:flink-connector-jdbc-1.15.4.jar (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java + 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java +Source : org.apache.flink:flink-connector-elasticsearch-base-3.0.1-1.17.jar (Please note that the software have been modified.) +License : https://github.com/apache/flink/blob/master/LICENSE + ======================================================================= Apache InLong Subcomponents: