Skip to content

Commit

Permalink
Feature: jms.emulateTransactions (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Jun 6, 2022
1 parent 73ee0a0 commit d38e2b0
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class PulsarConnectionFactory
private String systemNamespace = "public/default";
private String defaultClientId = null;
private boolean enableTransaction = false;
private boolean emulateTransactions = false;
private boolean enableClientSideEmulation = false;
private boolean useServerSideFiltering = false;
private boolean forceDeleteTemporaryDestinations = false;
Expand Down Expand Up @@ -301,6 +302,15 @@ private synchronized void ensureInitialized() throws JMSException {
this.enableTransaction =
Boolean.parseBoolean(configuration.getOrDefault("enableTransaction", "false").toString());

this.emulateTransactions =
Boolean.parseBoolean(
getAndRemoveString("jms.emulateTransactions", "false", configuration).toString());

if (emulateTransactions && enableTransaction) {
throw new IllegalStateException(
"You cannot set both enableTransaction and jms.emulateTransactions");
}

String webServiceUrl =
getAndRemoveString("webServiceUrl", "http://localhost:8080", configuration);

Expand Down Expand Up @@ -406,6 +416,10 @@ public synchronized boolean isEnableTransaction() {
return enableTransaction;
}

public synchronized boolean isEmulateTransactions() {
return emulateTransactions;
}

public synchronized PulsarClient getPulsarClient() {
return pulsarClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;

@Slf4j
class PulsarMessageProducer implements MessageProducer, TopicPublisher, QueueSender {
Expand Down Expand Up @@ -1198,7 +1199,13 @@ private void sendMessage(Destination defaultDestination, Message message) throws
PulsarMessage pulsarMessage = prepareMessageForSend(message);
final TypedMessageBuilder<byte[]> typedMessageBuilder;
if (session.getTransacted()) {
typedMessageBuilder = producer.newMessage(session.getTransaction());
Transaction transaction = session.getTransaction();
if (transaction != null) {
typedMessageBuilder = producer.newMessage(transaction);
} else {
// emulated transactions
typedMessageBuilder = producer.newMessage();
}
} else {
typedMessageBuilder = producer.newMessage();
}
Expand Down Expand Up @@ -1245,7 +1252,13 @@ public void onException(Message completedMessage, Exception e) {
}
TypedMessageBuilder<byte[]> typedMessageBuilder;
if (session.getTransacted()) {
typedMessageBuilder = producer.newMessage(session.getTransaction());
Transaction transaction = session.getTransaction();
if (transaction != null) {
typedMessageBuilder = producer.newMessage(transaction);
} else {
// emulated transactions
typedMessageBuilder = producer.newMessage();
}
} else {
typedMessageBuilder = producer.newMessage();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class PulsarSession implements Session, QueueSession, TopicSession {
private boolean jms20;
private final int sessionMode;
private final boolean transacted;
private final boolean emulateTransactions;
// this is to emulate QueueSession/TopicSession
private boolean allowQueueOperations = true;
private boolean allowTopicOperations = true;
Expand All @@ -93,21 +94,26 @@ public class PulsarSession implements Session, QueueSession, TopicSession {
private final List<PulsarQueueBrowser> browsers = new CopyOnWriteArrayList<>();

public PulsarSession(int sessionMode, PulsarConnection connection) throws JMSException {
if (sessionMode == SESSION_TRANSACTED && !connection.getFactory().isEnableTransaction()) {
if (connection.getFactory().isEmulateTransactions()) {
emulateTransactions = true;
} else {
throw new JMSException(
"Please enable transactions on PulsarConnectionFactory with enableTransaction=true, you can configure "
+ "jms.emulateTransactions if your Pulsar cluster does not support transactions");
}
} else {
emulateTransactions = false;
}
this.jms20 = false;
this.connection = connection;
this.sessionMode = sessionMode;
this.transacted = sessionMode == Session.SESSION_TRANSACTED;
validateSessionMode(sessionMode);
if (sessionMode == SESSION_TRANSACTED) {
if (!connection.getFactory().isEnableTransaction()) {
throw new JMSException(
"Please enable transactions on PulsarConnectionFactory with enableTransaction=true");
}
}
}

Transaction getTransaction() throws JMSException {
if (transaction == null && sessionMode == SESSION_TRANSACTED) {
if (transaction == null && sessionMode == SESSION_TRANSACTED && !emulateTransactions) {
this.transaction = startTransaction(connection);
}
return this.transaction;
Expand Down Expand Up @@ -389,6 +395,13 @@ public void commit() throws JMSException {
}
closeLock.readLock().lock();
try {
if (emulateTransactions) {
// we are postponing to this moment the acknowledgment
for (PulsarMessage msg : unackedMessages) {
msg.acknowledgeInternal();
}
unackedMessages.clear();
}
if (transaction != null) {
// we are postponing to this moment the acknowledgment
List<CompletableFuture<?>> handles = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.datastax.oss.pulsar.jms.utils.PulsarCluster;
import java.nio.file.Path;
Expand All @@ -42,12 +43,10 @@
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

@Slf4j
@Disabled
public class TransactionsTest {

@TempDir public static Path tempDir;
Expand Down Expand Up @@ -694,4 +693,41 @@ public void onException(Message message, Exception e) {}
}
}
}

@Test
public void emulatedTransactionsTest() throws Exception {

Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
properties.put("enableTransaction", "false");
properties.put("jms.emulateTransactions", "true");
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (Connection connection = factory.createConnection()) {
connection.start();

try (Session consumerSession = connection.createSession(Session.SESSION_TRANSACTED); ) {
Destination destination =
consumerSession.createTopic("persistent://public/default/test-" + UUID.randomUUID());
try (MessageConsumer consumer = consumerSession.createConsumer(destination)) {

try (Session transaction = connection.createSession(Session.SESSION_TRANSACTED); ) {
assertTrue(transaction.getTransacted());
try (MessageProducer producer = transaction.createProducer(destination); ) {
TextMessage textMsg = transaction.createTextMessage("foo");
producer.send(textMsg);
producer.send(textMsg);
}

transaction.commit();

assertNotNull(consumer.receive());
assertNotNull(consumer.receive());

consumerSession.commit();
}
}
}
}
}
}
}

0 comments on commit d38e2b0

Please sign in to comment.