Skip to content

Commit 62fbcc9

Browse files
lhotarisrinath-ctds
authored andcommitted
[improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder (apache#22541)
(cherry picked from commit 3e7dbb4) (cherry picked from commit 4c480fd)
1 parent 5149aef commit 62fbcc9

File tree

24 files changed

+687
-230
lines changed

24 files changed

+687
-230
lines changed

distribution/server/src/assemble/LICENSE.bin.txt

+11-9
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,8 @@ The Apache Software License, Version 2.0
499499
- io.reactivex.rxjava3-rxjava-3.0.1.jar
500500
* RoaringBitmap
501501
- org.roaringbitmap-RoaringBitmap-1.2.0.jar
502+
* Spotify completable-futures
503+
- com.spotify-completable-futures-0.3.6.jar
502504

503505
BSD 3-clause "New" or "Revised" License
504506
* Google auth library
@@ -543,15 +545,15 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
543545
- org.glassfish.hk2-osgi-resource-locator-1.0.3.jar
544546
- org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar
545547
* Jersey
546-
- org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar
547-
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar
548-
- org.glassfish.jersey.core-jersey-client-2.41.jar
549-
- org.glassfish.jersey.core-jersey-common-2.41.jar
550-
- org.glassfish.jersey.core-jersey-server-2.41.jar
551-
- org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar
552-
- org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar
553-
- org.glassfish.jersey.media-jersey-media-multipart-2.41.jar
554-
- org.glassfish.jersey.inject-jersey-hk2-2.41.jar
548+
- org.glassfish.jersey.containers-jersey-container-servlet-2.42.jar
549+
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar
550+
- org.glassfish.jersey.core-jersey-client-2.42.jar
551+
- org.glassfish.jersey.core-jersey-common-2.42.jar
552+
- org.glassfish.jersey.core-jersey-server-2.42.jar
553+
- org.glassfish.jersey.ext-jersey-entity-filtering-2.42.jar
554+
- org.glassfish.jersey.media-jersey-media-json-jackson-2.42.jar
555+
- org.glassfish.jersey.media-jersey-media-multipart-2.42.jar
556+
- org.glassfish.jersey.inject-jersey-hk2-2.42.jar
555557
* Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar
556558

557559
Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt

distribution/shell/src/assemble/LICENSE.bin.txt

+7-6
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ The Apache Software License, Version 2.0
409409
* Apache Avro
410410
- avro-1.11.3.jar
411411
- avro-protobuf-1.11.3.jar
412+
* Spotify completable-futures -- completable-futures-0.3.6.jar
412413

413414
BSD 3-clause "New" or "Revised" License
414415
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
@@ -438,12 +439,12 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
438439
- aopalliance-repackaged-2.6.1.jar
439440
- osgi-resource-locator-1.0.3.jar
440441
* Jersey
441-
- jersey-client-2.41.jar
442-
- jersey-common-2.41.jar
443-
- jersey-entity-filtering-2.41.jar
444-
- jersey-media-json-jackson-2.41.jar
445-
- jersey-media-multipart-2.41.jar
446-
- jersey-hk2-2.41.jar
442+
- jersey-client-2.42.jar
443+
- jersey-common-2.42.jar
444+
- jersey-entity-filtering-2.42.jar
445+
- jersey-media-json-jackson-2.42.jar
446+
- jersey-media-multipart-2.42.jar
447+
- jersey-hk2-2.42.jar
447448
* Mimepull -- mimepull-1.9.15.jar
448449

449450
Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt

pom.xml

+8-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ flexible messaging model and an intuitive client API.</description>
147147
<netty-iouring.version>0.0.21.Final</netty-iouring.version>
148148
<jetty.version>9.4.54.v20240208</jetty.version>
149149
<conscrypt.version>2.5.2</conscrypt.version>
150-
<jersey.version>2.41</jersey.version>
150+
<jersey.version>2.42</jersey.version>
151151
<athenz.version>1.10.50</athenz.version>
152152
<prometheus.version>0.16.0</prometheus.version>
153153
<vertx.version>4.5.8</vertx.version>
@@ -255,6 +255,7 @@ flexible messaging model and an intuitive client API.</description>
255255
<disruptor.version>3.4.3</disruptor.version>
256256
<zstd-jni.version>1.5.2-3</zstd-jni.version>
257257
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
258+
<completable-futures.version>0.3.6</completable-futures.version>
258259
<failsafe.version>3.3.2</failsafe.version>
259260

260261
<!-- test dependencies -->
@@ -646,6 +647,12 @@ flexible messaging model and an intuitive client API.</description>
646647
<version>${bookkeeper.version}</version>
647648
</dependency>
648649

650+
<dependency>
651+
<groupId>com.spotify</groupId>
652+
<artifactId>completable-futures</artifactId>
653+
<version>${completable-futures.version}</version>
654+
</dependency>
655+
649656
<dependency>
650657
<groupId>org.rocksdb</groupId>
651658
<artifactId>rocksdbjni</artifactId>

pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java

+26
Original file line numberDiff line numberDiff line change
@@ -327,4 +327,30 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
327327
*/
328328
PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader);
329329

330+
331+
/**
332+
* Configures the maximum number of connections that the client library will establish with a single host.
333+
* <p>
334+
* By default, the connection pool maintains up to 16 connections to a single host. This method allows you to
335+
* modify this default behavior and limit the number of connections.
336+
* <p>
337+
* This setting can be useful in scenarios where you want to limit the resources used by the client library,
338+
* or control the level of parallelism for operations so that a single client does not overwhelm
339+
* the Pulsar cluster with too many concurrent connections.
340+
*
341+
* @param maxConnectionsPerHost the maximum number of connections to establish per host. Set to <= 0 to disable
342+
* the limit.
343+
* @return the PulsarAdminBuilder instance, allowing for method chaining
344+
*/
345+
PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost);
346+
347+
/**
348+
* Sets the maximum idle time for a pooled connection. If a connection is idle for more than the specified
349+
* amount of seconds, it will be released back to the connection pool.
350+
* Defaults to 25 seconds.
351+
*
352+
* @param connectionMaxIdleSeconds the maximum idle time, in seconds, for a pooled connection
353+
* @return the PulsarAdminBuilder instance
354+
*/
355+
PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
330356
}

