Skip to content

Commit

Permalink
add test to reproduce the issues
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Dec 17, 2024
1 parent 21908df commit b9ae34a
Showing 1 changed file with 263 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.vertx.core.impl.ConcurrentHashSet;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -53,6 +60,7 @@
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.AuthData;
Expand All @@ -64,6 +72,7 @@
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -225,7 +234,7 @@ public ChannelHandlerContext ctx() {
}
final ChannelHandlerContext originalCtx = super.ctx;
ChannelHandlerContext spyContext = spy(originalCtx);
doAnswer(invocation -> {
Answer injectedAnswer = invocation -> {
// Do not duplicate messages that are being re-sent, and clear the previously cached messages when
// a re-send happens, to avoid publishing out of order.
for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) {
Expand Down Expand Up @@ -279,7 +288,11 @@ public ChannelHandlerContext ctx() {
} else {
return invocation.callRealMethod();
}
}).when(spyContext).write(any(), any(ChannelPromise.class));
};
doAnswer(injectedAnswer).when(spyContext).write(any());
doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class));
doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any());
doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class));
return spyContext;
}
});
Expand Down Expand Up @@ -490,4 +503,252 @@ public ChannelHandlerContext ctx() {
admin2.topics().unload(topicName);
admin2.topics().delete(topicName);
}

/**
 * Supplies both deduplication modes to the tests bound to the {@code "enabledDeduplication"} provider.
 *
 * @return two single-argument rows: the first with {@code true}, the second with {@code false}.
 */
@DataProvider(name = "enabledDeduplication")
public Object[][] enabledDeduplication() {
    final Object[] deduplicationOn = {Boolean.TRUE};
    final Object[] deduplicationOff = {Boolean.FALSE};
    return new Object[][] {deduplicationOn, deduplicationOff};
}

/**
 * Reproduces the issue where replication loses messages when deduplication is enabled.
 * <pre>
 * 1. Publish in the source cluster.
 *   1-1. Producer-1 sends 2 messages: M1, M2.
 *   1-2. Producer-2 sends 2 messages: M3, M4.
 * 2. Replicate the messages to the remote cluster.
 *   2-1. Copy M1 and M2.
 *   2-2. Copy M1 and M2 again, then copy M3 and M4.
 *     2-2-1. The network breaks after M1 and M2 were copied again.
 * 3. Unload the topic.
 *   3-1. The replicator starts again after the topic is loaded up.
 *   3-2. The client creates a new connection.
 * 4. Verify: all 4 messages are copied to the remote cluster.
 * </pre>
 *
 * @param enabledDeduplication whether deduplication is switched on for the topic in both clusters before
 *                             publishing; when {@code true} the remote cluster must additionally receive
 *                             exactly {@code "1", "2", "3", "4"} in that order.
 * @throws Exception if any admin, client or broker operation used by the test fails.
 */
