Skip to content

Commit 949214c

Browse files
poorbarcodenikhil-ctds
authored andcommitted
[fix][broker] Fix stuck when enable topic level replication and build remote admin fails (apache#23028)
(cherry picked from commit 88ebe78) (cherry picked from commit 16da5f7)
1 parent 4fb5f48 commit 949214c

File tree

2 files changed

+49
-5
lines changed

2 files changed

+49
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
4949
import org.apache.pulsar.broker.web.PulsarWebResource;
5050
import org.apache.pulsar.broker.web.RestException;
51+
import org.apache.pulsar.client.admin.PulsarAdmin;
5152
import org.apache.pulsar.client.admin.PulsarAdminException;
5253
import org.apache.pulsar.client.admin.internal.TopicsImpl;
5354
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -623,7 +624,7 @@ private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int
623624
});
624625
}
625626

626-
protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
627+
protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground (
627628
Set<String> clusters, int numPartitions) {
628629
final String shortTopicName = topicName.getPartitionedTopicName();
629630
Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
@@ -642,9 +643,17 @@ protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToR
642643
createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
643644
return;
644645
}
646+
PulsarAdmin remotePulsarAdmin;
647+
try {
648+
remotePulsarAdmin = pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData);
649+
} catch (Exception ex) {
650+
log.error("[{}] [{}] An un-expected error occurs when trying to create remote pulsar admin for"
651+
+ " cluster {}", clientAppId(), topicName, cluster, ex);
652+
createRemoteTopicFuture.completeExceptionally(new RestException(ex));
653+
return;
654+
}
645655
// Get cluster data success.
646-
TopicsImpl topics =
647-
(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics();
656+
TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics();
648657
topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
649658
.whenComplete((ignore, ex2) -> {
650659
if (ex2 == null) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

+37-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import java.time.Duration;
3737
import java.util.Arrays;
3838
import java.util.Collections;
39+
import java.util.HashSet;
3940
import java.util.Optional;
41+
import java.util.Set;
4042
import java.util.UUID;
4143
import java.util.concurrent.CompletableFuture;
4244
import java.util.concurrent.CountDownLatch;
@@ -55,6 +57,7 @@
5557
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
5658
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
5759
import org.apache.pulsar.broker.BrokerTestUtil;
60+
import org.apache.pulsar.broker.resources.ClusterResources;
5861
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
5962
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
6063
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -70,11 +73,13 @@
7073
import org.apache.pulsar.client.impl.ProducerImpl;
7174
import org.apache.pulsar.client.impl.PulsarClientImpl;
7275
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
76+
import org.apache.pulsar.common.naming.TopicName;
77+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
78+
import org.apache.pulsar.common.policies.data.ClusterData;
7379
import org.apache.pulsar.common.policies.data.RetentionPolicies;
80+
import org.apache.pulsar.common.policies.data.TenantInfo;
7481
import org.apache.pulsar.common.policies.data.TopicStats;
7582
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
76-
import org.apache.pulsar.common.naming.TopicName;
77-
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
7883
import org.apache.pulsar.common.util.FutureUtil;
7984
import org.awaitility.Awaitility;
8085
import org.awaitility.reflect.WhiteboxImpl;
@@ -915,6 +920,36 @@ protected void disableReplication(String topic) throws Exception {
915920
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
916921
}
917922

923+
@Test(timeOut = 30 * 1000)
924+
public void testCreateRemoteAdminFailed() throws Exception {
925+
final TenantInfo tenantInfo = admin1.tenants().getTenantInfo(defaultTenant);
926+
final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
927+
final String randomClusterName = "c_" + UUID.randomUUID().toString().replace("-", "");
928+
final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp");
929+
admin1.namespaces().createNamespace(ns1);
930+
admin1.topics().createPartitionedTopic(topic, 2);
931+
932+
// Inject a wrong cluster data which with empty fields.
933+
ClusterResources clusterResources = broker1.getPulsar().getPulsarResources().getClusterResources();
934+
clusterResources.createCluster(randomClusterName, ClusterData.builder().build());
935+
Set<String> allowedClusters = new HashSet<>(tenantInfo.getAllowedClusters());
936+
allowedClusters.add(randomClusterName);
937+
admin1.tenants().updateTenant(defaultTenant, TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles())
938+
.allowedClusters(allowedClusters).build());
939+
940+
// Verify.
941+
try {
942+
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, randomClusterName));
943+
fail("Expected a error due to empty fields");
944+
} catch (Exception ex) {
945+
// Expected an error.
946+
}
947+
948+
// cleanup.
949+
admin1.topics().deletePartitionedTopic(topic);
950+
admin1.tenants().updateTenant(defaultTenant, tenantInfo);
951+
}
952+
918953
@Test
919954
public void testConfigReplicationStartAt() throws Exception {
920955
// Initialize.

0 commit comments

Comments
 (0)