pulsar-client-admin-shaded/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
<include>com.google.protobuf:protobuf-java</include>
124124
<include>com.google.guava:guava</include>
125125
<include>com.google.code.gson:gson</include>
126+
<include>com.spotify:completable-futures</include>
126127
<include>com.fasterxml.jackson.*:*</include>
127128
<include>io.netty:*</include>
128129
<include>io.netty.incubator:*</include>
@@ -192,6 +193,10 @@
192193
<exclude>com.google.protobuf.*</exclude>
193194
</excludes>
194195
</relocation>
196+
<relocation>
197+
<pattern>com.spotify.futures</pattern>
198+
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
199+
</relocation>
195200
<relocation>
196201
<pattern>com.fasterxml.jackson</pattern>
197202
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java

+22-48
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.asynchttpclient.Dsl.post;
2323
import static org.asynchttpclient.Dsl.put;
2424
import com.google.gson.Gson;
25-
import io.netty.handler.codec.http.HttpHeaders;
2625
import java.io.File;
2726
import java.io.FileOutputStream;
2827
import java.io.IOException;
@@ -41,6 +40,7 @@
4140
import org.apache.commons.lang3.StringUtils;
4241
import org.apache.pulsar.client.admin.Functions;
4342
import org.apache.pulsar.client.admin.PulsarAdminException;
43+
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
4444
import org.apache.pulsar.client.api.Authentication;
4545
import org.apache.pulsar.common.functions.FunctionConfig;
4646
import org.apache.pulsar.common.functions.FunctionDefinition;
@@ -54,10 +54,8 @@
5454
import org.apache.pulsar.common.policies.data.FunctionStats;
5555
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
5656
import org.apache.pulsar.common.policies.data.FunctionStatus;
57-
import org.asynchttpclient.AsyncHandler;
58-
import org.asynchttpclient.AsyncHttpClient;
57+
import org.asynchttpclient.AsyncCompletionHandlerBase;
5958
import org.asynchttpclient.HttpResponseBodyPart;
60-
import org.asynchttpclient.HttpResponseStatus;
6159
import org.asynchttpclient.RequestBuilder;
6260
import org.asynchttpclient.request.body.multipart.ByteArrayPart;
6361
import org.asynchttpclient.request.body.multipart.FilePart;
@@ -70,12 +68,14 @@
7068
public class FunctionsImpl extends ComponentResource implements Functions {
7169

7270
private final WebTarget functions;
73-
private final AsyncHttpClient asyncHttpClient;
71+
private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;
7472

75-
public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) {
73+
public FunctionsImpl(WebTarget web, Authentication auth,
74+
AsyncHttpRequestExecutor asyncHttpRequestExecutor,
75+
long requestTimeoutMs) {
7676
super(auth, requestTimeoutMs);
7777
this.functions = web.path("/admin/v3/functions");
78-
this.asyncHttpClient = asyncHttpClient;
78+
this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
7979
}
8080

8181
@Override
@@ -171,8 +171,7 @@ public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig
171171
// If the function code is built in, we don't need to submit here
172172
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
173173
}
174-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
175-
.toCompletableFuture()
174+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
176175
.thenAccept(response -> {
177176
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
178177
future.completeExceptionally(
@@ -263,8 +262,7 @@ public CompletableFuture<Void> updateFunctionAsync(
263262
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
264263
}
265264

266-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
267-
.toCompletableFuture()
265+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
268266
.thenAccept(response -> {
269267
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
270268
future.completeExceptionally(
@@ -464,7 +462,7 @@ public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String pat
464462
.addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
465463
.addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));
466464

467-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture()
465+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
468466
.thenAccept(response -> {
469467
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
470468
future.completeExceptionally(
@@ -543,55 +541,31 @@ private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTar
543541

544542
RequestBuilder builder = get(target.getUri().toASCIIString());
545543

546-
CompletableFuture<HttpResponseStatus> statusFuture =
547-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(),
548-
new AsyncHandler<HttpResponseStatus>() {
549-
private HttpResponseStatus status;
550-
551-
@Override
552-
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
553-
status = responseStatus;
554-
if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
555-
return State.ABORT;
556-
}
557-
return State.CONTINUE;
558-
}
559-
560-
@Override
561-
public State onHeadersReceived(HttpHeaders headers) throws Exception {
562-
return State.CONTINUE;
563-
}
544+
CompletableFuture<org.asynchttpclient.Response> responseFuture =
545+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build(),
546+
() -> new AsyncCompletionHandlerBase() {
564547

565548
@Override
566549
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
567550
os.write(bodyPart.getBodyByteBuffer());
568551
return State.CONTINUE;
569552
}
553+
});
570554

571-
@Override
572-
public HttpResponseStatus onCompleted() throws Exception {
573-
return status;
574-
}
575-
576-
@Override
577-
public void onThrowable(Throwable t) {
578-
}
579-
}).toCompletableFuture();
580-
581-
statusFuture
582-
.whenComplete((status, throwable) -> {
555+
responseFuture
556+
.whenComplete((response, throwable) -> {
583557
try {
584558
os.close();
585559
} catch (IOException e) {
586560
future.completeExceptionally(getApiException(e));
587561
}
588562
})
589-
.thenAccept(status -> {
590-
if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
563+
.thenAccept(response -> {
564+
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
591565
future.completeExceptionally(
592566
getApiException(Response
593-
.status(status.getStatusCode())
594-
.entity(status.getStatusText())
567+
.status(response.getStatusCode())
568+
.entity(response.getStatusText())
595569
.build()));
596570
} else {
597571
future.complete(null);
@@ -700,7 +674,7 @@ public CompletableFuture<Void> putFunctionStateAsync(
700674
.path("state").path(state.getKey()).getUri().toASCIIString());
701675
builder.addBodyPart(new StringPart("state", objectWriter()
702676
.writeValueAsString(state), MediaType.APPLICATION_JSON));
703-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
677+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
704678
.toCompletableFuture()
705679
.thenAccept(response -> {
706680
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
@@ -740,7 +714,7 @@ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String n
740714
.addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData))
741715
.addBodyPart(new StringPart("delete", Boolean.toString(delete)));
742716

743-
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
717+
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
744718
.toCompletableFuture()
745719
.thenAccept(response -> {
746720
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {

0 commit comments

Comments
 (0)