Skip to content

Commit 95a43a7

Browse files
committed
upgrade opensearch sink to client 2.16 and tests to use server 2.16.0
1 parent 778217d commit 95a43a7

File tree

8 files changed

+27
-19
lines changed

8 files changed

+27
-19
lines changed

pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ flexible messaging model and an intuitive client API.</description>
158158
<bouncycastle.version>1.78.1</bouncycastle.version>
159159
<bouncycastle.bcpkix-fips.version>1.0.7</bouncycastle.bcpkix-fips.version>
160160
<bouncycastle.bc-fips.version>1.0.2.5</bouncycastle.bc-fips.version>
161-
<jackson.version>2.14.2</jackson.version>
161+
<jackson.version>2.16.0</jackson.version>
162162
<reflections.version>0.10.2</reflections.version>
163163
<swagger.version>1.6.2</swagger.version>
164164
<puppycrawl.checkstyle.version>10.14.2</puppycrawl.checkstyle.version>
@@ -194,7 +194,7 @@ flexible messaging model and an intuitive client API.</description>
194194
<openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version>
195195
<hdfs-offload-version3>3.3.5</hdfs-offload-version3>
196196
<json-smart.version>2.4.10</json-smart.version>
197-
<opensearch.version>1.2.4</opensearch.version>
197+
<opensearch.version>2.16.0</opensearch.version>
198198
<elasticsearch-java.version>8.12.1</elasticsearch-java.version>
199199
<trino.version>368</trino.version>
200200
<debezium.version>2.6.1.Final</debezium.version>

pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static java.lang.String.format;
2323
import static java.util.Objects.requireNonNull;
24-
import com.fasterxml.jackson.databind.AnnotationIntrospector;
25-
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
24+
import com.fasterxml.jackson.databind.DeserializationConfig;
25+
import com.fasterxml.jackson.databind.ObjectMapper;
2626
import com.fasterxml.jackson.databind.util.EnumResolver;
2727
import java.lang.reflect.Field;
2828
import java.lang.reflect.Method;
@@ -58,7 +58,7 @@ public final class FieldParser {
5858
private static final Map<String, Method> CONVERTERS = new HashMap<>();
5959
private static final Map<Class<?>, Class<?>> WRAPPER_TYPES = new HashMap<>();
6060

61-
private static final AnnotationIntrospector ANNOTATION_INTROSPECTOR = new JacksonAnnotationIntrospector();
61+
private static final DeserializationConfig DESERIALIZATION_CONFIG = new ObjectMapper().getDeserializationConfig();
6262

6363
static {
6464
// Preload converters and wrapperTypes.
@@ -100,7 +100,7 @@ public static <T> T convert(Object from, Class<T> to) {
100100

101101
if (to.isEnum()) {
102102
// Converting string to enum
103-
EnumResolver r = EnumResolver.constructUsingToString((Class<Enum<?>>) to, ANNOTATION_INTROSPECTOR);
103+
EnumResolver r = EnumResolver.constructUsingToString(DESERIALIZATION_CONFIG, to);
104104
T value = (T) r.findEnum((String) from);
105105
if (value == null) {
106106
throw new RuntimeException("Invalid value '" + from + "' for enum " + to);

pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@
4949
import org.opensearch.client.indices.CreateIndexRequest;
5050
import org.opensearch.client.indices.CreateIndexResponse;
5151
import org.opensearch.client.indices.GetIndexRequest;
52-
import org.opensearch.common.Strings;
5352
import org.opensearch.common.settings.Settings;
54-
import org.opensearch.common.unit.ByteSizeUnit;
55-
import org.opensearch.common.unit.ByteSizeValue;
5653
import org.opensearch.common.unit.TimeValue;
5754
import org.opensearch.common.xcontent.XContentType;
55+
import org.opensearch.core.common.Strings;
56+
import org.opensearch.core.common.unit.ByteSizeUnit;
57+
import org.opensearch.core.common.unit.ByteSizeValue;
5858
import org.opensearch.index.query.QueryBuilder;
5959
import org.opensearch.index.query.QueryBuilders;
6060
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -229,7 +229,7 @@ public boolean indexDocument(String index, String documentId, String documentSou
229229
if (!Strings.isNullOrEmpty(documentId)) {
230230
indexRequest.id(documentId);
231231
}
232-
indexRequest.type(config.getTypeName());
232+
233233
indexRequest.source(documentSource, XContentType.JSON);
234234

235235
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
@@ -245,7 +245,7 @@ public boolean indexDocument(String index, String documentId, String documentSou
245245
public boolean deleteDocument(String index, String documentId) throws IOException {
246246
DeleteRequest deleteRequest = Requests.deleteRequest(index);
247247
deleteRequest.id(documentId);
248-
deleteRequest.type(config.getTypeName());
248+
249249
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
250250
if (log.isDebugEnabled()) {
251251
log.debug("delete result {}", deleteResponse.getResult());
@@ -301,7 +301,7 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO
301301
if (!Strings.isNullOrEmpty(request.getDocumentId())) {
302302
indexRequest.id(request.getDocumentId());
303303
}
304-
indexRequest.type(config.getTypeName());
304+
305305
indexRequest.source(request.getDocumentSource(), XContentType.JSON);
306306
if (log.isDebugEnabled()) {
307307
log.debug("append index request id={}, type={}, source={}", request.getDocumentId(), config.getTypeName(),
@@ -314,7 +314,7 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO
314314
public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) throws IOException {
315315
DeleteRequest deleteRequest = new DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord());
316316
deleteRequest.id(request.getDocumentId());
317-
deleteRequest.type(config.getTypeName());
317+
318318
if (log.isDebugEnabled()) {
319319
log.debug("append delete request id={}, type={}", request.getDocumentId(), config.getTypeName());
320320
}

pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public abstract class ElasticSearchTestBase {
4646
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
4747

4848
public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
49-
.orElse("opensearchproject/opensearch:1.2.4");
49+
.orElse("opensearchproject/opensearch:2.16.0");
5050

5151
protected final String elasticImageName;
5252

@@ -59,6 +59,7 @@ protected ElasticsearchContainer createElasticsearchContainer() {
5959
if (elasticImageName.equals(OPENSEARCH)) {
6060
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
6161
elasticsearchContainer = new ElasticsearchContainer(dockerImageName)
62+
.withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!")
6263
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
6364
.withEnv("bootstrap.memory_lock", "true")
6465
.withEnv("plugins.security.disabled", "true");

pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void testSslBasic() throws IOException {
7777
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
7878
.setIndexName(INDEX)
7979
.setUsername("admin")
80-
.setPassword("admin")
80+
.setPassword("0pEn7earch!")
8181
.setSsl(new ElasticSearchSslConfig()
8282
.setEnabled(true)
8383
.setTruststorePath(sslResourceDir + "/truststore.jks")
@@ -102,7 +102,7 @@ public void testSslWithHostnameVerification() throws IOException {
102102
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
103103
.setIndexName(INDEX)
104104
.setUsername("admin")
105-
.setPassword("admin")
105+
.setPassword("0pEn7earch!")
106106
.setSsl(new ElasticSearchSslConfig()
107107
.setEnabled(true)
108108
.setProtocols("TLSv1.2")
@@ -128,7 +128,7 @@ public void testSslWithClientAuth() throws IOException {
128128
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
129129
.setIndexName(INDEX)
130130
.setUsername("admin")
131-
.setPassword("admin")
131+
.setPassword("0pEn7earch!")
132132
.setSsl(new ElasticSearchSslConfig()
133133
.setEnabled(true)
134134
.setHostnameVerification(true)

tests/docker-images/latest-version-image/conf/functions_worker.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ autostart=false
2222
redirect_stderr=true
2323
stdout_logfile=/var/log/pulsar/functions_worker.log
2424
directory=/pulsar
25-
environment=PULSAR_MEM="-Xmx128M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/pulsar/logs/functions",PULSAR_GC="-XX:+UseZGC"
25+
environment=PULSAR_MEM="-Xmx128M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/pulsar/logs/functions",PULSAR_GC="-XX:+UseG1GC"
2626
command=/pulsar/bin/pulsar functions-worker
2727
user=pulsar
2828
stopwaitsecs=15

tests/integration/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@
220220
<scope>test</scope>
221221
</dependency>
222222

223+
<dependency>
224+
<groupId>org.projectlombok</groupId>
225+
<artifactId>lombok</artifactId>
226+
<scope>test</scope>
227+
</dependency>
228+
223229
<!-- mongodb -->
224230
<dependency>
225231
<groupId>org.testcontainers</groupId>

tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
public class OpenSearchSinkTester extends ElasticSearchSinkTester {
3636

3737
public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
38-
.orElse("opensearchproject/opensearch:1.2.4");
38+
.orElse("opensearchproject/opensearch:2.16.0");
3939

4040
private RestHighLevelClient elasticClient;
4141

@@ -49,6 +49,7 @@ protected ElasticsearchContainer createElasticContainer() {
4949
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
5050
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
5151
return new ElasticsearchContainer(dockerImageName)
52+
.withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!")
5253
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
5354
.withEnv("bootstrap.memory_lock", "true")
5455
.withEnv("plugins.security.disabled", "true");

0 commit comments

Comments
 (0)