Skip to content

Commit

Permalink
Add test about Batching and Transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 16, 2022
1 parent 1c8458b commit 060ac1f
Showing 1 changed file with 76 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
Expand All @@ -38,6 +39,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -570,4 +572,78 @@ public void sendMessageJMSContextTest() throws Exception {
}
}
}

@Test
public void sendMessageWithBatchingTest() throws Exception {

Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
properties.put("enableTransaction", "true");
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("batchingEnabled", true);
properties.put("producerConfig", producerConfig);

try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (Connection connection = factory.createConnection()) {
connection.start();

try (Session consumerSession = connection.createSession(); ) {
Destination destination =
consumerSession.createTopic("persistent://public/default/test-" + UUID.randomUUID());
try (MessageConsumer consumer = consumerSession.createConsumer(destination)) {

try (Session transaction = connection.createSession(Session.SESSION_TRANSACTED); ) {

try (MessageProducer producer = transaction.createProducer(destination); ) {
TextMessage textMsg = transaction.createTextMessage("foo");
producer.send(
textMsg,
new CompletionListener() {
@Override
public void onCompletion(Message message) {}

@Override
public void onException(Message message, Exception e) {}
});
producer.send(
textMsg,
new CompletionListener() {
@Override
public void onCompletion(Message message) {}

@Override
public void onException(Message message, Exception e) {}
});
}

// message is not "visible" as transaction is not committed
assertNull(consumer.receive(1000));

transaction.commit();

// message is now visible to consumers

// verify that the two messages are part of the same batch
PulsarMessage message1 = (PulsarMessage) consumer.receive();
org.apache.pulsar.client.api.Message<byte[]> receivedPulsarMessage1 =
message1.getReceivedPulsarMessage();
BatchMessageIdImpl messageId1 =
(BatchMessageIdImpl) receivedPulsarMessage1.getMessageId();

PulsarMessage message2 = (PulsarMessage) consumer.receive();
org.apache.pulsar.client.api.Message<byte[]> receivedPulsarMessage2 =
message2.getReceivedPulsarMessage();
BatchMessageIdImpl messageId2 =
(BatchMessageIdImpl) receivedPulsarMessage2.getMessageId();
log.info("ids {} {}", messageId1, messageId2);

assertEquals(messageId1.getLedgerId(), messageId2.getLedgerId());
assertEquals(messageId1.getEntryId(), messageId2.getEntryId());
assertEquals(messageId1.getBatchIndex() + 1, messageId2.getBatchIndex());
}
}
}
}
}
}
}

0 comments on commit 060ac1f

Please sign in to comment.