Skip to content

Commit 02e8f69

Browse files
nodecesrinath-ctds
authored andcommitted
[fix][client] Use dedicated executor for requests in BinaryProtoLookupService (apache#23378) (apache#23461)
Signed-off-by: Zixuan Liu <[email protected]> (cherry picked from commit 7006381)
1 parent 637014b commit 02e8f69

File tree

4 files changed

+143
-23
lines changed

4 files changed

+143
-23
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+5
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
244244
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
245245
private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider;
246246
private final Timer brokerClientSharedTimer;
247+
private final ExecutorProvider brokerClientSharedLookupExecutorProvider;
247248

248249
private MetricsGenerator metricsGenerator;
249250

@@ -349,6 +350,8 @@ public PulsarService(ServiceConfiguration config,
349350
new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor");
350351
this.brokerClientSharedTimer =
351352
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
353+
this.brokerClientSharedLookupExecutorProvider =
354+
new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor");
352355

353356
// here in the constructor we don't have the offloader scheduler yet
354357
this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
@@ -601,6 +604,7 @@ public CompletableFuture<Void> closeAsync() {
601604
brokerClientSharedExternalExecutorProvider.shutdownNow();
602605
brokerClientSharedInternalExecutorProvider.shutdownNow();
603606
brokerClientSharedScheduledExecutorProvider.shutdownNow();
607+
brokerClientSharedLookupExecutorProvider.shutdownNow();
604608
brokerClientSharedTimer.stop();
605609

606610
asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
@@ -1584,6 +1588,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
15841588
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
15851589
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
15861590
.scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
1591+
.lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
15871592
.build();
15881593
}
15891594

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java

+50-15
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
import static java.lang.String.format;
2222
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
2323
import io.netty.buffer.ByteBuf;
24+
import io.netty.util.concurrent.DefaultThreadFactory;
2425
import java.net.InetSocketAddress;
2526
import java.net.URI;
2627
import java.util.Optional;
2728
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.concurrent.ExecutorService;
31+
import java.util.concurrent.Executors;
3032
import java.util.concurrent.ScheduledExecutorService;
3133
import java.util.concurrent.TimeUnit;
3234
import java.util.concurrent.atomic.AtomicLong;
@@ -55,37 +57,68 @@ public class BinaryProtoLookupService implements LookupService {
5557
private final PulsarClientImpl client;
5658
private final ServiceNameResolver serviceNameResolver;
5759
private final boolean useTls;
58-
private final ExecutorService executor;
60+
private final ExecutorService scheduleExecutor;
5961
private final String listenerName;
6062
private final int maxLookupRedirects;
63+
private final ExecutorService lookupPinnedExecutor;
64+
private final boolean createdLookupPinnedExecutor;
6165

6266
private final ConcurrentHashMap<TopicName, CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
6367
lookupInProgress = new ConcurrentHashMap<>();
6468

6569
private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
6670
partitionedMetadataInProgress = new ConcurrentHashMap<>();
6771

72+
/**
73+
* @deprecated use {@link
74+
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
75+
*/
76+
@Deprecated
77+
public BinaryProtoLookupService(PulsarClientImpl client,
78+
String serviceUrl,
79+
boolean useTls,
80+
ExecutorService scheduleExecutor)
81+
throws PulsarClientException {
82+
this(client, serviceUrl, null, useTls, scheduleExecutor);
83+
}
84+
85+
/**
86+
* @deprecated use {@link
87+
* #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead.
88+
*/
89+
@Deprecated
6890
public BinaryProtoLookupService(PulsarClientImpl client,
6991
String serviceUrl,
92+
String listenerName,
7093
boolean useTls,
71-
ExecutorService executor)
94+
ExecutorService scheduleExecutor)
7295
throws PulsarClientException {
73-
this(client, serviceUrl, null, useTls, executor);
96+
this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null);
7497
}
7598

7699
public BinaryProtoLookupService(PulsarClientImpl client,
77100
String serviceUrl,
78101
String listenerName,
79102
boolean useTls,
80-
ExecutorService executor)
103+
ExecutorService scheduleExecutor,
104+
ExecutorService lookupPinnedExecutor)
81105
throws PulsarClientException {
82106
this.client = client;
83107
this.useTls = useTls;
84-
this.executor = executor;
108+
this.scheduleExecutor = scheduleExecutor;
85109
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
86110
this.serviceNameResolver = new PulsarServiceNameResolver();
87111
this.listenerName = listenerName;
88112
updateServiceUrl(serviceUrl);
113+
114+
if (lookupPinnedExecutor == null) {
115+
this.createdLookupPinnedExecutor = true;
116+
this.lookupPinnedExecutor =
117+
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup"));
118+
} else {
119+
this.createdLookupPinnedExecutor = false;
120+
this.lookupPinnedExecutor = lookupPinnedExecutor;
121+
}
89122
}
90123

91124
@Override
@@ -153,7 +186,7 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
153186
return addressFuture;
154187
}
155188

156-
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
189+
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
157190
long requestId = client.newRequestId();
158191
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId);
159192
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
@@ -218,7 +251,7 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
218251
}
219252
client.getCnxPool().releaseConnection(clientCnx);
220253
});
221-
}).exceptionally(connectionException -> {
254+
}, lookupPinnedExecutor).exceptionally(connectionException -> {
222255
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
223256
return null;
224257
});
@@ -230,7 +263,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
230263

231264
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();
232265

233-
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
266+
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
234267
boolean finalAutoCreationEnabled = metadataAutoCreationEnabled;
235268
if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) {
236269
if (useFallbackForNonPIP344Brokers) {
@@ -269,7 +302,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
269302
}
270303
client.getCnxPool().releaseConnection(clientCnx);
271304
});
272-
}).exceptionally(connectionException -> {
305+
}, lookupPinnedExecutor).exceptionally(connectionException -> {
273306
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
274307
return null;
275308
});
@@ -291,7 +324,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
291324
return schemaFuture;
292325
}
293326
InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
294-
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
327+
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
295328
long requestId = client.newRequestId();
296329
ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
297330
Optional.ofNullable(BytesSchemaVersion.of(version)));
@@ -305,7 +338,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
305338
}
306339
client.getCnxPool().releaseConnection(clientCnx);
307340
});
308-
}).exceptionally(ex -> {
341+
}, lookupPinnedExecutor).exceptionally(ex -> {
309342
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
310343
return null;
311344
});
@@ -348,7 +381,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
348381
Mode mode,
349382
String topicsPattern,
350383
String topicsHash) {
351-
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
384+
client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> {
352385
long requestId = client.newRequestId();
353386
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
354387
namespace.toString(), requestId, mode, topicsPattern, topicsHash);
@@ -365,7 +398,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
365398
}
366399
client.getCnxPool().releaseConnection(clientCnx);
367400
});
368-
}).exceptionally((e) -> {
401+
}, lookupPinnedExecutor).exceptionally((e) -> {
369402
long nextDelay = Math.min(backoff.next(), remainingTime.get());
370403
if (nextDelay <= 0) {
371404
getTopicsResultFuture.completeExceptionally(
@@ -375,7 +408,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
375408
return null;
376409
}
377410

378-
((ScheduledExecutorService) executor).schedule(() -> {
411+
((ScheduledExecutorService) scheduleExecutor).schedule(() -> {
379412
log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in"
380413
+ " {} ms", namespace, nextDelay);
381414
remainingTime.addAndGet(-nextDelay);
@@ -389,7 +422,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
389422

390423
@Override
391424
public void close() throws Exception {
392-
// no-op
425+
if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) {
426+
lookupPinnedExecutor.shutdown();
427+
}
393428
}
394429

395430
public static class LookupDataResult {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

+33-6
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public class PulsarClientImpl implements PulsarClient {
109109
private final boolean createdExecutorProviders;
110110

111111
private final boolean createdScheduledProviders;
112+
private final boolean createdLookupProviders;
112113
private LookupService lookup;
113114
private Map<String, LookupService> urlLookupMap = new ConcurrentHashMap<>();
114115
private final ConnectionPool cnxPool;
@@ -117,6 +118,7 @@ public class PulsarClientImpl implements PulsarClient {
117118
private boolean needStopTimer;
118119
private final ExecutorProvider externalExecutorProvider;
119120
private final ExecutorProvider internalExecutorProvider;
121+
private final ExecutorProvider lookupExecutorProvider;
120122

121123
private final ScheduledExecutorProvider scheduledExecutorProvider;
122124
private final boolean createdEventLoopGroup;
@@ -157,29 +159,40 @@ public SchemaInfoProvider load(String topicName) {
157159
private TransactionCoordinatorClientImpl tcClient;
158160

159161
public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
160-
this(conf, null, null, null, null, null, null);
162+
this(conf, null, null, null, null, null, null, null);
161163
}
162164

163165
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
164-
this(conf, eventLoopGroup, null, null, null, null, null);
166+
this(conf, eventLoopGroup, null, null, null, null, null, null);
165167
}
166168

167169
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
168170
throws PulsarClientException {
169-
this(conf, eventLoopGroup, cnxPool, null, null, null, null);
171+
this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
170172
}
171173

172174
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
173175
Timer timer)
174176
throws PulsarClientException {
175-
this(conf, eventLoopGroup, cnxPool, timer, null, null, null);
177+
this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
178+
}
179+
180+
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
181+
Timer timer, ExecutorProvider externalExecutorProvider,
182+
ExecutorProvider internalExecutorProvider,
183+
ScheduledExecutorProvider scheduledExecutorProvider)
184+
throws PulsarClientException {
185+
this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider,
186+
scheduledExecutorProvider, null);
176187
}
177188

178189
@Builder(builderClassName = "PulsarClientImplBuilder")
179190
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
180191
Timer timer, ExecutorProvider externalExecutorProvider,
181192
ExecutorProvider internalExecutorProvider,
182-
ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
193+
ScheduledExecutorProvider scheduledExecutorProvider,
194+
ExecutorProvider lookupExecutorProvider) throws PulsarClientException {
195+
183196
EventLoopGroup eventLoopGroupReference = null;
184197
ConnectionPool connectionPoolReference = null;
185198
try {
@@ -191,6 +204,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
191204
}
192205
this.createdExecutorProviders = externalExecutorProvider == null;
193206
this.createdScheduledProviders = scheduledExecutorProvider == null;
207+
this.createdLookupProviders = lookupExecutorProvider == null;
194208
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
195209
this.eventLoopGroup = eventLoopGroupReference;
196210
if (conf == null || isBlank(conf.getServiceUrl())) {
@@ -206,13 +220,16 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
206220
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
207221
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
208222
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
223+
this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider :
224+
new ExecutorProvider(1, "pulsar-client-lookup");
209225
this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
210226
new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled");
211227
if (conf.getServiceUrl().startsWith("http")) {
212228
lookup = new HttpLookupService(conf, this.eventLoopGroup);
213229
} else {
214230
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
215-
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
231+
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(),
232+
this.lookupExecutorProvider.getExecutor());
216233
}
217234
if (timer == null) {
218235
this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
@@ -965,6 +982,16 @@ private void shutdownExecutors() throws PulsarClientException {
965982
pulsarClientException = PulsarClientException.unwrap(t);
966983
}
967984
}
985+
986+
if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) {
987+
try {
988+
lookupExecutorProvider.shutdownNow();
989+
} catch (Throwable t) {
990+
log.warn("Failed to shutdown lookupExecutorProvider", t);
991+
pulsarClientException = PulsarClientException.unwrap(t);
992+
}
993+
}
994+
968995
if (pulsarClientException != null) {
969996
throw pulsarClientException;
970997
}

0 commit comments

Comments
 (0)