Skip to content

Commit

Permalink
Fix batch listener error handling (#1007)
Browse files Browse the repository at this point in the history
Updates the batch listener error handler code to handle the case where there
is only a single outstanding message in the batch list when retries are expired.

See #998

Signed-off-by: darshimo <[email protected]>
Co-authored-by: darshimo <[email protected]>
  • Loading branch information
onobc and darshimo authored Jan 20, 2025
1 parent b493e4f commit 7881c8f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -593,12 +593,11 @@ private List<Message<T>> invokeBatchListenerErrorHandler(AtomicBoolean inRetryMo
pulsarBatchListenerFailedException);
handleAck(pulsarMessage);
if (messageList.size() == 1) {
messageList.remove(0);
messagesPendingInBatch.set(false);
}
else {
messageList = messageList.subList(1, messageList.size());
}
if (!messageList.isEmpty()) {
messagesPendingInBatch.set(true);
}
this.pulsarConsumerErrorHandler.clearMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.client.api.Schema;
import org.junit.jupiter.api.Test;

import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.PulsarOperations;
Expand All @@ -51,6 +52,8 @@
*/
public class DefaultPulsarConsumerErrorHandlerTests implements PulsarTestContainerSupport {

private final LogAccessor logger = new LogAccessor(this.getClass());

@Test
@SuppressWarnings("unchecked")
void happyPathErrorHandlingForRecordMessageListener() throws Exception {
Expand Down Expand Up @@ -564,4 +567,78 @@ else if (message.getValue() == 7) {
pulsarClient.close();
}

@Test
@SuppressWarnings("unchecked")
void whenBatchRecordListenerOneMessageBatchFailsThenSentToDltProperly() throws Exception {
var topicName = "default-error-handler-tests-9";
var pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build();
var pulsarConsumerFactory = new DefaultPulsarConsumerFactory<Integer>(pulsarClient,
List.of((consumerBuilder) -> {
consumerBuilder.topic(topicName);
consumerBuilder.subscriptionName("%s-sub".formatted(topicName));
}));
// Prepare container for batch consume
var pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setSchema(Schema.INT32);
pulsarContainerProperties.setAckMode(AckMode.MANUAL);
pulsarContainerProperties.setBatchListener(true);
pulsarContainerProperties.setMaxNumMessages(1);
pulsarContainerProperties.setBatchTimeoutMillis(60_000);
PulsarBatchAcknowledgingMessageListener<?> pulsarBatchMessageListener = mock();
doAnswer(invocation -> {
List<Message<Integer>> message = invocation.getArgument(1);
Message<Integer> integerMessage = message.get(0);
Integer value = integerMessage.getValue();
if (value == 0) {
throw new PulsarBatchListenerFailedException("failed", integerMessage);
}
Acknowledgement acknowledgment = invocation.getArgument(2);
List<MessageId> messageIds = new ArrayList<>();
for (Message<Integer> integerMessage1 : message) {
messageIds.add(integerMessage1.getMessageId());
}
acknowledgment.acknowledge(messageIds);
return new Object();
}).when(pulsarBatchMessageListener).received(any(Consumer.class), any(List.class), any(Acknowledgement.class));
pulsarContainerProperties.setMessageListener(pulsarBatchMessageListener);
var container = new DefaultPulsarMessageListenerContainer<>(pulsarConsumerFactory, pulsarContainerProperties);

// Set error handler to recover after 2 retries
PulsarTemplate<Integer> mockPulsarTemplate = mock(RETURNS_DEEP_STUBS);
PulsarOperations.SendMessageBuilder<Integer> sendMessageBuilderMock = mock();
when(mockPulsarTemplate.newMessage(any(Integer.class))
.withTopic(any(String.class))
.withMessageCustomizer(any(TypedMessageBuilderCustomizer.class))).thenReturn(sendMessageBuilderMock);
container.setPulsarConsumerErrorHandler(new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(mockPulsarTemplate), new FixedBackOff(100, 2)));
try {
container.start();
// Send single message in batch
var pulsarProducerFactory = new DefaultPulsarProducerFactory<Integer>(pulsarClient, topicName);
var pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
pulsarTemplate.sendAsync(0);
// Initial call should fail
// Next 2 calls should fail (retries 2)
// No more calls after that - msg should go to DLT
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> verify(pulsarBatchMessageListener, times(3)).received(any(Consumer.class),
any(List.class), any(Acknowledgement.class)));
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> verify(sendMessageBuilderMock, times(1)).sendAsync());
}
finally {
safeStopContainer(container);
}
pulsarClient.close();
}

private void safeStopContainer(PulsarMessageListenerContainer container) {
try {
container.stop();
}
catch (Exception ex) {
logger.warn(ex, "Failed to stop container %s: %s".formatted(container, ex.getMessage()));
}
}

}

0 comments on commit 7881c8f

Please sign in to comment.