diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java index 38a28301..d658e2c6 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import javax.jms.CompletionListener; import javax.jms.Connection; import javax.jms.JMSConsumer; import javax.jms.JMSContext; @@ -34,6 +35,7 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -41,6 +43,7 @@ import org.junit.jupiter.api.io.TempDir; @Timeout(30) +@Slf4j public class AcknowledgementModeTest { @TempDir public static Path tempDir; @@ -77,7 +80,57 @@ public void testAUTO_ACKNOWLEDGE() throws Exception { try (MessageConsumer consumer = session.createConsumer(destination); ) { assertEquals("foo", consumer.receive().getStringProperty("test")); - // message is automatically acknowledged on receive + // message is automatically acknowledged on receive, + // but, as we are not setting ackReceiptEnabled, the acknowledgement does not wait + // for the server to return success or failure + } + + try (MessageConsumer consumer = session.createConsumer(destination); ) { + assertNull(consumer.receive(100)); + } + } + } + } + } + + @Test + public void testAUTO_ACKNOWLEDGE_ackReceipt() throws Exception { + Map properties = new HashMap<>(); + properties.put("webServiceUrl", cluster.getAddress()); + Map consumerConfig = new HashMap<>(); + consumerConfig.put("ackReceiptEnabled", true); + properties.put("consumerConfig", consumerConfig); + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (Connection connection = factory.createConnection()) { + connection.start(); + try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) { + Queue destination = + session.createQueue("persistent://public/default/test-" + UUID.randomUUID()); + try (MessageProducer producer = session.createProducer(destination); ) { + for (int i = 0; i < 1000; i++) { + TextMessage textMsg = session.createTextMessage("foo"); + textMsg.setStringProperty("test", "foo"); + producer.send( + textMsg, + new CompletionListener() { + @Override + public void onCompletion(Message message) {} + + @Override + public void onException(Message message, Exception e) {} + }); + } + } + + // the consumer waits for the confirmation of the ack + // with you set ackReceiptEnabled=false you will see + // that this test is notably faster + try (MessageConsumer consumer = session.createConsumer(destination); ) { + for (int i = 0; i < 1000; i++) { + assertEquals("foo", consumer.receive().getStringProperty("test")); + log.info("ack {}", i); + // message is automatically acknowledged on receive + } } try (MessageConsumer consumer = session.createConsumer(destination); ) {