Skip to content

Commit

Permalink
[INLONG-10739][Sort] Add Elasticsearch connector base on flink1.15 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
XiaoYou201 authored Aug 2, 2024
1 parent 3af0a7f commit 4eee655
Show file tree
Hide file tree
Showing 23 changed files with 2,476 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connectors-v1.15</artifactId>
<version>1.14.0-SNAPSHOT</version>
</parent>
<artifactId>sort-connector-elasticsearch-base-v1.15</artifactId>
<packaging>jar</packaging>
<name>Apache InLong - Sort-connector-elasticsearch-base</name>

<properties>
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
<elasticsearch.version>7.10.2</elasticsearch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-flink-dependencies-v1.15</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<!--
FLINK-7133: Exclude all org.ow2.asm artifacts from the Elasticsearch dependencies because
1. from the client's point of view they are optional, and
2. the Elasticsearch version configured by default when this comment was originally written (1.7.1)
depends on asm 4.1; when that is shaded into the elasticsearch-base artifact it conflicts with newer
shaded versions of asm, resulting in runtime errors when the application is executed locally,
e.g. from an IDE.
-->
<exclusions>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
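<!--
Log4j2 dependencies: Elasticsearch needs Log4j2 on the classpath (this was originally
noted for the embedded Elasticsearch nodes used in tests). They are declared with
"provided" scope here, so they are available at compile time but are not bundled
into the shaded jar.
-->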

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- Shade all the dependencies to avoid conflicts -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<filters>
<filter>
<artifact>org.apache.inlong:sort-connector-*</artifact>
<includes>
<include>org/apache/inlong/**</include>
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
</includes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>log4j.properties</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.elasticsearch;

import org.apache.flink.annotation.PublicEvolving;
import org.elasticsearch.action.ActionRequest;

import java.io.Serializable;

/**
* An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how
* failed {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing
* malformed documents, or simply requesting them to be sent to Elasticsearch again if the failure
* is only temporary.
*
* <p>The interface extends {@link Serializable}, so implementations must be serializable.
*
* <p>Example:
*
* <pre>{@code
* private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
*
*     @Override
*     public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
*         if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
*             // full queue; re-add document for indexing
*             indexer.add(action);
*         } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
*             // malformed document; simply drop request without failing sink
*         } else {
*             // for all other failures, fail the sink;
*             // here the failure is simply rethrown, but users can also choose to throw custom exceptions
*             throw failure;
*         }
*     }
* }
*
* }</pre>
*
* <p>The above example will let the sink re-add requests that failed due to queue capacity
* saturation and drop requests with malformed documents, without failing the sink. For all other
* failures, the sink will fail.
*
* <p>Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the
* exact type could not be retrieved through the older version Java client APIs (thus, the types
* will be general {@link Exception}s and only differ in the failure message). In this case, it is
* recommended to match on the provided REST status code.
*
*/
@PublicEvolving
public interface ActionRequestFailureHandler extends Serializable {

/**
* Handle a failed {@link ActionRequest}.
*
* <p>Returning normally means the failure has been dealt with (for example by dropping the
* request or by re-adding it through {@code indexer}); throwing signals that the sink should
* fail.
*
* @param action the {@link ActionRequest} that failed due to the failure
* @param failure the cause of failure
* @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
* @param indexer request indexer to re-add the failed action, if intended to do so
* @throws Throwable if the sink should fail on this failure, the implementation should rethrow
* the exception or a custom one
*/
void onFailure(
ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
throws Throwable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.elasticsearch;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Implementation of a {@link RequestIndexer} that buffers {@link ActionRequest ActionRequests}
 * before re-sending them to the Elasticsearch cluster upon request.
 *
 * <p>Calls to the {@code add(...)} methods never contact Elasticsearch; they only append to an
 * internal FIFO buffer. The buffered requests are forwarded, in insertion order, when
 * {@link #processBufferedRequests(RequestIndexer)} is invoked.
 */
@NotThreadSafe
public class BufferingNoOpRequestIndexer implements RequestIndexer {

    /** Requests waiting to be replayed, kept in the order in which they were added. */
    private final ConcurrentLinkedQueue<ActionRequest> bufferedRequests;

    BufferingNoOpRequestIndexer() {
        this.bufferedRequests = new ConcurrentLinkedQueue<>();
    }

    /**
     * Buffers the given delete requests for later replay.
     *
     * @param deleteRequests the requests to buffer
     */
    @Override
    public void add(DeleteRequest... deleteRequests) {
        Collections.addAll(bufferedRequests, deleteRequests);
    }

    /**
     * Buffers the given index requests for later replay.
     *
     * @param indexRequests the requests to buffer
     */
    @Override
    public void add(IndexRequest... indexRequests) {
        Collections.addAll(bufferedRequests, indexRequests);
    }

    /**
     * Buffers the given update requests for later replay.
     *
     * @param updateRequests the requests to buffer
     */
    @Override
    public void add(UpdateRequest... updateRequests) {
        Collections.addAll(bufferedRequests, updateRequests);
    }

    /**
     * Forwards every buffered request to {@code actualIndexer} in insertion order and removes it
     * from the buffer.
     *
     * <p>The buffer is drained with {@code poll()} rather than iterated and then cleared, so a
     * request is removed only once it has been taken for forwarding. A request that is added
     * while the drain is running is therefore either forwarded by this call or left in the
     * buffer for the next one; it is never discarded without being forwarded.
     *
     * @param actualIndexer the indexer that receives the buffered requests
     */
    void processBufferedRequests(RequestIndexer actualIndexer) {
        ActionRequest pending;
        while ((pending = bufferedRequests.poll()) != null) {
            if (pending instanceof IndexRequest) {
                actualIndexer.add((IndexRequest) pending);
            } else if (pending instanceof DeleteRequest) {
                actualIndexer.add((DeleteRequest) pending);
            } else if (pending instanceof UpdateRequest) {
                actualIndexer.add((UpdateRequest) pending);
            }
            // Any other request type cannot be added through the add(...) overloads above,
            // so there is nothing to forward for it.
        }
    }
}
Loading

0 comments on commit 4eee655

Please sign in to comment.