diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 49aba8f3..91cf3e03 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -76,6 +76,7 @@ public class PulsarConnectionFactory private String systemNamespace = "public/default"; private String defaultClientId = null; private boolean enableTransaction = false; + private boolean emulateTransactions = false; private boolean enableClientSideEmulation = false; private boolean useServerSideFiltering = false; private boolean forceDeleteTemporaryDestinations = false; @@ -301,6 +302,15 @@ private synchronized void ensureInitialized() throws JMSException { this.enableTransaction = Boolean.parseBoolean(configuration.getOrDefault("enableTransaction", "false").toString()); + this.emulateTransactions = + Boolean.parseBoolean( + getAndRemoveString("jms.emulateTransactions", "false", configuration).toString()); + + if (emulateTransactions && enableTransaction) { + throw new IllegalStateException( + "You cannot set both enableTransaction and jms.emulateTransactions"); + } + String webServiceUrl = getAndRemoveString("webServiceUrl", "http://localhost:8080", configuration); @@ -406,6 +416,10 @@ public synchronized boolean isEnableTransaction() { return enableTransaction; } + public synchronized boolean isEmulateTransactions() { + return emulateTransactions; + } + public synchronized PulsarClient getPulsarClient() { return pulsarClient; } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java index c80be03d..fa0ec205 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java @@ -46,6 +46,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.transaction.Transaction; @Slf4j class PulsarMessageProducer implements MessageProducer, TopicPublisher, QueueSender { @@ -1198,7 +1199,13 @@ private void sendMessage(Destination defaultDestination, Message message) throws PulsarMessage pulsarMessage = prepareMessageForSend(message); final TypedMessageBuilder typedMessageBuilder; if (session.getTransacted()) { - typedMessageBuilder = producer.newMessage(session.getTransaction()); + Transaction transaction = session.getTransaction(); + if (transaction != null) { + typedMessageBuilder = producer.newMessage(transaction); + } else { + // emulated transactions + typedMessageBuilder = producer.newMessage(); + } } else { typedMessageBuilder = producer.newMessage(); } @@ -1245,7 +1252,13 @@ public void onException(Message completedMessage, Exception e) { } TypedMessageBuilder typedMessageBuilder; if (session.getTransacted()) { - typedMessageBuilder = producer.newMessage(session.getTransaction()); + Transaction transaction = session.getTransaction(); + if (transaction != null) { + typedMessageBuilder = producer.newMessage(transaction); + } else { + // emulated transactions + typedMessageBuilder = producer.newMessage(); + } } else { typedMessageBuilder = producer.newMessage(); } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java index 0cc2b715..1f17a20f 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java @@ -77,6 +77,7 @@ public class PulsarSession implements Session, QueueSession, TopicSession { private boolean jms20; private final int sessionMode; private final boolean transacted; + private final boolean emulateTransactions; // this is to emulate QueueSession/TopicSession private boolean allowQueueOperations = true; private boolean allowTopicOperations = true; @@ -93,21 +94,26 @@ public class PulsarSession implements Session, QueueSession, TopicSession { private final List browsers = new CopyOnWriteArrayList<>(); public PulsarSession(int sessionMode, PulsarConnection connection) throws JMSException { + if (sessionMode == SESSION_TRANSACTED && !connection.getFactory().isEnableTransaction()) { + if (connection.getFactory().isEmulateTransactions()) { + emulateTransactions = true; + } else { + throw new JMSException( + "Please enable transactions on PulsarConnectionFactory with enableTransaction=true, you can configure " + + "jms.emulateTransactions if your Pulsar cluster does not support transactions"); + } + } else { + emulateTransactions = false; + } this.jms20 = false; this.connection = connection; this.sessionMode = sessionMode; this.transacted = sessionMode == Session.SESSION_TRANSACTED; validateSessionMode(sessionMode); - if (sessionMode == SESSION_TRANSACTED) { - if (!connection.getFactory().isEnableTransaction()) { - throw new JMSException( - "Please enable transactions on PulsarConnectionFactory with enableTransaction=true"); - } - } } Transaction getTransaction() throws JMSException { - if (transaction == null && sessionMode == SESSION_TRANSACTED) { + if (transaction == null && sessionMode == SESSION_TRANSACTED && !emulateTransactions) { this.transaction = startTransaction(connection); } return this.transaction; @@ -389,6 +395,13 @@ public void commit() throws JMSException { } closeLock.readLock().lock(); try { + if (emulateTransactions) { + // we are postponing to this moment the acknowledgment + for (PulsarMessage msg : unackedMessages) { + msg.acknowledgeInternal(); + } + unackedMessages.clear(); + } if (transaction != null) { // we are postponing to this moment the acknowledgment List> handles = new ArrayList<>(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java index 2f0800ea..f3659597 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java @@ -18,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.datastax.oss.pulsar.jms.utils.PulsarCluster; import java.nio.file.Path; @@ -42,12 +43,10 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @Slf4j -@Disabled public class TransactionsTest { @TempDir public static Path tempDir; @@ -694,4 +693,41 @@ public void onException(Message message, Exception e) {} } } } + + @Test + public void emulatedTransactionsTest() throws Exception { + + Map properties = new HashMap<>(); + properties.put("webServiceUrl", cluster.getAddress()); + properties.put("enableTransaction", "false"); + properties.put("jms.emulateTransactions", "true"); + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (Connection connection = factory.createConnection()) { + connection.start(); + + try (Session consumerSession = connection.createSession(Session.SESSION_TRANSACTED); ) { + Destination destination = + consumerSession.createTopic("persistent://public/default/test-" + UUID.randomUUID()); + try (MessageConsumer consumer = consumerSession.createConsumer(destination)) { + + try (Session transaction = connection.createSession(Session.SESSION_TRANSACTED); ) { + assertTrue(transaction.getTransacted()); + try (MessageProducer producer = transaction.createProducer(destination); ) { + TextMessage textMsg = transaction.createTextMessage("foo"); + producer.send(textMsg); + producer.send(textMsg); + } + + transaction.commit(); + + assertNotNull(consumer.receive()); + assertNotNull(consumer.receive()); + + consumerSession.commit(); + } + } + } + } + } + } }