Skip to content

Commit 531b5fe

Browse files
lhotarinikhil-ctds
authored andcommitted
[fix][client] Prevent retry topic and dead letter topic producer leaks when sending of message fails (apache#23824)
(cherry picked from commit 04e89fe) (cherry picked from commit b6beb90)
1 parent 0a18e95 commit 531b5fe

File tree

5 files changed

+416
-154
lines changed

5 files changed

+416
-154
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java

+93
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.testng.Assert.assertEquals;
2223
import static org.testng.Assert.assertNotNull;
2324
import static org.testng.Assert.assertNull;
@@ -39,9 +40,11 @@
3940
import lombok.Cleanup;
4041
import lombok.Data;
4142
import org.apache.avro.reflect.Nullable;
43+
import org.apache.pulsar.broker.BrokerTestUtil;
4244
import org.apache.pulsar.client.api.schema.GenericRecord;
4345
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
4446
import org.apache.pulsar.client.util.RetryMessageUtil;
47+
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
4548
import org.awaitility.Awaitility;
4649
import org.slf4j.Logger;
4750
import org.slf4j.LoggerFactory;
@@ -1019,4 +1022,94 @@ public void testDeadLetterPolicyDeserialize() throws Exception {
10191022
consumerBuilder.loadConf(config);
10201023
assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(), policy);
10211024
}
1025+
1026+
@Data
1027+
static class Payload {
1028+
String number;
1029+
1030+
public Payload() {
1031+
1032+
}
1033+
1034+
public Payload(String number) {
1035+
this.number = number;
1036+
}
1037+
}
1038+
1039+
@Data
1040+
static class PayloadIncompatible {
1041+
long number;
1042+
1043+
public PayloadIncompatible() {
1044+
1045+
}
1046+
1047+
public PayloadIncompatible(long number) {
1048+
this.number = number;
1049+
}
1050+
}
1051+
1052+
// reproduce issue reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
1053+
@Test
1054+
public void testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
1055+
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
1056+
admin.namespaces().createNamespace(namespace);
1057+
// don't enforce schema validation
1058+
admin.namespaces().setSchemaValidationEnforced(namespace, false);
1059+
// set schema compatibility strategy to always compatible
1060+
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
1061+
1062+
Schema<Payload> schema = Schema.AVRO(Payload.class);
1063+
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(PayloadIncompatible.class);
1064+
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
1065+
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
1066+
String dlqTopic = topic + "-DLQ";
1067+
1068+
// create topics
1069+
admin.topics().createNonPartitionedTopic(topic);
1070+
admin.topics().createNonPartitionedTopic(dlqTopic);
1071+
1072+
AtomicInteger nackCounter = new AtomicInteger(0);
1073+
Consumer<Payload> payloadConsumer = null;
1074+
try {
1075+
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
1076+
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
1077+
.ackTimeout(1, TimeUnit.SECONDS)
1078+
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
1079+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build())
1080+
.messageListener((c, msg) -> {
1081+
if (nackCounter.incrementAndGet() < 10) {
1082+
c.negativeAcknowledge(msg);
1083+
}
1084+
}).subscribe();
1085+
1086+
// send a message to the topic with the incompatible schema
1087+
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
1088+
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
1089+
.create()) {
1090+
producer.send(payloadIncompatible);
1091+
}
1092+
1093+
Thread.sleep(2000L);
1094+
1095+
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
1096+
.describedAs("producer count of dlq topic %s should be <= 1 so that it doesn't leak producers",
1097+
dlqTopic)
1098+
.isLessThanOrEqualTo(1);
1099+
1100+
} finally {
1101+
if (payloadConsumer != null) {
1102+
try {
1103+
payloadConsumer.close();
1104+
} catch (PulsarClientException e) {
1105+
// ignore
1106+
}
1107+
}
1108+
}
1109+
1110+
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
1111+
.describedAs("producer count of dlq topic %s should be 0 here",
1112+
dlqTopic)
1113+
.isEqualTo(0);
1114+
}
10221115
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java

+120-46
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.testng.Assert.assertEquals;
2223
import static org.testng.Assert.assertFalse;
2324
import static org.testng.Assert.assertNull;
2425
import static org.testng.Assert.assertTrue;
2526
import static org.testng.Assert.fail;
26-
import java.lang.reflect.Field;
2727
import java.util.HashMap;
2828
import java.util.HashSet;
2929
import java.util.List;
@@ -36,11 +36,10 @@
3636
import lombok.Data;
3737
import org.apache.avro.AvroRuntimeException;
3838
import org.apache.avro.reflect.Nullable;
39+
import org.apache.pulsar.broker.BrokerTestUtil;
3940
import org.apache.pulsar.client.api.schema.GenericRecord;
40-
import org.apache.pulsar.client.impl.ConsumerImpl;
41-
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
4241
import org.apache.pulsar.client.util.RetryMessageUtil;
43-
import org.reflections.ReflectionUtils;
42+
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
4443
import org.slf4j.Logger;
4544
import org.slf4j.LoggerFactory;
4645
import org.testng.annotations.AfterMethod;
@@ -617,10 +616,12 @@ public void testRetryTopicByCustomTopicName() throws Exception {
617616

618617
@Test(timeOut = 30000L)
619618
public void testRetryTopicException() throws Exception {
620-
final String topic = "persistent://my-property/my-ns/retry-topic";
619+
String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
620+
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
621621
final int maxRedeliveryCount = 2;
622622
final int sendMessages = 1;
623623
// subscribe before publish
624+
@Cleanup
624625
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
625626
.topic(topic)
626627
.subscriptionName("my-subscription")
@@ -629,7 +630,7 @@ public void testRetryTopicException() throws Exception {
629630
.receiverQueueSize(100)
630631
.deadLetterPolicy(DeadLetterPolicy.builder()
631632
.maxRedeliverCount(maxRedeliveryCount)
632-
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
633+
.retryLetterTopic(retryLetterTopic)
633634
.build())
634635
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
635636
.subscribe();
@@ -642,30 +643,16 @@ public void testRetryTopicException() throws Exception {
642643
}
643644
producer.close();
644645

645-
// mock a retry producer exception when reconsumelater is called
646-
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
647-
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
648-
for (ConsumerImpl<byte[]> c : consumers) {
649-
Set<Field> deadLetterPolicyField =
650-
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
651-
652-
if (deadLetterPolicyField.size() != 0) {
653-
Field field = deadLetterPolicyField.iterator().next();
654-
field.setAccessible(true);
655-
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
656-
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
657-
}
658-
}
646+
admin.topics().terminateTopic(retryLetterTopic);
647+
659648
Message<byte[]> message = consumer.receive();
660649
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
661650
try {
662651
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
663-
} catch (PulsarClientException.InvalidTopicNameException e) {
664-
assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class);
665-
} catch (Exception e) {
666-
fail("exception should be PulsarClientException.InvalidTopicNameException");
652+
fail("exception should be PulsarClientException.TopicTerminatedException");
653+
} catch (PulsarClientException.TopicTerminatedException e) {
654+
// ok
667655
}
668-
consumer.close();
669656
}
670657

671658

@@ -718,10 +705,12 @@ public void testRetryProducerWillCloseByConsumer() throws Exception {
718705

719706
@Test(timeOut = 30000L)
720707
public void testRetryTopicExceptionWithConcurrent() throws Exception {
721-
final String topic = "persistent://my-property/my-ns/retry-topic";
708+
String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
709+
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
722710
final int maxRedeliveryCount = 2;
723711
final int sendMessages = 10;
724712
// subscribe before publish
713+
@Cleanup
725714
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
726715
.topic(topic)
727716
.subscriptionName("my-subscription")
@@ -730,7 +719,7 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
730719
.receiverQueueSize(100)
731720
.deadLetterPolicy(DeadLetterPolicy.builder()
732721
.maxRedeliverCount(maxRedeliveryCount)
733-
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
722+
.retryLetterTopic(retryLetterTopic)
734723
.build())
735724
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
736725
.subscribe();
@@ -739,24 +728,11 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
739728
.topic(topic)
740729
.create();
741730
for (int i = 0; i < sendMessages; i++) {
742-
producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send();
731+
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
743732
}
744733
producer.close();
745734

746-
// mock a retry producer exception when reconsumelater is called
747-
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
748-
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
749-
for (ConsumerImpl<byte[]> c : consumers) {
750-
Set<Field> deadLetterPolicyField =
751-
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
752-
753-
if (deadLetterPolicyField.size() != 0) {
754-
Field field = deadLetterPolicyField.iterator().next();
755-
field.setAccessible(true);
756-
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
757-
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
758-
}
759-
}
735+
admin.topics().terminateTopic(retryLetterTopic);
760736

761737
List<Message<byte[]>> messages = Lists.newArrayList();
762738
for (int i = 0; i < sendMessages; i++) {
@@ -769,16 +745,114 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
769745
new Thread(() -> {
770746
try {
771747
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
772-
} catch (Exception ignore) {
773-
774-
} finally {
748+
} catch (PulsarClientException.TopicTerminatedException e) {
749+
// ok
775750
latch.countDown();
751+
} catch (PulsarClientException e) {
752+
// unexpected exception
753+
fail("unexpected exception", e);
776754
}
777755
}).start();
778756
}
779757

780-
latch.await();
758+
latch.await(sendMessages, TimeUnit.SECONDS);
781759
consumer.close();
782760
}
783761

762+
@Data
763+
static class Payload {
764+
String number;
765+
766+
public Payload() {
767+
768+
}
769+
770+
public Payload(String number) {
771+
this.number = number;
772+
}
773+
}
774+
775+
@Data
776+
static class PayloadIncompatible {
777+
long number;
778+
779+
public PayloadIncompatible() {
780+
781+
}
782+
783+
public PayloadIncompatible(long number) {
784+
this.number = number;
785+
}
786+
}
787+
788+
// reproduce similar issue as reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
789+
// but for retry topic
790+
@Test
791+
public void testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
792+
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
793+
admin.namespaces().createNamespace(namespace);
794+
// don't enforce schema validation
795+
admin.namespaces().setSchemaValidationEnforced(namespace, false);
796+
// set schema compatibility strategy to always compatible
797+
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
798+
799+
Schema<Payload> schema = Schema.AVRO(Payload.class);
800+
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
801+
PayloadIncompatible.class);
802+
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
803+
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
804+
String dlqTopic = topic + "-DLQ";
805+
String retryTopic = topic + "-RETRY";
806+
807+
// create topics
808+
admin.topics().createNonPartitionedTopic(topic);
809+
admin.topics().createNonPartitionedTopic(dlqTopic);
810+
admin.topics().createNonPartitionedTopic(retryTopic);
811+
812+
Consumer<Payload> payloadConsumer = null;
813+
try {
814+
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
815+
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
816+
.ackTimeout(1, TimeUnit.SECONDS)
817+
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
818+
.enableRetry(true)
819+
.deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3)
820+
.deadLetterTopic(dlqTopic).build())
821+
.messageListener((c, msg) -> {
822+
try {
823+
c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS);
824+
} catch (PulsarClientException e) {
825+
throw new RuntimeException(e);
826+
}
827+
}).subscribe();
828+
829+
// send a message to the topic with the incompatible schema
830+
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
831+
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
832+
.create()) {
833+
producer.send(payloadIncompatible);
834+
}
835+
836+
Thread.sleep(2000L);
837+
838+
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
839+
.describedAs("producer count of retry topic %s should be <= 1 so that it doesn't leak producers",
840+
retryTopic)
841+
.isLessThanOrEqualTo(1);
842+
843+
} finally {
844+
if (payloadConsumer != null) {
845+
try {
846+
payloadConsumer.close();
847+
} catch (PulsarClientException e) {
848+
// ignore
849+
}
850+
}
851+
}
852+
853+
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
854+
.describedAs("producer count of retry topic %s should be 0 here",
855+
retryTopic)
856+
.isEqualTo(0);
857+
}
784858
}

0 commit comments

Comments
 (0)