Skip to content

Commit

Permalink
Add test case about ackReceiptEnabled
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 16, 2022
1 parent 060ac1f commit 9c1128c
Showing 1 changed file with 54 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
Expand All @@ -34,13 +35,15 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

@Timeout(30)
@Slf4j
public class AcknowledgementModeTest {

@TempDir public static Path tempDir;
Expand Down Expand Up @@ -77,7 +80,57 @@ public void testAUTO_ACKNOWLEDGE() throws Exception {

try (MessageConsumer consumer = session.createConsumer(destination); ) {
assertEquals("foo", consumer.receive().getStringProperty("test"));
// message is automatically acknowledged on receive
// message is automatically acknowledged on receive,
// but, as we are not setting ackReceiptEnabled, the acknowledgement does not wait
// for the server to return success or failure
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
assertNull(consumer.receive(100));
}
}
}
}
}

@Test
public void testAUTO_ACKNOWLEDGE_ackReceipt() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("ackReceiptEnabled", true);
properties.put("consumerConfig", consumerConfig);
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (Connection connection = factory.createConnection()) {
connection.start();
try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) {
Queue destination =
session.createQueue("persistent://public/default/test-" + UUID.randomUUID());
try (MessageProducer producer = session.createProducer(destination); ) {
for (int i = 0; i < 1000; i++) {
TextMessage textMsg = session.createTextMessage("foo");
textMsg.setStringProperty("test", "foo");
producer.send(
textMsg,
new CompletionListener() {
@Override
public void onCompletion(Message message) {}

@Override
public void onException(Message message, Exception e) {}
});
}
}

// the consumer waits for the confirmation of the ack
// with you set ackReceiptEnabled=false you will see
// that this test is notably faster
try (MessageConsumer consumer = session.createConsumer(destination); ) {
for (int i = 0; i < 1000; i++) {
assertEquals("foo", consumer.receive().getStringProperty("test"));
log.info("ack {}", i);
// message is automatically acknowledged on receive
}
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
Expand Down

0 comments on commit 9c1128c

Please sign in to comment.