Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Partitioned shadow topic not work properly #22797

Merged
merged 9 commits into from
Jun 26, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -450,19 +450,23 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
// update remote cluster
return namespaceResources().getPoliciesAsync(namespaceName)
.thenCompose(policies -> {
if (!policies.isPresent()) {
if (policies.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
// Combine namespace level policies and topic level policies.
Set<String> replicationClusters = policies.get().replication_clusters;
TopicPolicies topicPolicies =
pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName);
if (topicPolicies != null) {
replicationClusters = new HashSet<>(topicPolicies.getReplicationClusters());
if (topicPolicies != null && topicPolicies.getReplicationClusters() != null) {
replicationClusters = new HashSet<>();
}
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved

CompletableFuture<Void> checkShadowTopics = checkShadowTopicsPartitions(topicPolicies,
expectPartitions);

// Do check replicated clusters.
if (replicationClusters.size() == 0) {
return CompletableFuture.completedFuture(null);
if (replicationClusters.isEmpty()) {
return checkShadowTopics;
}
boolean containsCurrentCluster =
replicationClusters.contains(pulsar().getConfig().getClusterName());
Expand All @@ -474,7 +478,7 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
}
if (replicationClusters.size() == 1) {
// The replication clusters just has the current cluster itself.
return CompletableFuture.completedFuture(null);
return checkShadowTopics;
}
// Do sync operation to other clusters.
List<CompletableFuture<Void>> futures = replicationClusters.stream()
Expand All @@ -496,6 +500,35 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
});
}

private CompletableFuture<Void> checkShadowTopicsPartitions(TopicPolicies topicPolicies, int expectPartitions) {
return Optional.ofNullable(topicPolicies)
.map(TopicPolicies::getShadowTopics)
.filter(shadowTopics -> !shadowTopics.isEmpty())
.map(shadowTopics -> {
List<CompletableFuture<Void>> futures = shadowTopics.stream()
.map(shadowTopic -> getPartitionedTopicMetadataAsync(
TopicName.get(shadowTopic), false, false)
.thenCompose(metadata -> {
if (metadata.partitions < expectPartitions) {
return CompletableFuture.<Void>failedFuture(
new RestException(400, String.format(
"The shadow topic %s has fewer partitions than the "
+ "current topic. Currently, it only contains %d "
+ "partitions. Please expand the partitions of the shadow"
+ " topic %s first.",
shadowTopic, metadata.partitions,
shadowTopic
))
);
}
return CompletableFuture.completedFuture(null);
}))
.collect(Collectors.toList());
return FutureUtil.waitForAll(futures);
})
.orElse(CompletableFuture.completedFuture(null));
}

protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
if (metadata != null && metadata.partitions > 0) {
Expand Down Expand Up @@ -5360,6 +5393,11 @@ protected CompletableFuture<Void> internalSetShadowTopic(List<String> shadowTopi
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Cannot specify empty shadow topics, please use remove command instead."));
}
if (shadowTopics.stream().map(TopicName::get).anyMatch(TopicName::isPartitioned)) {
return CompletableFuture.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Couldn't set a partition of a topic as the shadow topic."));
}

return validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> validateShadowTopics(shadowTopics))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2156,7 +2156,12 @@ protected CompletableFuture<Void> addShadowReplicationCluster(String shadowTopic
.thenAccept(replicationClient -> {
Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> {
try {
return new ShadowReplicator(shadowTopic, PersistentTopic.this, cursor, brokerService,
TopicName sourceTopicName = TopicName.get(getName());
String shadow = shadowTopic;
if (sourceTopicName.isPartitioned()) {
shadow += "-partition-" + sourceTopicName.getPartitionIndex();
}
return new ShadowReplicator(shadow, PersistentTopic.this, cursor, brokerService,
(PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
Expand All @@ -29,6 +31,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -113,6 +116,52 @@ public void testPartitionedShadowTopicSetup() throws Exception {
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopicPartition);
}

@Test
public void testPartitionedShadowTopicProduceAndConsume() throws Exception {
String sourceTopic = newShadowSourceTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createPartitionedTopic(sourceTopic, 3);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);

// We should not allow to set with the shadow partition topic which contains `-partition-n`.
Assert.assertThrows(PulsarAdminException.class, ()-> admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic+"-partition-0")));
admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("test")
.subscribe();

for (int i = 0; i < 10; i++) {
producer.send("msg-" + i);
}

Set<String> set = new HashSet<>();
for (int i = 0; i < 10; i++) {
Message<String> msg = consumer.receive();
set.add(msg.getValue());
}
for (int i = 0; i < 10; i++) {
Assert.assertTrue(set.contains("msg-" + i));
}
}

@Test
public void testPartitionedShadowTopicExpansion() throws Exception {
String sourceTopic = newShadowSourceTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createPartitionedTopic(sourceTopic, 1);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));

Assert.assertThrows(PulsarAdminException.class, () -> admin.topics().updatePartitionedTopic(sourceTopic, 3));

admin.topics().updatePartitionedTopic(shadowTopic, 3);

admin.topics().updatePartitionedTopic(sourceTopic, 3);
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testShadowTopicNotWritable() throws Exception {
String sourceTopic = newShadowSourceTopicName();
Expand Down
Loading