From 03a4cfe6c051ca01b339f825e328d829ba9d2532 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov <8622884+dlg99@users.noreply.github.com> Date: Thu, 15 Aug 2024 13:01:29 -0700 Subject: [PATCH] fix: DLQ to handle bytes key properly (#306) redo of https://github.com/apache/pulsar/pull/23172 on LS 2.10 --- .../client/api/DeadLetterTopicTest.java | 59 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 19 ++++-- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index cb9a4c3c104c7..f40f013f5e016 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -125,6 +125,65 @@ public void testDeadLetterTopicWithMessageKey() throws Exception { consumer.close(); } + @Test + public void testDeadLetterTopicWithBinaryMessageKey() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 1; + + final int sendMessages = 100; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + byte[] key = new byte[] {1, 2, 3, 4}; + for (int i = 0; i < sendMessages; i++) { + producer.newMessage() + .keyBytes(key) + .value(String.format("Hello Pulsar [%d]", i).getBytes()) + .send(); + } + + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + assertEquals(message.getKeyBytes(), key); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + } @Test(groups = "quarantine") public void testDeadLetterTopic() throws Exception { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4a7fef11af49b..fd85a78ea6376 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -657,6 +657,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())) .value(retryMessage.getData()) .properties(propertiesMap); + copyMessageKeyIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { result.complete(null); @@ -680,9 +681,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a if (delayTime > 0) { typedMessageBuilderNew.deliverAfter(delayTime, unit); } - if (message.hasKey()) { - typedMessageBuilderNew.key(message.getKey()); - } + copyMessageKeyIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) .thenAccept(v -> result.complete(null)) @@ -707,6 +706,16 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a return result; } + private static void copyMessageKeyIfNeeded(Message message, TypedMessageBuilder typedMessageBuilderNew) { + if (message.hasKey()) { + if (message.hasBase64EncodedKey()) { + typedMessageBuilderNew.keyBytes(message.getKeyBytes()); + } else { + typedMessageBuilderNew.key(message.getKey()); + } + } + } + private SortedMap getPropertiesMap(Message message, String originMessageIdStr, String originTopicNameStr) { @@ -2033,9 +2042,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdImpl messageId) producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) .value(message.getData()) .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); - if (message.hasKey()) { - typedMessageBuilderNew.key(message.getKey()); - } + copyMessageKeyIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenAccept(messageIdInDLQ -> { possibleSendToDeadLetterTopicMessages.remove(finalMessageId);