From b9ae34a706b26884dcbb235b959d19cc1a6209d7 Mon Sep 17 00:00:00 2001
From: fengyubiao
Date: Tue, 17 Dec 2024 19:47:28 +0800
Subject: [PATCH] add test to reproduce the issues

---
 .../OneWayReplicatorDeduplicationTest.java    | 265 +++++++++++++++++-
 1 file changed, 263 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
index ce66e9f69dd569..a3e38595bf647d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
@@ -27,16 +27,23 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import io.vertx.core.impl.ConcurrentHashSet;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -53,6 +60,7 @@
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.common.api.AuthData;
@@ -64,6 +72,7 @@
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
+import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -225,7 +234,7 @@ public ChannelHandlerContext ctx() {
                         }
                         final ChannelHandlerContext originalCtx = super.ctx;
                         ChannelHandlerContext spyContext = spy(originalCtx);
-                        doAnswer(invocation -> {
+                        Answer injectedAnswer = invocation -> {
                             // Do not repeat the messages re-sending, and clear the previous cached messages when
                             // calling re-sending, to avoid publishing outs of order.
                             for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) {
@@ -279,7 +288,11 @@ public ChannelHandlerContext ctx() {
                             } else {
                                 return invocation.callRealMethod();
                             }
-                        }).when(spyContext).write(any(), any(ChannelPromise.class));
+                        };
+                        doAnswer(injectedAnswer).when(spyContext).write(any());
+                        doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class));
+                        doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any());
+                        doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class));
                         return spyContext;
                     }
                 });
@@ -490,4 +503,252 @@ public ChannelHandlerContext ctx() {
         admin2.topics().unload(topicName);
         admin2.topics().delete(topicName);
     }
+
+    @DataProvider(name = "enabledDeduplication")
+    public Object[][] enabledDeduplication() {
+        return new Object[][] {
+            {true},
+            {false}
+        };
+    }
+
+    /**
+     * Reproduces the issue that replication loses messages when deduplication is enabled.
+     * 1. Publish in the source cluster: producer p1 sends M1 and M2, producer p2 sends M3 and M4.
+     * 2. Replicate messages to the remote cluster.
+     *   2-1. Copies M1 and M2.
+     *   2-2. Copies M1 and M2 again (duplicates), and copies M3 and M4.
+     *     2-2-1. After M1 and M2 have been copied repeatedly, the network breaks.
+     * 3. Unload the topic.
+     *   3-1. The replicator will start after the topic is loaded up.
+     *   3-2. The client will create a new connection.
+     * 4. Verify: all 4 messages are copied to the remote cluster.
+     *
+     * @param enabledDeduplication whether deduplication is enabled on the topic in both clusters
+     */
+    @Test(timeOut = 360 * 1000, dataProvider = "enabledDeduplication")
+    public void testDeduplicationNotLostMessage(boolean enabledDeduplication) throws Exception {
+        waitInternalClientCreated();
+
+        /**
+         * Step-2: inject a mechanism that simulates a broken client connection after M1 and M2 are re-sent.
+         */
+        final List<ByteBufPair> duplicatedMsgs = new ArrayList<>();
+        final int repeatMsgIndex = 2;
+        AtomicInteger msgSent = new AtomicInteger(0);
+        ConcurrentHashSet<Channel> injectedChannel = new ConcurrentHashSet<>();
+        Runnable taskToClearInjection = injectReplicatorClientCnx(
+                (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
+
+                    @Override
+                    public ChannelHandlerContext ctx() {
+                        final ChannelHandlerContext originalCtx = super.ctx;
+                        ChannelHandlerContext spyContext = spy(originalCtx);
+                        Answer<Object> injectedAnswer = invocation -> {
+                            // Do not repeat messages that are being re-sent, and clear the previously cached ones
+                            // when a re-send happens, to avoid publishing out of order.
+                            for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) {
+                                if (stackTraceElement.toString().contains("recoverProcessOpSendMsgFrom")
+                                        || stackTraceElement.toString().contains("resendMessages")) {
+                                    duplicatedMsgs.clear();
+                                    return invocation.callRealMethod();
+                                }
+                            }
+
+                            Object data = invocation.getArguments()[0];
+                            if (!(data instanceof ByteBufPair)) {
+                                return invocation.callRealMethod();
+                            }
+                            // Repeatedly send every message.
+                            ByteBufPair byteBufPair = (ByteBufPair) data;
+                            ByteBuf buf1 = byteBufPair.getFirst();
+                            ByteBuf buf2 = byteBufPair.getSecond();
+                            int bufferIndex1 = buf1.readerIndex();
+                            int bufferIndex2 = buf2.readerIndex();
+                            // Skip totalSize.
+                            buf1.readInt();
+                            int cmdSize = buf1.readInt();
+                            BaseCommand cmd = new BaseCommand();
+                            cmd.parseFrom(buf1, cmdSize);
+                            buf1.readerIndex(bufferIndex1);
+                            if (cmd.getType().equals(BaseCommand.Type.SEND)) {
+                                synchronized (duplicatedMsgs) {
+                                    if (duplicatedMsgs.isEmpty() && msgSent.get() == repeatMsgIndex) {
+                                        return null;
+                                    }
+                                    if (msgSent.get() == repeatMsgIndex) {
+                                        for (ByteBufPair bufferPair : duplicatedMsgs) {
+                                            originalCtx.channel().write(bufferPair, originalCtx.voidPromise());
+                                            originalCtx.channel().flush();
+                                        }
+                                        duplicatedMsgs.clear();
+                                        return null;
+                                    }
+                                }
+                                ByteBuf newBuffer1 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(
+                                        buf1.readableBytes());
+                                buf1.readBytes(newBuffer1);
+                                buf1.readerIndex(bufferIndex1);
+                                ByteBuf newBuffer2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(
+                                        buf2.readableBytes());
+                                buf2.readBytes(newBuffer2);
+                                buf2.readerIndex(bufferIndex2);
+                                synchronized (duplicatedMsgs) {
+                                    if (newBuffer2.readableBytes() > 0 && msgSent.incrementAndGet() <= repeatMsgIndex) {
+                                        duplicatedMsgs.add(ByteBufPair.get(newBuffer1, newBuffer2));
+                                    }
+                                }
+                                return invocation.callRealMethod();
+                            } else {
+                                return invocation.callRealMethod();
+                            }
+                        };
+                        doAnswer(injectedAnswer).when(spyContext).write(any());
+                        doAnswer(injectedAnswer).when(spyContext).write(any(), any(ChannelPromise.class));
+                        doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any());
+                        doAnswer(injectedAnswer).when(spyContext).writeAndFlush(any(), any(ChannelPromise.class));
+                        injectedChannel.add(originalCtx.channel());
+                        return spyContext;
+                    }
+                });
+
+        // Create topics and enable deduplication.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        admin2.topics().createNonPartitionedTopic(topicName);
+        admin2.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        PersistentTopic tp1 =
+                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentTopic tp2 =
+                (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) tp2.getManagedLedger();
+        if (enabledDeduplication) {
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                PersistentTopic persistentTopic1 =
+                        (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+                PersistentTopic persistentTopic2 =
+                        (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+                admin1.topicPolicies().setDeduplicationStatus(topicName, true);
+                admin2.topicPolicies().setDeduplicationStatus(topicName, true);
+                assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(),
+                        Boolean.TRUE);
+                assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(),
+                        Boolean.TRUE);
+            });
+        }
+        // Let the broker persist messages one by one; in other words, it starts to persist the next message only
+        // after the previous one has been written into BKs (waiting at most 3 seconds for that).
+        PersistentTopic spyTp2 = spy(tp2);
+        doAnswer(invocation -> {
+            try {
+                Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+                    assertEquals(ml2.getPendingAddEntriesCount(), 0);
+                });
+            } catch (Throwable ignored) {
+                // Ignored on purpose: if the wait times out, publish the message anyway.
+            }
+            return invocation.callRealMethod();
+        }).when(spyTp2).publishMessage(any(ByteBuf.class), any(Topic.PublishContext.class));
+        CompletableFuture<Optional<Topic>> originalTp2 = pulsar2.getBrokerService().getTopics().put(tp2.getName(),
+                CompletableFuture.completedFuture(Optional.of(spyTp2)));
+
+        /**
+         * Step-1: publish messages in the source cluster and start replication.
+         */
+        ProducerImpl p1 = (ProducerImpl) client1.newProducer().topic(topicName).producerName("p1").create();
+        ProducerImpl p2 = (ProducerImpl) client1.newProducer().topic(topicName).producerName("p2").create();
+        p1.send("1".getBytes(StandardCharsets.UTF_8));
+        p1.send("2".getBytes(StandardCharsets.UTF_8));
+        p2.send("3".getBytes(StandardCharsets.UTF_8));
+        p2.send("4".getBytes(StandardCharsets.UTF_8));
+
+        // Enable replication and wait for the backlog to drain; it may not drain while the injection is active.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+        waitReplicatorStarted(topicName);
+        try {
+            Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+                for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) {
+                    if (cursor.getName().equals("pulsar.repl.r2")) {
+                        long replBacklog = cursor.getNumberOfEntriesInBacklog(true);
+                        log.info("repl backlog: {}", replBacklog);
+                        assertEquals(replBacklog, 0);
+                    }
+                }
+            });
+        } catch (Throwable ignored) {
+            // Ignored on purpose: the wait above is best-effort and is allowed to time out.
+        }
+
+        /**
+         * Step-3: remove the injections, unload topics and rebuild connections of the replicator.
+         */
+        taskToClearInjection.run();
+        pulsar2.getBrokerService().getTopics().put(tp2.getName(), originalTp2);
+        admin1.topics().unload(topicName);
+        admin2.topics().unload(topicName);
+        for (Channel channel : injectedChannel) {
+            channel.close();
+        }
+        waitReplicatorStarted(topicName);
+        PersistentTopic tp12 =
+                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentTopic tp22 =
+                (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+            for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) {
+                if (cursor.getName().equals("pulsar.repl.r2")) {
+                    long replBacklog = cursor.getNumberOfEntriesInBacklog(true);
+                    log.info("repl backlog: {}", replBacklog);
+                    assertEquals(replBacklog, 0);
+                }
+            }
+        });
+
+        /**
+         * Verify: all 4 messages are copied to the remote cluster.
+         */
+        List<String> msgReceived = new ArrayList<>();
+        Consumer<byte[]> consumer = client2.newConsumer().topic(topicName)
+                .subscriptionName("s1").subscribe();
+        while (true) {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId();
+            log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"),
+                    messageIdAdv.getLedgerId(), messageIdAdv.getEntryId());
+            msgReceived.add(new String(msg.getData(), StandardCharsets.UTF_8));
+            consumer.acknowledgeAsync(msg);
+        }
+
+        log.info("received msgs: {}", msgReceived);
+        assertTrue(msgReceived.contains("1"));
+        assertTrue(msgReceived.contains("2"));
+        assertTrue(msgReceived.contains("3"));
+        assertTrue(msgReceived.contains("4"));
+        if (enabledDeduplication) {
+            assertEquals(msgReceived, Arrays.asList("1", "2", "3", "4"));
+        }
+
+        // Cleanup.
+        consumer.close();
+        p1.close();
+        p2.close();
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(topicName);
+        Awaitility.await().until(() -> {
+            for (ManagedCursor cursor : tp12.getManagedLedger().getCursors()) {
+                if (cursor.getName().equals("pulsar.repl.r2")) {
+                    return false;
+                }
+            }
+            return true;
+        });
+        admin1.topics().unload(topicName);
+        admin1.topics().delete(topicName);
+        admin2.topics().unload(topicName);
+        admin2.topics().delete(topicName);
+    }
 }