From 060ac1fdc2f4fd2b53e31e921d0e531ead9ff1de Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 16 May 2022 14:38:08 +0200 Subject: [PATCH] Add test about Batching and Transactions --- .../oss/pulsar/jms/TransactionsTest.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java index 5a7ed274..283fdff3 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import javax.jms.CompletionListener; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSConsumer; @@ -38,6 +39,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; @@ -570,4 +572,78 @@ public void sendMessageJMSContextTest() throws Exception { } } } + + @Test + public void sendMessageWithBatchingTest() throws Exception { + + Map properties = new HashMap<>(); + properties.put("webServiceUrl", cluster.getAddress()); + properties.put("enableTransaction", "true"); + Map producerConfig = new HashMap<>(); + producerConfig.put("batchingEnabled", true); + properties.put("producerConfig", producerConfig); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (Connection connection = factory.createConnection()) { + connection.start(); + + try (Session consumerSession = connection.createSession(); ) { + Destination destination = + consumerSession.createTopic("persistent://public/default/test-" + UUID.randomUUID()); + try (MessageConsumer consumer = consumerSession.createConsumer(destination)) { + + try (Session transaction = connection.createSession(Session.SESSION_TRANSACTED); ) { + + try (MessageProducer producer = transaction.createProducer(destination); ) { + TextMessage textMsg = transaction.createTextMessage("foo"); + producer.send( + textMsg, + new CompletionListener() { + @Override + public void onCompletion(Message message) {} + + @Override + public void onException(Message message, Exception e) {} + }); + producer.send( + textMsg, + new CompletionListener() { + @Override + public void onCompletion(Message message) {} + + @Override + public void onException(Message message, Exception e) {} + }); + } + + // message is not "visible" as transaction is not committed + assertNull(consumer.receive(1000)); + + transaction.commit(); + + // message is now visible to consumers + + // verify that the two messages are part of the same batch + PulsarMessage message1 = (PulsarMessage) consumer.receive(); + org.apache.pulsar.client.api.Message receivedPulsarMessage1 = + message1.getReceivedPulsarMessage(); + BatchMessageIdImpl messageId1 = + (BatchMessageIdImpl) receivedPulsarMessage1.getMessageId(); + + PulsarMessage message2 = (PulsarMessage) consumer.receive(); + org.apache.pulsar.client.api.Message receivedPulsarMessage2 = + message2.getReceivedPulsarMessage(); + BatchMessageIdImpl messageId2 = + (BatchMessageIdImpl) receivedPulsarMessage2.getMessageId(); + log.info("ids {} {}", messageId1, messageId2); + + assertEquals(messageId1.getLedgerId(), messageId2.getLedgerId()); + assertEquals(messageId1.getEntryId(), messageId2.getEntryId()); + assertEquals(messageId1.getBatchIndex() + 1, messageId2.getBatchIndex()); + } + } + } + } + } + } }