diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml
new file mode 100644
index 00000000000..709eefdb065
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/pom.xml
@@ -0,0 +1,140 @@
+
+
+
+ 4.0.0
+
+ org.apache.inlong
+ sort-connectors-v1.18
+ 1.14.0-SNAPSHOT
+
+ sort-connector-elasticsearch-base-v1.18
+ jar
+ Apache InLong - Sort-connector-elasticsearch-base
+
+
+ ${project.parent.parent.parent.parent.parent.basedir}
+ 7.10.2
+
+
+
+ org.apache.inlong
+ sort-flink-dependencies-v1.18
+ ${project.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-json
+ ${flink.version}
+ provided
+
+
+
+ org.elasticsearch
+ elasticsearch
+ ${elasticsearch.version}
+
+
+
+ org.ow2.asm
+ *
+
+
+
+
+
+ org.elasticsearch.client
+ elasticsearch-rest-high-level-client
+ ${elasticsearch.version}
+
+
+
+ org.apache.flink
+ flink-runtime
+ ${flink.version}
+ provided
+
+
+
+
+ org.apache.logging.log4j
+ log4j-api
+ provided
+
+
+
+ org.apache.logging.log4j
+ log4j-core
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ shade-flink
+
+ shade
+
+ package
+
+ true
+
+
+ org.apache.inlong:sort-connector-*
+
+ org/apache/inlong/**
+ META-INF/services/org.apache.flink.table.factories.Factory
+
+
+
+ *:*
+
+ log4j.properties
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java
new file mode 100644
index 00000000000..e014feb86af
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how
+ * failed {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing
+ * malformed documents, or simply requesting them to be sent to Elasticsearch again if the failure
+ * is only temporary.
+ *
+ *
Example:
+ *
+ *
{@code
+ * private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
+ *
+ * @Override
+ * void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ * if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
+ * // full queue; re-add document for indexing
+ * indexer.add(action);
+ * } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
+ * // malformed document; simply drop request without failing sink
+ * } else {
+ * // for all other failures, fail the sink;
+ * // here the failure is simply rethrown, but users can also choose to throw custom exceptions
+ * throw failure;
+ * }
+ * }
+ * }
+ *
+ * }
+ *
+ * The above example will let the sink re-add requests that failed due to queue capacity
+ * saturation and drop requests with malformed documents, without failing the sink. For all other
+ * failures, the sink will fail.
+ *
+ *
Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the
+ * exact type could not be retrieved through the older version Java client APIs (thus, the types
+ * will be general {@link Exception}s and only differ in the failure message). In this case, it is
+ * recommended to match on the provided REST status code.
+ *
+ */
+@PublicEvolving
+public interface ActionRequestFailureHandler extends Serializable {
+
+ /**
+ * Handle a failed {@link ActionRequest}.
+ *
+ * @param action the {@link ActionRequest} that failed due to the failure
+ * @param failure the cause of failure
+ * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
+ * @param indexer request indexer to re-add the failed action, if intended to do so
+ * @throws Throwable if the sink should fail on this failure, the implementation should rethrow
+ * the exception or a custom one
+ */
+ void onFailure(
+ ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
+ throws Throwable;
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
new file mode 100644
index 00000000000..00ceba4ff41
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Implementation of a {@link RequestIndexer} that buffers {@link ActionRequest ActionRequests}
+ * before re-sending them to the Elasticsearch cluster upon request.
+ */
+@Internal
+@NotThreadSafe
+class BufferingNoOpRequestIndexer implements RequestIndexer {
+
+ private ConcurrentLinkedQueue bufferedRequests;
+
+ BufferingNoOpRequestIndexer() {
+ this.bufferedRequests = new ConcurrentLinkedQueue();
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ Collections.addAll(bufferedRequests, deleteRequests);
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ Collections.addAll(bufferedRequests, indexRequests);
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ Collections.addAll(bufferedRequests, updateRequests);
+ }
+
+ void processBufferedRequests(RequestIndexer actualIndexer) {
+ for (ActionRequest request : bufferedRequests) {
+ if (request instanceof IndexRequest) {
+ actualIndexer.add((IndexRequest) request);
+ } else if (request instanceof DeleteRequest) {
+ actualIndexer.add((DeleteRequest) request);
+ } else if (request instanceof UpdateRequest) {
+ actualIndexer.add((UpdateRequest) request);
+ }
+ }
+
+ bufferedRequests.clear();
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
new file mode 100644
index 00000000000..88742cb4985
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls
+ * across different versions. This includes calls to create Elasticsearch clients, handle failed
+ * item responses, etc. Any incompatible Elasticsearch Java APIs should be bridged using this
+ * interface.
+ *
+ * Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since
+ * connecting via an embedded node is allowed, the call bridge will hold reference to the created
+ * embedded node. Each instance of the sink will hold exactly one instance of the call bridge, and
+ * state cleanup is performed when the sink is closed.
+ *
+ * @param The Elasticsearch client, that implements {@link AutoCloseable}.
+ */
+@Internal
+public interface ElasticsearchApiCallBridge extends Serializable {
+
+ /**
+ * Creates an Elasticsearch client implementing {@link AutoCloseable}.
+ *
+ * @return The created client.
+ */
+ C createClient();
+
+ /**
+ * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+ *
+ * @param client the Elasticsearch client.
+ * @param listener the bulk processor listener.
+ * @return the bulk processor builder.
+ */
+ BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
+
+ /**
+ * Extracts the cause of failure of a bulk item action.
+ *
+ * @param bulkItemResponse the bulk item response to extract cause of failure
+ * @return the extracted {@link Throwable} from the response ({@code null} is the response is
+ * successful).
+ */
+ @Nullable
+ Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+
+ /**
+ * Sets the bulk flush interval, in milliseconds on the provided {@link BulkProcessor.Builder}.
+ * The builder will be later on used to instantiate the actual {@link BulkProcessor}.
+ *
+ * @param builder the {@link BulkProcessor.Builder} to configure.
+ * @param flushIntervalMillis the flush interval in milliseconds.
+ */
+ void configureBulkProcessorFlushInterval(
+ BulkProcessor.Builder builder, long flushIntervalMillis);
+
+ /**
+ * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}. The builder
+ * will be later on used to instantiate the actual {@link BulkProcessor}.
+ *
+ * @param builder the {@link BulkProcessor.Builder} to configure.
+ * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user
+ * disabled backoff retries).
+ */
+ void configureBulkProcessorBackoff(
+ BulkProcessor.Builder builder,
+ @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+
+ /**
+ * Verify the client connection by making a test request/ping to the Elasticsearch cluster.
+ *
+ * Called by {@link ElasticsearchSinkBase#open(org.apache.flink.configuration.Configuration)}
+ * after creating the client. This makes sure the underlying client is closed if the connection
+ * is not successful and preventing thread leak.
+ *
+ * @param client the Elasticsearch client.
+ */
+ void verifyClientConnection(C client) throws IOException;
+
+ /**
+ * Creates a {@link RequestIndexer} that is able to work with {@link BulkProcessor} binary
+ * compatible.
+ */
+ RequestIndexer createBulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef);
+
+ /** Perform any necessary state cleanup. */
+ default void cleanup() {
+ // nothing to cleanup by default
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
new file mode 100644
index 00000000000..43754293327
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ *
This class implements the common behaviour across Elasticsearch versions, such as the use of
+ * an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before sending the
+ * requests to the cluster, as well as passing input records to the user provided {@link
+ * ElasticsearchSinkFunction} for processing.
+ *
+ *
The version specific API calls for different Elasticsearch versions should be defined by a
+ * concrete implementation of a {@link ElasticsearchApiCallBridge}, which is provided to the
+ * constructor of this class. This call bridge is used, for example, to create a Elasticsearch
+ * {@link Client}, handle failed item responses, etc.
+ *
+ * @param Type of the elements handled by this sink
+ * @param Type of the Elasticsearch client, which implements {@link AutoCloseable}
+ */
+@Internal
+public abstract class ElasticsearchSinkBase extends RichSinkFunction
+ implements
+ CheckpointedFunction {
+
+ private static final long serialVersionUID = -1007596293618451942L;
+
+ // ------------------------------------------------------------------------
+ // Internal bulk processor configuration
+ // ------------------------------------------------------------------------
+
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+ public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+
+ /** Used to control whether the retry delay should increase exponentially or remain constant. */
+ @PublicEvolving
+ public enum FlushBackoffType {
+ CONSTANT,
+ EXPONENTIAL
+ }
+
+ /**
+ * Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to
+ * resource constraints (i.e. the client's internal thread pool is full), the backoff policy
+ * decides how long the bulk processor will wait before the operation is retried internally.
+ *
+ * This is a proxy for version specific backoff policies.
+ */
+ public static class BulkFlushBackoffPolicy implements Serializable {
+
+ private static final long serialVersionUID = -6022851996101826049L;
+
+ // the default values follow the Elasticsearch default settings for BulkProcessor
+ private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+ private int maxRetryCount = 8;
+ private long delayMillis = 50;
+
+ public FlushBackoffType getBackoffType() {
+ return backoffType;
+ }
+
+ public int getMaxRetryCount() {
+ return maxRetryCount;
+ }
+
+ public long getDelayMillis() {
+ return delayMillis;
+ }
+
+ public void setBackoffType(FlushBackoffType backoffType) {
+ this.backoffType = checkNotNull(backoffType);
+ }
+
+ public void setMaxRetryCount(int maxRetryCount) {
+ checkArgument(maxRetryCount >= 0);
+ this.maxRetryCount = maxRetryCount;
+ }
+
+ public void setDelayMillis(long delayMillis) {
+ checkArgument(delayMillis >= 0);
+ this.delayMillis = delayMillis;
+ }
+ }
+
+ private final Integer bulkProcessorFlushMaxActions;
+ private final Integer bulkProcessorFlushMaxSizeMb;
+ private final Long bulkProcessorFlushIntervalMillis;
+ private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
+
+ // ------------------------------------------------------------------------
+ // User-facing API and configuration
+ // ------------------------------------------------------------------------
+
+ /**
+ * The function that is used to construct multiple {@link ActionRequest ActionRequests} from
+ * each incoming element.
+ */
+ private final ElasticsearchSinkFunction elasticsearchSinkFunction;
+
+ /** User-provided handler for failed {@link ActionRequest ActionRequests}. */
+ private final ActionRequestFailureHandler failureHandler;
+
+ /**
+ * If true, the producer will wait until all outstanding action requests have been sent to
+ * Elasticsearch.
+ */
+ private boolean flushOnCheckpoint = true;
+
+ /**
+ * Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest
+ * ActionRequests}.
+ */
+ private transient RequestIndexer requestIndexer;
+
+ /**
+ * Provided to the {@link ActionRequestFailureHandler} to allow users to re-index failed
+ * requests.
+ */
+ private transient BufferingNoOpRequestIndexer failureRequestIndexer;
+
+ // ------------------------------------------------------------------------
+ // Internals for the Flink Elasticsearch Sink
+ // ------------------------------------------------------------------------
+
+ /** Call bridge for different version-specific. */
+ private final ElasticsearchApiCallBridge callBridge;
+
+ /**
+ * Number of pending action requests not yet acknowledged by Elasticsearch. This value is
+ * maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+ *
+ * This is incremented whenever the user adds (or re-adds through the {@link
+ * ActionRequestFailureHandler}) requests to the {@link RequestIndexer}. It is decremented for
+ * each completed request of a bulk request, in {@link BulkProcessor.Listener#afterBulk(long,
+ * BulkRequest, BulkResponse)} and {@link BulkProcessor.Listener#afterBulk(long, BulkRequest,
+ * Throwable)}.
+ */
+ private AtomicLong numPendingRequests = new AtomicLong(0);
+
+ /** Elasticsearch client created using the call bridge. */
+ private transient C client;
+
+ /** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
+ private transient BulkProcessor bulkProcessor;
+
+ /**
+ * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown
+ * in callbacks and the user considered it should fail the sink via the {@link
+ * ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
+ *
+ *
Errors will be checked and rethrown before processing each input element, and when the
+ * sink is closed.
+ */
+ private final AtomicReference failureThrowable = new AtomicReference<>();
+
+ public ElasticsearchSinkBase(
+ ElasticsearchApiCallBridge callBridge,
+ Map userConfig,
+ ElasticsearchSinkFunction elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler) {
+
+ this.callBridge = checkNotNull(callBridge);
+ this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+ this.failureHandler = checkNotNull(failureHandler);
+ // we eagerly check if the user-provided sink function and failure handler is serializable;
+ // otherwise, if they aren't serializable, users will merely get a non-informative error
+ // message
+ // "ElasticsearchSinkBase is not serializable"
+
+ checkArgument(
+ InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+ "The implementation of the provided ElasticsearchSinkFunction is not serializable. "
+ + "The object probably contains or references non-serializable fields.");
+
+ checkArgument(
+ InstantiationUtil.isSerializable(failureHandler),
+ "The implementation of the provided ActionRequestFailureHandler is not serializable. "
+ + "The object probably contains or references non-serializable fields.");
+
+ // extract and remove bulk processor related configuration from the user-provided config,
+ // so that the resulting user config only contains configuration related to the
+ // Elasticsearch client.
+
+ checkNotNull(userConfig);
+
+ // copy config so we can remove entries without side-effects
+ userConfig = new HashMap<>(userConfig);
+
+ ParameterTool params = ParameterTool.fromMap(userConfig);
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+ bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+ } else {
+ bulkProcessorFlushMaxActions = null;
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+ bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+ } else {
+ bulkProcessorFlushMaxSizeMb = null;
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+ bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+ } else {
+ bulkProcessorFlushIntervalMillis = null;
+ }
+
+ boolean bulkProcessorFlushBackoffEnable =
+ params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
+
+ if (bulkProcessorFlushBackoffEnable) {
+ this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
+ bulkProcessorFlushBackoffPolicy.setBackoffType(
+ FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
+ bulkProcessorFlushBackoffPolicy.setMaxRetryCount(
+ params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
+ bulkProcessorFlushBackoffPolicy.setDelayMillis(
+ params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
+ }
+
+ } else {
+ bulkProcessorFlushBackoffPolicy = null;
+ }
+ }
+
+ /**
+ * Disable flushing on checkpoint. When disabled, the sink will not wait for all pending action
+ * requests to be acknowledged by Elasticsearch on checkpoints.
+ *
+ * NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT provide
+ * any strong guarantees for at-least-once delivery of action requests.
+ */
+ public void disableFlushOnCheckpoint() {
+ this.flushOnCheckpoint = false;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ client = callBridge.createClient();
+ callBridge.verifyClientConnection(client);
+ bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
+ requestIndexer =
+ callBridge.createBulkProcessorIndexer(
+ bulkProcessor, flushOnCheckpoint, numPendingRequests);
+ failureRequestIndexer = new BufferingNoOpRequestIndexer();
+ elasticsearchSinkFunction.open(getRuntimeContext());
+ }
+
+ @Override
+ public void invoke(T value, Context context) throws Exception {
+ checkAsyncErrorsAndRequests();
+ elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // no initialization needed
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ checkAsyncErrorsAndRequests();
+
+ if (flushOnCheckpoint) {
+ while (numPendingRequests.get() != 0) {
+ bulkProcessor.flush();
+ checkAsyncErrorsAndRequests();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ elasticsearchSinkFunction.close();
+ if (bulkProcessor != null) {
+ bulkProcessor.close();
+ bulkProcessor = null;
+ }
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ callBridge.cleanup();
+
+ // make sure any errors from callbacks are rethrown
+ checkErrorAndRethrow();
+ }
+
+ /**
+ * Build the {@link BulkProcessor}.
+ *
+ *
Note: this is exposed for testing purposes.
+ */
+ @VisibleForTesting
+ protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
+ checkNotNull(listener);
+
+ BulkProcessor.Builder bulkProcessorBuilder =
+ callBridge.createBulkProcessorBuilder(client, listener);
+
+ // This makes flush() blocking
+ bulkProcessorBuilder.setConcurrentRequests(0);
+
+ if (bulkProcessorFlushMaxActions != null) {
+ bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions);
+ }
+
+ if (bulkProcessorFlushMaxSizeMb != null) {
+ configureBulkSize(bulkProcessorBuilder);
+ }
+
+ if (bulkProcessorFlushIntervalMillis != null) {
+ configureFlushInterval(bulkProcessorBuilder);
+ }
+
+ // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
+ callBridge.configureBulkProcessorBackoff(
+ bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
+
+ return bulkProcessorBuilder.build();
+ }
+
+ private void configureBulkSize(BulkProcessor.Builder bulkProcessorBuilder) {
+ final ByteSizeUnit sizeUnit;
+ if (bulkProcessorFlushMaxSizeMb == -1) {
+ // bulk size can be disabled with -1, however the ByteSizeValue constructor accepts -1
+ // only with BYTES as the size unit
+ sizeUnit = ByteSizeUnit.BYTES;
+ } else {
+ sizeUnit = ByteSizeUnit.MB;
+ }
+ bulkProcessorBuilder.setBulkSize(new ByteSizeValue(bulkProcessorFlushMaxSizeMb, sizeUnit));
+ }
+
+ private void configureFlushInterval(BulkProcessor.Builder bulkProcessorBuilder) {
+ if (bulkProcessorFlushIntervalMillis == -1) {
+ bulkProcessorBuilder.setFlushInterval(null);
+ } else {
+ callBridge.configureBulkProcessorFlushInterval(
+ bulkProcessorBuilder, bulkProcessorFlushIntervalMillis);
+ }
+ }
+
+ private void checkErrorAndRethrow() {
+ Throwable cause = failureThrowable.get();
+ if (cause != null) {
+ throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+ }
+ }
+
+ private void checkAsyncErrorsAndRequests() {
+ checkErrorAndRethrow();
+ failureRequestIndexer.processBufferedRequests(requestIndexer);
+ }
+
+ private class BulkProcessorListener implements BulkProcessor.Listener {
+
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ if (response.hasFailures()) {
+ BulkItemResponse itemResponse;
+ Throwable failure;
+ RestStatus restStatus;
+ DocWriteRequest actionRequest;
+
+ try {
+ for (int i = 0; i < response.getItems().length; i++) {
+ itemResponse = response.getItems()[i];
+ failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
+ if (failure != null) {
+ restStatus = itemResponse.getFailure().getStatus();
+ actionRequest = request.requests().get(i);
+ if (restStatus == null) {
+ if (actionRequest instanceof ActionRequest) {
+ failureHandler.onFailure(
+ (ActionRequest) actionRequest,
+ failure,
+ -1,
+ failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException(
+ "The sink currently only supports ActionRequests");
+ }
+ } else {
+ if (actionRequest instanceof ActionRequest) {
+ failureHandler.onFailure(
+ (ActionRequest) actionRequest,
+ failure,
+ restStatus.getStatus(),
+ failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException(
+ "The sink currently only supports ActionRequests");
+ }
+ }
+ }
+ }
+ } catch (Throwable t) {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, t);
+ }
+ }
+
+ if (flushOnCheckpoint) {
+ numPendingRequests.getAndAdd(-request.numberOfActions());
+ }
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ try {
+ for (DocWriteRequest writeRequest : request.requests()) {
+ if (writeRequest instanceof ActionRequest) {
+ failureHandler.onFailure(
+ (ActionRequest) writeRequest, failure, -1, failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException(
+ "The sink currently only supports ActionRequests");
+ }
+ }
+ } catch (Throwable t) {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, t);
+ }
+
+ if (flushOnCheckpoint) {
+ numPendingRequests.getAndAdd(-request.numberOfActions());
+ }
+ }
+ }
+
+ @VisibleForTesting
+ long getNumPendingRequests() {
+ if (flushOnCheckpoint) {
+ return numPendingRequests.get();
+ } else {
+ throw new UnsupportedOperationException(
+ "The number of pending requests is not maintained when flushing on checkpoint is disabled.");
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
new file mode 100644
index 00000000000..87f334693e4
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Creates multiple {@link ActionRequest ActionRequests} from an element in a stream.
+ *
+ *
This is used by sinks to prepare elements for sending them to Elasticsearch.
+ *
+ *
Example:
+ *
+ *
{@code
+ * private static class TestElasticSearchSinkFunction implements
+ * ElasticsearchSinkFunction> {
+ *
+ * public IndexRequest createIndexRequest(Tuple2 element) {
+ * Map json = new HashMap<>();
+ * json.put("data", element.f1);
+ *
+ * return Requests.indexRequest()
+ * .index("my-index")
+ * .type("my-type")
+ * .id(element.f0.toString())
+ * .source(json);
+ * }
+ *
+ * public void process(Tuple2 element, RuntimeContext ctx, RequestIndexer indexer) {
+ * indexer.add(createIndexRequest(element));
+ * }
+ * }
+ *
+ * }
+ *
+ * @param The type of the element handled by this {@code ElasticsearchSinkFunction}
+ */
+@PublicEvolving
+public interface ElasticsearchSinkFunction extends Serializable, Function {
+
+ /**
+ * Initialization method for the function. It is called once before the actual working process
+ * methods, if {@link #open(RuntimeContext)} is not overridden.
+ */
+ default void open() throws Exception {
+ }
+
+ /**
+ * Initialization method for the function. It is called once before the actual working process
+ * methods.
+ */
+ default void open(RuntimeContext ctx) throws Exception {
+ open();
+ }
+
+ /** Tear-down method for the function. It is called when the sink closes. */
+ default void close() throws Exception {
+ }
+
+ /**
+ * Process the incoming element to produce multiple {@link ActionRequest ActionsRequests}. The
+ * produced requests should be added to the provided {@link RequestIndexer}.
+ *
+ * @param element incoming element to process
+ * @param ctx runtime context containing information about the sink instance
+ * @param indexer request indexer that {@code ActionRequest} should be added to
+ */
+ void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java
new file mode 100644
index 00000000000..ff9ab406951
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+/**
+ * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare them
+ * for sending to an Elasticsearch cluster.
+ */
+@PublicEvolving
+public interface RequestIndexer {
+
+ /**
+ * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to
+ * Elasticsearch.
+ *
+ * @param actionRequests The multiple {@link ActionRequest} to add.
+ * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@link UpdateRequest}
+ */
+ @Deprecated
+ default void add(ActionRequest... actionRequests) {
+ for (ActionRequest actionRequest : actionRequests) {
+ if (actionRequest instanceof IndexRequest) {
+ add((IndexRequest) actionRequest);
+ } else if (actionRequest instanceof DeleteRequest) {
+ add((DeleteRequest) actionRequest);
+ } else if (actionRequest instanceof UpdateRequest) {
+ add((UpdateRequest) actionRequest);
+ } else {
+ throw new IllegalArgumentException(
+ "RequestIndexer only supports Index, Delete and Update requests");
+ }
+ }
+ }
+
+ /**
+ * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to
+ * Elasticsearch.
+ *
+ * @param deleteRequests The multiple {@link DeleteRequest} to add.
+ */
+ void add(DeleteRequest... deleteRequests);
+
+ /**
+ * Add multiple {@link IndexRequest} to the indexer to prepare for sending requests to
+ * Elasticsearch.
+ *
+ * @param indexRequests The multiple {@link IndexRequest} to add.
+ */
+ void add(IndexRequest... indexRequests);
+
+ /**
+ * Add multiple {@link UpdateRequest} to the indexer to prepare for sending requests to
+ * Elasticsearch.
+ *
+ * @param updateRequests The multiple {@link UpdateRequest} to add.
+ */
+ void add(UpdateRequest... updateRequests);
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
new file mode 100644
index 00000000000..b7601f5241a
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.format.DateTimeFormatter;
+
+/** Abstract class for time related {@link IndexGenerator}. */
+@Internal
+abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase {
+
+ private final String dateTimeFormat;
+ protected transient DateTimeFormatter dateTimeFormatter;
+
+ public AbstractTimeIndexGenerator(String index, String dateTimeFormat) {
+ super(index);
+ this.dateTimeFormat = dateTimeFormat;
+ }
+
+ @Override
+ public void open() {
+ this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
new file mode 100644
index 00000000000..ba9716d4ac5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.inlong.sort.elasticsearch.util.NoOpFailureHandler;
+import org.apache.inlong.sort.elasticsearch.util.RetryRejectedExecutionFailureHandler;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Accessor methods to elasticsearch options. */
+@Internal
+public class ElasticsearchConfiguration {
+
+ protected final ReadableConfig config;
+ private final ClassLoader classLoader;
+
+ public ElasticsearchConfiguration(ReadableConfig config, ClassLoader classLoader) {
+ this.config = config;
+ this.classLoader = classLoader;
+ }
+
+ public ActionRequestFailureHandler getFailureHandler() {
+ final ActionRequestFailureHandler failureHandler;
+ String value = config.get(ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION);
+ switch (value.toUpperCase()) {
+ case "FAIL":
+ failureHandler = new NoOpFailureHandler();
+ break;
+ case "IGNORE":
+ failureHandler = new IgnoringFailureHandler();
+ break;
+ case "RETRY-REJECTED":
+ failureHandler = new RetryRejectedExecutionFailureHandler();
+ break;
+ default:
+ try {
+ Class> failureHandlerClass = Class.forName(value, false, classLoader);
+ failureHandler =
+ (ActionRequestFailureHandler) InstantiationUtil.instantiate(failureHandlerClass);
+ } catch (ClassNotFoundException e) {
+ throw new ValidationException(
+ "Could not instantiate the failure handler class: " + value, e);
+ }
+ break;
+ }
+ return failureHandler;
+ }
+
+ public String getDocumentType() {
+ return config.get(ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION);
+ }
+
+ public int getBulkFlushMaxActions() {
+ int maxActions = config.get(ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+ // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+ return maxActions == 0 ? -1 : maxActions;
+ }
+
+ public long getBulkFlushMaxByteSize() {
+ long maxSize =
+ config.get(ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+ // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+ return maxSize == 0 ? -1 : maxSize;
+ }
+
+ public long getBulkFlushInterval() {
+ long interval = config.get(ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION).toMillis();
+ // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+ return interval == 0 ? -1 : interval;
+ }
+
+ public Optional getUsername() {
+ return config.getOptional(ElasticsearchConnectorOptions.USERNAME_OPTION);
+ }
+
+ public Optional getPassword() {
+ return config.getOptional(ElasticsearchConnectorOptions.PASSWORD_OPTION);
+ }
+
+ public boolean isBulkFlushBackoffEnabled() {
+ return config.get(
+ ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION) != ElasticsearchConnectorOptions.BackOffType.DISABLED;
+ }
+
+ public Optional getBulkFlushBackoffType() {
+ switch (config.get(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION)) {
+ case CONSTANT:
+ return Optional.of(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
+ case EXPONENTIAL:
+ return Optional.of(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+ default:
+ return Optional.empty();
+ }
+ }
+
+ public Optional getBulkFlushBackoffRetries() {
+ return config.getOptional(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION);
+ }
+
+ public Optional getBulkFlushBackoffDelay() {
+ return config.getOptional(ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION)
+ .map(Duration::toMillis);
+ }
+
+ public boolean isDisableFlushOnCheckpoint() {
+ return !config.get(ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION);
+ }
+
+ public String getIndex() {
+ return config.get(ElasticsearchConnectorOptions.INDEX_OPTION);
+ }
+
+ public String getKeyDelimiter() {
+ return config.get(ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION);
+ }
+
+ public Optional getPathPrefix() {
+ return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ElasticsearchConfiguration that = (ElasticsearchConfiguration) o;
+ return Objects.equals(config, that.config) && Objects.equals(classLoader, that.classLoader);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(config, classLoader);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
new file mode 100644
index 00000000000..27c8e390fdb
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for the Elasticsearch connector.
+ *
+ */
+@PublicEvolving
+public class ElasticsearchConnectorOptions {
+
+ public static final ConfigOption> HOSTS_OPTION =
+ ConfigOptions.key("hosts")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription("Elasticsearch hosts to connect to.");
+
+ public static final ConfigOption INDEX_OPTION =
+ ConfigOptions.key("index")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Elasticsearch index for every record.");
+
+ public static final ConfigOption DOCUMENT_TYPE_OPTION =
+ ConfigOptions.key("document-type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Elasticsearch document type.");
+
+ public static final ConfigOption PASSWORD_OPTION =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Password used to connect to Elasticsearch instance.");
+
+ public static final ConfigOption USERNAME_OPTION =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Username used to connect to Elasticsearch instance.");
+
+ public static final ConfigOption KEY_DELIMITER_OPTION =
+ ConfigOptions.key("document-id.key-delimiter")
+ .stringType()
+ .defaultValue("_")
+ .withDescription(
+ "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+
+ public static final ConfigOption FAILURE_HANDLER_OPTION =
+ ConfigOptions.key("failure-handler")
+ .stringType()
+ .defaultValue("fail")
+ .withDescription(
+ Description.builder()
+ .text(
+ "Failure handling strategy in case a request to Elasticsearch fails")
+ .list(
+ text(
+ "\"fail\" (throws an exception if a request fails and thus causes a job failure)"),
+ text(
+ "\"ignore\" (ignores failures and drops the request)"),
+ text(
+ "\"retry-rejected\" (re-adds requests that have failed due to queue capacity saturation)"),
+ text(
+ "\"class name\" for failure handling with a ActionRequestFailureHandler subclass"))
+ .build());
+
+ public static final ConfigOption FLUSH_ON_CHECKPOINT_OPTION =
+ ConfigOptions.key("sink.flush-on-checkpoint")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Disables flushing on checkpoint");
+
+ public static final ConfigOption BULK_FLUSH_MAX_ACTIONS_OPTION =
+ ConfigOptions.key("sink.bulk-flush.max-actions")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Maximum number of actions to buffer for each bulk request.");
+
+ public static final ConfigOption BULK_FLASH_MAX_SIZE_OPTION =
+ ConfigOptions.key("sink.bulk-flush.max-size")
+ .memoryType()
+ .defaultValue(MemorySize.parse("2mb"))
+ .withDescription("Maximum size of buffered actions per bulk request");
+
+ public static final ConfigOption BULK_FLUSH_INTERVAL_OPTION =
+ ConfigOptions.key("sink.bulk-flush.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("Bulk flush interval");
+
+ public static final ConfigOption BULK_FLUSH_BACKOFF_TYPE_OPTION =
+ ConfigOptions.key("sink.bulk-flush.backoff.strategy")
+ .enumType(BackOffType.class)
+ .defaultValue(BackOffType.DISABLED)
+ .withDescription("Backoff strategy");
+
+ public static final ConfigOption BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+ ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Maximum number of retries.");
+
+ public static final ConfigOption BULK_FLUSH_BACKOFF_DELAY_OPTION =
+ ConfigOptions.key("sink.bulk-flush.backoff.delay")
+ .durationType()
+ .noDefaultValue()
+ .withDescription("Delay between each backoff attempt.");
+
+ public static final ConfigOption CONNECTION_PATH_PREFIX =
+ ConfigOptions.key("connection.path-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Prefix string to be added to every REST communication.");
+
+ public static final ConfigOption FORMAT_OPTION =
+ ConfigOptions.key("format")
+ .stringType()
+ .defaultValue("json")
+ .withDescription(
+ "The format must produce a valid JSON document. "
+ + "Please refer to the documentation on formats for more details.");
+
+ // --------------------------------------------------------------------------------------------
+ // Enums
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+ * DISABLED} option.
+ */
+ public enum BackOffType {
+ DISABLED,
+ CONSTANT,
+ EXPONENTIAL
+ }
+
+ private ElasticsearchConnectorOptions() {
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
new file mode 100644
index 00000000000..0b04440a5ac
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Utility methods for validating Elasticsearch properties. */
+@Internal
+public class ElasticsearchValidationUtils {
+
+ private static final Set ILLEGAL_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+ static {
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY);
+ }
+
+ /**
+ * Checks that the table does not have primary key defined on illegal types. In Elasticsearch
+ * the primary key is used to calculate the Elasticsearch document id, which is a string of up
+ * to 512 bytes. It cannot have whitespaces. As of now it is calculated by concatenating the
+ * fields. Certain types do not have a good string representation to be used in this scenario.
+ * The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and {@link
+ * LogicalTypeRoot#RAW} type.
+ */
+ public static void validatePrimaryKey(TableSchema schema) {
+ schema.getPrimaryKey()
+ .ifPresent(
+ key -> {
+ List illegalTypes =
+ key.getColumns().stream()
+ .map(
+ fieldName -> {
+ LogicalType logicalType =
+ schema.getFieldDataType(fieldName)
+ .get()
+ .getLogicalType();
+ if (logicalType.is(
+ LogicalTypeRoot.DISTINCT_TYPE)) {
+ return ((DistinctType) logicalType)
+ .getSourceType()
+ .getTypeRoot();
+ } else {
+ return logicalType.getTypeRoot();
+ }
+ })
+ .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)
+ .collect(Collectors.toList());
+
+ if (!illegalTypes.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "The table has a primary key on columns of illegal types: %s.\n"
+ + " Elasticsearch sink does not support primary keys on columns of types: %s.",
+ illegalTypes, ILLEGAL_PRIMARY_KEY_TYPES));
+ }
+ });
+ }
+
+ private ElasticsearchValidationUtils() {
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
new file mode 100644
index 00000000000..38ff5e1fba5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+
+/** This interface is responsible to generate index name from given {@link Row} record. */
+@Internal
+interface IndexGenerator extends Serializable {
+
+ /**
+ * Initialize the index generator, this will be called only once before {@link
+ * #generate(RowData)} is called.
+ */
+ default void open() {
+ }
+
+ /** Generate index name according the the given row. */
+ String generate(RowData row);
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
new file mode 100644
index 00000000000..3b43b636f2f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/** Base class for {@link IndexGenerator}. */
+@Internal
+public abstract class IndexGeneratorBase implements IndexGenerator {
+
+ private static final long serialVersionUID = 1L;
+ protected final String index;
+
+ public IndexGeneratorBase(String index) {
+ this.index = index;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IndexGeneratorBase)) {
+ return false;
+ }
+ IndexGeneratorBase that = (IndexGeneratorBase) o;
+ return index.equals(that.index);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
new file mode 100644
index 00000000000..7f25c8f8a51
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * Flink supports both static index and dynamic index.
+ *
+ *
If you want to have a static index, this option value should be a plain string, e.g.
+ * 'myusers', all the records will be consistently written into "myusers" index.
+ *
+ *
If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ */
+@Internal
+public final class IndexGeneratorFactory {
+
+ private IndexGeneratorFactory() {
+ }
+
+ public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {
+ return createIndexGenerator(index, schema, ZoneId.systemDefault());
+ }
+
+ public static IndexGenerator createIndexGenerator(
+ String index, TableSchema schema, ZoneId localTimeZoneId) {
+ final IndexHelper indexHelper = new IndexHelper();
+ if (indexHelper.checkIsDynamicIndex(index)) {
+ return createRuntimeIndexGenerator(
+ index,
+ schema.getFieldNames(),
+ schema.getFieldDataTypes(),
+ indexHelper,
+ localTimeZoneId);
+ } else {
+ return new StaticIndexGenerator(index);
+ }
+ }
+
+ interface DynamicFormatter extends Serializable {
+
+ String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+ }
+
+ private static IndexGenerator createRuntimeIndexGenerator(
+ String index,
+ String[] fieldNames,
+ DataType[] fieldTypes,
+ IndexHelper indexHelper,
+ ZoneId localTimeZoneId) {
+ final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index);
+ final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr));
+ final String indexSuffix =
+ index.substring(indexPrefix.length() + dynamicIndexPatternStr.length());
+
+ if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
+ final String dateTimeFormat =
+ indexHelper.extractDateFormat(
+ index, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+
+ @Override
+ public String generate(RowData row) {
+ return indexPrefix
+ .concat(LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter))
+ .concat(indexSuffix);
+ }
+ };
+ }
+
+ final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
+ final int indexFieldPos =
+ indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat);
+ final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType();
+ final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot();
+
+ // validate index field type
+ indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot);
+
+ // time extract dynamic index pattern
+ final RowData.FieldGetter fieldGetter =
+ RowData.createFieldGetter(indexFieldType, indexFieldPos);
+
+ if (isDynamicIndexWithFormat) {
+ final String dateTimeFormat =
+ indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot);
+ DynamicFormatter formatFunction =
+ createFormatFunction(
+ indexFieldType, indexFieldLogicalTypeRoot, localTimeZoneId);
+
+ return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+
+ @Override
+ public String generate(RowData row) {
+ Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+ final String formattedField;
+ // TODO we can possibly optimize it to use the nullability of the field
+ if (fieldOrNull != null) {
+ formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter);
+ } else {
+ formattedField = "null";
+ }
+ return indexPrefix.concat(formattedField).concat(indexSuffix);
+ }
+ };
+ }
+ // general dynamic index pattern
+ return new IndexGeneratorBase(index) {
+
+ @Override
+ public String generate(RowData row) {
+ Object indexField = fieldGetter.getFieldOrNull(row);
+ return indexPrefix
+ .concat(indexField == null ? "null" : indexField.toString())
+ .concat(indexSuffix);
+ }
+ };
+ }
+
+ private static DynamicFormatter createFormatFunction(
+ LogicalType indexFieldType,
+ LogicalTypeRoot indexFieldLogicalTypeRoot,
+ ZoneId localTimeZoneId) {
+ switch (indexFieldLogicalTypeRoot) {
+ case DATE:
+ return (value, dateTimeFormatter) -> {
+ Integer indexField = (Integer) value;
+ return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter);
+ };
+ case TIME_WITHOUT_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ Integer indexField = (Integer) value;
+ return LocalTime.ofNanoOfDay(indexField * 1_000_000L).format(dateTimeFormatter);
+ };
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ TimestampData indexField = (TimestampData) value;
+ return indexField.toLocalDateTime().format(dateTimeFormatter);
+ };
+ case TIMESTAMP_WITH_TIME_ZONE:
+ throw new UnsupportedOperationException(
+ "TIMESTAMP_WITH_TIME_ZONE is not supported yet");
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ TimestampData indexField = (TimestampData) value;
+ return indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter);
+ };
+ default:
+ throw new TableException(
+ String.format(
+ "Unsupported type '%s' found in Elasticsearch dynamic index field, "
+ + "time-related pattern only support types are: DATE,TIME,TIMESTAMP.",
+ indexFieldType));
+ }
+ }
+
+ /**
+ * Helper class for {@link IndexGeneratorFactory}, this helper can use to validate index field
+ * type ans parse index format from pattern.
+ */
+ public static class IndexHelper {
+
+ private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+ private static final Pattern dynamicIndexTimeExtractPattern =
+ Pattern.compile(".*\\{.+\\|.*\\}.*");
+ private static final Pattern dynamicIndexSystemTimeExtractPattern =
+ Pattern.compile(
+ ".*\\{\\s*(now\\(\\s*\\)|NOW\\(\\s*\\)|current_timestamp|CURRENT_TIMESTAMP)\\s*\\|.*\\}.*");
+ private static final List supportedTypes = new ArrayList<>();
+ private static final Map defaultFormats = new HashMap<>();
+
+ static {
+ // time related types
+ supportedTypes.add(LogicalTypeRoot.DATE);
+ supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ // general types
+ supportedTypes.add(LogicalTypeRoot.VARCHAR);
+ supportedTypes.add(LogicalTypeRoot.CHAR);
+ supportedTypes.add(LogicalTypeRoot.TINYINT);
+ supportedTypes.add(LogicalTypeRoot.INTEGER);
+ supportedTypes.add(LogicalTypeRoot.BIGINT);
+ }
+
+ static {
+ defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+ defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+ defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+ defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+ defaultFormats.put(
+ LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+ }
+
+ /** Validate the index field Type. */
+ void validateIndexFieldType(LogicalTypeRoot logicalType) {
+ if (!supportedTypes.contains(logicalType)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported type %s of index field, " + "Supported types are: %s",
+ logicalType, supportedTypes));
+ }
+ }
+
+ /** Get the default date format. */
+ String getDefaultFormat(LogicalTypeRoot logicalType) {
+ return defaultFormats.get(logicalType);
+ }
+
+ /** Check general dynamic index is enabled or not by index pattern. */
+ boolean checkIsDynamicIndex(String index) {
+ final Matcher matcher = dynamicIndexPattern.matcher(index);
+ int count = 0;
+ while (matcher.find()) {
+ count++;
+ }
+ if (count > 1) {
+ throw new TableException(
+ String.format(
+ "Chaining dynamic index pattern %s is not supported,"
+ + " only support single dynamic index pattern.",
+ index));
+ }
+ return count == 1;
+ }
+
+ /** Check time extract dynamic index is enabled or not by index pattern. */
+ boolean checkIsDynamicIndexWithFormat(String index) {
+ return dynamicIndexTimeExtractPattern.matcher(index).matches();
+ }
+
+ /** Check generate dynamic index is from system time or not. */
+ public boolean checkIsDynamicIndexWithSystemTimeFormat(String index) {
+ return dynamicIndexSystemTimeExtractPattern.matcher(index).matches();
+ }
+
+ /** Extract dynamic index pattern string from index pattern string. */
+ String extractDynamicIndexPatternStr(String index) {
+ int start = index.indexOf("{");
+ int end = index.lastIndexOf("}");
+ return index.substring(start, end + 1);
+ }
+
+ /** Extract index field position in a fieldNames, return the field position. */
+ int extractIndexFieldPos(
+ String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+ List fieldList = Arrays.asList(fieldNames);
+ String indexFieldName;
+ if (isDynamicIndexWithFormat) {
+ indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("|"));
+ } else {
+ indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("}"));
+ }
+ if (!fieldList.contains(indexFieldName)) {
+ throw new TableException(
+ String.format(
+ "Unknown field '%s' in index pattern '%s', please check the field name.",
+ indexFieldName, index));
+ }
+ return fieldList.indexOf(indexFieldName);
+ }
+
+ /** Extract dateTime format by the date format that extracted from index pattern string. */
+ private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+ String format = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+ if ("".equals(format)) {
+ format = getDefaultFormat(logicalType);
+ }
+ return format;
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
new file mode 100644
index 00000000000..e4808c0e6b0
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** An extractor for a Elasticsearch key from a {@link RowData}. */
+@Internal
+public class KeyExtractor implements Function, Serializable {
+
+ private final FieldFormatter[] fieldFormatters;
+ private final String keyDelimiter;
+
+ private interface FieldFormatter extends Serializable {
+
+ String format(RowData rowData);
+ }
+
+ private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+ this.fieldFormatters = fieldFormatters;
+ this.keyDelimiter = keyDelimiter;
+ }
+
+ @Override
+ public String apply(RowData rowData) {
+ final StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < fieldFormatters.length; i++) {
+ if (i > 0) {
+ builder.append(keyDelimiter);
+ }
+ final String value = fieldFormatters[i].format(rowData);
+ builder.append(value);
+ }
+ return builder.toString();
+ }
+
+ private static class ColumnWithIndex {
+
+ public TableColumn column;
+ public int index;
+
+ public ColumnWithIndex(TableColumn column, int index) {
+ this.column = column;
+ this.index = index;
+ }
+
+ public LogicalType getType() {
+ return column.getType().getLogicalType();
+ }
+
+ public int getIndex() {
+ return index;
+ }
+ }
+
+ public static Function createKeyExtractor(
+ TableSchema schema, String keyDelimiter) {
+ return schema.getPrimaryKey()
+ .map(
+ key -> {
+ Map namesToColumns = new HashMap<>();
+ List tableColumns = schema.getTableColumns();
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ TableColumn column = tableColumns.get(i);
+ namesToColumns.put(
+ column.getName(), new ColumnWithIndex(column, i));
+ }
+
+ FieldFormatter[] fieldFormatters =
+ key.getColumns().stream()
+ .map(namesToColumns::get)
+ .map(
+ column -> toFormatter(
+ column.index, column.getType()))
+ .toArray(FieldFormatter[]::new);
+
+ return (Function) new KeyExtractor(fieldFormatters, keyDelimiter);
+ })
+ .orElseGet(() -> (Function & Serializable) (row) -> null);
+ }
+
+ private static FieldFormatter toFormatter(int index, LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case DATE:
+ return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+ case TIME_WITHOUT_TIME_ZONE:
+ return (row) -> LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+ case INTERVAL_YEAR_MONTH:
+ return (row) -> Period.ofDays(row.getInt(index)).toString();
+ case INTERVAL_DAY_TIME:
+ return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+ case DISTINCT_TYPE:
+ return toFormatter(index, ((DistinctType) type).getSourceType());
+ default:
+ RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, index);
+ return (row) -> fieldGetter.getFieldOrNull(row).toString();
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
new file mode 100644
index 00000000000..24815296df5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+
+/** For version-agnostic creating of {@link ActionRequest}s. */
+@Internal
+public interface RequestFactory extends Serializable {
+
+ /**
+ * Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has
+ * been deprecated since Elasticsearch 7.x and it would not take any effort.
+ */
+ UpdateRequest createUpdateRequest(
+ String index, String docType, String key, XContentType contentType, byte[] document);
+
+ /**
+ * Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has
+ * been deprecated since Elasticsearch 7.x and it would not take any effort.
+ */
+ IndexRequest createIndexRequest(
+ String index, String docType, String key, XContentType contentType, byte[] document);
+
+ /**
+ * Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has
+ * been deprecated since Elasticsearch 7.x and it would not take any effort.
+ */
+ DeleteRequest createDeleteRequest(String index, String docType, String key);
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
new file mode 100644
index 00000000000..260ed576d93
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
+@Internal
+public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IndexGenerator indexGenerator;
+ private final String docType;
+ private final SerializationSchema serializationSchema;
+ private final XContentType contentType;
+ private final RequestFactory requestFactory;
+ private final Function createKey;
+
+ public RowElasticsearchSinkFunction(
+ IndexGenerator indexGenerator,
+ @Nullable String docType, // this is deprecated in es 7+
+ SerializationSchema serializationSchema,
+ XContentType contentType,
+ RequestFactory requestFactory,
+ Function createKey) {
+ this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+ this.docType = docType;
+ this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+ this.contentType = Preconditions.checkNotNull(contentType);
+ this.requestFactory = Preconditions.checkNotNull(requestFactory);
+ this.createKey = Preconditions.checkNotNull(createKey);
+ }
+
+ @Override
+ public void open(RuntimeContext ctx) throws Exception {
+ serializationSchema.open(
+ RuntimeContextInitializationContextAdapters.serializationAdapter(ctx));
+ indexGenerator.open();
+ }
+
+ @Override
+ public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
+ switch (element.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ processUpsert(element, indexer);
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+ processDelete(element, indexer);
+ break;
+ default:
+ throw new TableException("Unsupported message kind: " + element.getRowKind());
+ }
+ }
+
+ private void processUpsert(RowData row, RequestIndexer indexer) {
+ final byte[] document = serializationSchema.serialize(row);
+ final String key = createKey.apply(row);
+ if (key != null) {
+ final UpdateRequest updateRequest =
+ requestFactory.createUpdateRequest(
+ indexGenerator.generate(row), docType, key, contentType, document);
+ indexer.add(updateRequest);
+ } else {
+ final IndexRequest indexRequest =
+ requestFactory.createIndexRequest(
+ indexGenerator.generate(row), docType, key, contentType, document);
+ indexer.add(indexRequest);
+ }
+ }
+
+ private void processDelete(RowData row, RequestIndexer indexer) {
+ final String key = createKey.apply(row);
+ final DeleteRequest deleteRequest =
+ requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
+ indexer.add(deleteRequest);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RowElasticsearchSinkFunction that = (RowElasticsearchSinkFunction) o;
+ return Objects.equals(indexGenerator, that.indexGenerator)
+ && Objects.equals(docType, that.docType)
+ && Objects.equals(serializationSchema, that.serializationSchema)
+ && contentType == that.contentType
+ && Objects.equals(requestFactory, that.requestFactory)
+ && Objects.equals(createKey, that.createKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ indexGenerator,
+ docType,
+ serializationSchema,
+ contentType,
+ requestFactory,
+ createKey);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
new file mode 100644
index 00000000000..0f59ce0969e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+
+/** A static {@link IndexGenerator} which generate fixed index name. */
+@Internal
+final class StaticIndexGenerator extends IndexGeneratorBase {
+
+ public StaticIndexGenerator(String index) {
+ super(index);
+ }
+
+ public String generate(RowData row) {
+ return index;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
new file mode 100644
index 00000000000..cdbfa731a91
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.util;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+
+/** Ignores all kinds of failures and drops the affected {@link ActionRequest}. */
+@Internal
+public class IgnoringFailureHandler implements ActionRequestFailureHandler {
+
+ private static final long serialVersionUID = 1662846593501L;
+
+ @Override
+ public void onFailure(
+ ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+ // ignore failure
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
new file mode 100644
index 00000000000..477b2b6b6fb
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.util;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An {@link ActionRequestFailureHandler} that simply fails the sink on any failures. */
+@Internal
+public class NoOpFailureHandler implements ActionRequestFailureHandler {
+
+ private static final long serialVersionUID = 737941343410827885L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(NoOpFailureHandler.class);
+
+ @Override
+ public void onFailure(
+ ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
+ throws Throwable {
+ LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+ // simply fail the sink
+ throw failure;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof NoOpFailureHandler;
+ }
+
+ @Override
+ public int hashCode() {
+ return NoOpFailureHandler.class.hashCode();
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
new file mode 100644
index 00000000000..1afb7b0ee61
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.util;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary {@link
+ * EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ *
+ */
+@PublicEvolving
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+
+ private static final long serialVersionUID = -7423562912824511906L;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RetryRejectedExecutionFailureHandler.class);
+
+ @Override
+ public void onFailure(
+ ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
+ throws Throwable {
+ LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+ if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
+ indexer.add(action);
+ } else {
+ // rethrow all other failures
+ throw failure;
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
index 72132b4de61..f21472326a9 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
@@ -33,6 +33,7 @@
pulsar
jdbc
+ elasticsearch-base
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 8d1fd1ede92..d5b47b354e5 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -938,6 +938,27 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
Source : org.apache.flink:flink-connector-jdbc-1.15.4.jar (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConnectorOptions.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/IgnoringFailureHandler.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/NoOpFailureHandler.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ActionRequestFailureHandler.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/RequestIndexer.java
+Source : org.apache.flink:flink-connector-elasticsearch-base-3.0.1-1.17.jar (Please note that the software have been modified.)
+License : https://github.com/apache/flink/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents: