Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] DO NOT MERGE #288

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
64ead41
[improve][ci] Migrate from Gradle Enterprise to Develocity (#22880)
lhotari Jun 8, 2024
65576f9
[fix] Remove blocking calls from BookieRackAffinityMapping (#22846)
merlimat Jun 5, 2024
4e98aa9
[fix][cli] Fix Pulsar standalone shutdown - bkCluster wasn't closed (…
lhotari Jun 7, 2024
40ef1ae
[fix][cli] Fix Pulsar standalone "--wipe-data" (#22885)
lhotari Jun 10, 2024
b984e6d
[improve] Upgrade IPAddress to 5.5.0 (#22886)
merlimat Jun 10, 2024
5c834f2
[fix][misc] Topic name from persistence name should decode local name…
Shawyeok Jun 11, 2024
0337bf6
[improve][broker] Optimize PersistentTopic.getLastDispatchablePositio…
dao-jun Jun 11, 2024
f17a6b2
[improve][broker] Include runtime dependencies in server distribution…
dragosvictor Jan 31, 2024
fe11f9a
[fix][broker][branch-3.0] The topic might reference a closed ledger (…
shibd Jun 13, 2024
e2c4f32
[fix][cli] Fix the pulsar-daemon parameter passthrough syntax (#22905)
coderzc Jun 14, 2024
6091cec
[improve][broker] Include runtime dependencies in server distribution…
dragosvictor Jan 31, 2024
a7bcb02
[improve][misc] Include native epoll library for Netty for arm64 (#22…
lhotari Mar 21, 2024
28fe36c
[fix][broker] Fix topic status for oldestBacklogMessageAgeSeconds con…
shibd Jun 14, 2024
568be99
fix: cannot find symbol from cherry-pick 73b50e
shibd Jun 18, 2024
4fab68d
[fix] [client] PIP-344 Do not create partitioned metadata when callin…
poorbarcode May 23, 2024
4c1f8e2
[improve] [client] PIP-344 support feature flag supportsGetPartitione…
poorbarcode Jun 6, 2024
63e2dcc
[fix] [broker] response not-found error if topic does not exist when …
poorbarcode Jun 17, 2024
134621c
[fix][client] fix producer/consumer perform lookup for migrated topic…
rdhabalia Oct 14, 2023
5775898
[fix][test] Fix thread leaks in Managed Ledger tests and remove dupli…
lhotari Oct 24, 2023
d733598
[fix] [broker] Messages lost on the remote cluster when using topic l…
poorbarcode Jun 19, 2024
3dd8e63
[fix][fn] Enable optimized Netty direct byte buffer support for Pulsa…
lhotari Jun 17, 2024
71e6d02
[fix] [proxy] Add missing parameter in newPartitionMetadataRequest call
nikhil-ctds Jun 24, 2024
ab293a2
[fix][test] Fix TableViewBuilderImplTest NPE and infinite loop (#22924)
lhotari Jun 17, 2024
0d08b5b
[fix] [client] Fix resource leak in Pulsar Client since HttpLookupSer…
poorbarcode Jun 18, 2024
00a6eb0
[fix] [broker] broker log a full thread dump when a deadlock is detec…
yyj8 Jun 20, 2024
befa38d
[fix][fn] Support compression type and crypto config for all producer…
lhotari Jun 20, 2024
c373025
[fix][broker] Check the markDeletePosition and calculate the backlog …
nodece Jun 20, 2024
47caa79
[improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by Roarin…
dao-jun Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[fix][client] fix producer/consumer perform lookup for migrated topic (
…apache#21356)

Co-authored-by: Rajan Dhabalia <rdhabalia@oath.com>
(cherry picked from commit d09642c)
(cherry picked from commit d4f3c59)
  • Loading branch information
rdhabalia authored and nikhil-ctds committed Jun 24, 2024
commit 134621cd5da94cc85bf985f37583af9b93b72331
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,16 @@ protected void grabCnx() {
try {
CompletableFuture<ClientCnx> cnxFuture;
if (state.redirectedClusterURI != null) {
InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
state.redirectedClusterURI.getPort());
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
if (state.topic == null) {
InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
state.redirectedClusterURI.getPort());
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
} else {
// once, client receives redirection url, client has to perform lookup on migrated
// cluster to find the broker that owns the topic and then create connection.
// below method, performs the lookup for a given topic and then creates connection
cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));
}
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,4 @@ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicNam
*/
CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
String topicPattern, String topicsHash);

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class PulsarClientImpl implements PulsarClient {

private final boolean createdScheduledProviders;
private LookupService lookup;
private Map<String, LookupService> urlLookupMap = new ConcurrentHashMap<>();
private final ConnectionPool cnxPool;
@Getter
private final Timer timer;
Expand Down Expand Up @@ -962,6 +964,23 @@ public CompletableFuture<ClientCnx> getConnection(final String topic) {
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
}

public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
TopicName topicName = TopicName.get(topic);
return getLookup(url).getBroker(topicName)
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
}

public LookupService getLookup(String serviceUrl) {
return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
try {
return createLookup(serviceUrl);
} catch (PulsarClientException e) {
log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());
throw new IllegalStateException("Failed to update url " + url);
}
});
}

public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
if (!(lookup instanceof BinaryProtoLookupService)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
Expand Down Expand Up @@ -1020,10 +1039,14 @@ public LookupService getLookup() {
}

public void reloadLookUp() throws PulsarClientException {
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
lookup = createLookup(conf.getServiceUrl());
}

public LookupService createLookup(String url) throws PulsarClientException {
if (url.startsWith("http")) {
return new HttpLookupService(conf, eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
}
}
Expand Down