@Test(timeOut = 360 * 1000, dataProvider = "enabledDeduplication")
public void testDeduplicationNotLostMessage(boolean enabledDeduplication) throws Exception {
    waitInternalClientCreated();

    // Step-2: inject a mechanism that re-sends the first messages and then stops forwarding, which simulates
    // a connection that broke after M1 and M2 were copied repeatedly.
    // Copies of the first "repeatMsgIndex" SEND commands; always accessed under its own monitor.
    final List<ByteBufPair> duplicatedMsgs = new ArrayList<>();
    final int repeatMsgIndex = 2;
    final AtomicInteger msgSent = new AtomicInteger(0);
    // Channels that were wrapped by the injection, closed in step-3 to force new connections.
    final ConcurrentHashSet<Channel> injectedChannel = new ConcurrentHashSet<>();
    final Runnable taskToClearInjection = injectReplicatorClientCnx(
            (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {

        @Override
        public ChannelHandlerContext ctx() {
            final ChannelHandlerContext originalCtx = super.ctx;
            final ChannelHandlerContext spyContext = spy(originalCtx);
            final Answer<Object> injectedAnswer = invocation -> {
                // Do not duplicate messages that are being re-sent, and clear the previously cached messages
                // when a re-send happens, to avoid publishing out of order.
                for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) {
                    final String frame = stackTraceElement.toString();
                    if (frame.contains("recoverProcessOpSendMsgFrom") || frame.contains("resendMessages")) {
                        // Same monitor as every other access to the cache.
                        synchronized (duplicatedMsgs) {
                            duplicatedMsgs.clear();
                        }
                        return invocation.callRealMethod();
                    }
                }

                final Object data = invocation.getArguments()[0];
                if (!(data instanceof ByteBufPair)) {
                    return invocation.callRealMethod();
                }
                final ByteBufPair byteBufPair = (ByteBufPair) data;
                final ByteBuf buf1 = byteBufPair.getFirst();
                final ByteBuf buf2 = byteBufPair.getSecond();
                final int bufferIndex1 = buf1.readerIndex();
                final int bufferIndex2 = buf2.readerIndex();
                // Skip totalSize.
                buf1.readInt();
                final int cmdSize = buf1.readInt();
                final BaseCommand cmd = new BaseCommand();
                cmd.parseFrom(buf1, cmdSize);
                // Restore the reader index so the real write still sees the whole frame.
                buf1.readerIndex(bufferIndex1);
                if (cmd.getType() != BaseCommand.Type.SEND) {
                    return invocation.callRealMethod();
                }

                synchronized (duplicatedMsgs) {
                    if (msgSent.get() == repeatMsgIndex) {
                        // The first "repeatMsgIndex" messages were already forwarded: write the cached
                        // copies once more (if any are left) and do not forward the current command.
                        for (ByteBufPair bufferPair : duplicatedMsgs) {
                            originalCtx.channel().write(bufferPair, originalCtx.voidPromise());
                            originalCtx.channel().flush();
                        }
                        duplicatedMsgs.clear();
                        return null;
                    }
                }

                // Copy both buffers before the real write, and restore the reader indexes afterwards.
                final ByteBuf newBuffer1 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(buf1.readableBytes());
                buf1.readBytes(newBuffer1);
                buf1.readerIndex(bufferIndex1);
                final ByteBuf newBuffer2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(buf2.readableBytes());
                buf2.readBytes(newBuffer2);
                buf2.readerIndex(bufferIndex2);
                synchronized (duplicatedMsgs) {
                    // Only commands that carry a payload are counted and cached.
                    if (newBuffer2.readableBytes() > 0 && msgSent.incrementAndGet() <= repeatMsgIndex) {
                        duplicatedMsgs.add(ByteBufPair.get(newBuffer1, newBuffer2));
                    }
                }
                return invocation.callRealMethod();
            };
            doAnswer(injectedAnswer).when(spyContext).write(any());
            doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class));
            doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any());
            doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class));
            injectedChannel.add(originalCtx.channel());
            return spyContext;
        }
    });

    // Create topics and enable deduplication.
    final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
    admin1.topics().createNonPartitionedTopic(topicName);
    admin1.topics().createSubscription(topicName, "s1", MessageId.earliest);
    admin2.topics().createNonPartitionedTopic(topicName);
    admin2.topics().createSubscription(topicName, "s1", MessageId.earliest);
    final PersistentTopic tp1 =
            (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
    final PersistentTopic tp2 =
            (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
    final ManagedLedgerImpl ml2 = (ManagedLedgerImpl) tp2.getManagedLedger();
    if (enabledDeduplication) {
        // The policy is set again on every poll until both topics report deduplication as enabled.
        Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
            PersistentTopic persistentTopic1 =
                    (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
            PersistentTopic persistentTopic2 =
                    (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
            admin1.topicPolicies().setDeduplicationStatus(topicName, true);
            admin2.topicPolicies().setDeduplicationStatus(topicName, true);
            assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(),
                    Boolean.TRUE);
            assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(),
                    Boolean.TRUE);
        });
    }
    // Let the broker persist messages one by one, in other words, it starts to persist the next message only
    // after the previous one has been written into BKs.
    final PersistentTopic spyTp2 = spy(tp2);
    doAnswer(invocation -> {
        try {
            Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
                assertEquals(ml2.getPendingAddEntriesCount(), 0);
            });
        } catch (Throwable ignored) {
            // Best effort: if the pending writes did not drain within 3 seconds, publish anyway.
        }
        return invocation.callRealMethod();
    }).when(spyTp2).publishMessage(any(ByteBuf.class), any(Topic.PublishContext.class));
    final CompletableFuture<Optional<Topic>> originalTp2 = pulsar2.getBrokerService().getTopics()
            .put(tp2.getName(), CompletableFuture.completedFuture(Optional.of(spyTp2)));

    // Step-1: publish messages in the source cluster and start replication.
    final ProducerImpl<byte[]> p1 =
            (ProducerImpl<byte[]>) client1.newProducer().topic(topicName).producerName("p1").create();
    final ProducerImpl<byte[]> p2 =
            (ProducerImpl<byte[]>) client1.newProducer().topic(topicName).producerName("p2").create();
    p1.send("1".getBytes(StandardCharsets.UTF_8));
    p1.send("2".getBytes(StandardCharsets.UTF_8));
    p2.send("3".getBytes(StandardCharsets.UTF_8));
    p2.send("4".getBytes(StandardCharsets.UTF_8));

    // Enable replication and give it time to run with the injection in place.
    admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
    waitReplicatorStarted(topicName);
    try {
        awaitReplicationBacklogDrained(tp1);
    } catch (Throwable ignored) {
        // Expected: the injection stops forwarding SEND commands once "repeatMsgIndex" messages were sent,
        // so the backlog may never drain here. Recovery is verified after step-3.
    }

    // Step-3: remove the injections, unload topics and rebuild connections of the replicator.
    taskToClearInjection.run();
    pulsar2.getBrokerService().getTopics().put(tp2.getName(), originalTp2);
    admin1.topics().unload(topicName);
    admin2.topics().unload(topicName);
    for (Channel channel : injectedChannel) {
        channel.close();
    }
    waitReplicatorStarted(topicName);
    final PersistentTopic tp12 =
            (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
    final PersistentTopic tp22 =
            (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
    awaitReplicationBacklogDrained(tp12);

    // Verify: all 4 messages are copied to the remote cluster.
    final List<String> msgReceived = new ArrayList<>();
    final Consumer<byte[]> consumer = client2.newConsumer().topic(topicName)
            .subscriptionName("s1").subscribe();
    while (true) {
        // A 10 seconds silence is treated as "nothing more to receive".
        Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
        if (msg == null) {
            break;
        }
        MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId();
        log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"),
                messageIdAdv.getLedgerId(), messageIdAdv.getEntryId());
        msgReceived.add(new String(msg.getData(), StandardCharsets.UTF_8));
        consumer.acknowledgeAsync(msg);
    }

    log.info("received msgs: {}", msgReceived);
    assertTrue(msgReceived.contains("1"));
    assertTrue(msgReceived.contains("2"));
    assertTrue(msgReceived.contains("3"));
    assertTrue(msgReceived.contains("4"));
    if (enabledDeduplication) {
        // With deduplication the re-sent copies must have been dropped and the order preserved.
        assertEquals(msgReceived, Arrays.asList("1", "2", "3", "4"));
    }

    // cleanup.
    consumer.close();
    p1.close();
    p2.close();
    admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
    waitReplicatorStopped(topicName);
    // Wait until the replication cursor has been removed from the source topic.
    Awaitility.await().until(() -> {
        for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) {
            if (cursor.getName().equals("pulsar.repl.r2")) {
                return false;
            }
        }
        return true;
    });
    admin1.topics().unload(topicName);
    admin1.topics().delete(topicName);
    admin2.topics().unload(topicName);
    admin2.topics().delete(topicName);
}

/**
 * Waits up to 20 seconds, polling every second, until the {@code "pulsar.repl.r2"} cursor of the given topic
 * reports an empty backlog. A topic without such a cursor satisfies the condition immediately.
 *
 * @param topic the source topic whose managed-ledger cursors are inspected.
 */
private void awaitReplicationBacklogDrained(PersistentTopic topic) {
    Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
        for (ManagedCursor cursor : topic.getManagedLedger().getCursors()) {
            if (cursor.getName().equals("pulsar.repl.r2")) {
                long replBacklog = cursor.getNumberOfEntriesInBacklog(true);
                log.info("repl backlog: {}", replBacklog);
                assertEquals(replBacklog, 0);
            }
        }
    });
}
}

0 comments on commit b9ae34a

Please sign in to comment.