diff --git a/pom.xml b/pom.xml
index a60bd2fc..53b71a1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,10 +53,8 @@
8
8
2.0.3
-
org.apache.pulsar
- 3.0.0
+ 3.2.2
5.16.1
1.11
5.1.0
diff --git a/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java b/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java
index e9b1c955..bf9bd59b 100644
--- a/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java
+++ b/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java
@@ -31,7 +31,6 @@
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.auth.AuthenticationToken;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
@@ -52,6 +51,7 @@ public class DockerTest {
private static final String TEST_PULSAR_DOCKER_IMAGE_NAME =
System.getProperty("testPulsarDockerImageName");
public static final String LUNASTREAMING = "datastax/lunastreaming:2.10_4.4";
+ public static final String LUNASTREAMING_31 = "datastax/lunastreaming:3.1_3.0";
@TempDir Path tempDir;
@@ -75,6 +75,11 @@ public void testLunaStreaming210() throws Exception {
// waiting for Apache Pulsar 2.10.1, in the meantime we use Luna Streaming 2.10.0.x
test(LUNASTREAMING, false);
}
+ @Test
+ public void testLunaStreaming31() throws Exception {
+ test(LUNASTREAMING_31, false);
+ }
+
@Test
public void testPulsar292Transactions() throws Exception {
@@ -93,12 +98,12 @@ public void testPulsar211Transactions() throws Exception {
@Test
public void testPulsar3Transactions() throws Exception {
- test("apachepulsar/pulsar:3.0.0", true);
+ test("apachepulsar/pulsar:3.2.2", true);
}
@Test
public void testNoAuthentication() throws Exception {
- test("apachepulsar/pulsar:3.0.0", false, false, false);
+ test("apachepulsar/pulsar:3.2.2", false, false, false);
}
@Test
@@ -107,11 +112,22 @@ public void testLunaStreaming210Transactions() throws Exception {
test(LUNASTREAMING, true);
}
+ @Test
+ public void testLunaStreaming31Transactions() throws Exception {
+ // waiting for Apache Pulsar 2.10.1, in the meantime we use Luna Streaming 2.10.0.x
+ test(LUNASTREAMING_31, true);
+ }
+
@Test
public void testLunaStreaming210ServerSideSelectors() throws Exception {
test(LUNASTREAMING, false, true);
}
+ @Test
+ public void testLunaStreaming31ServerSideSelectors() throws Exception {
+ test(LUNASTREAMING_31, false, true);
+ }
+
@Test
public void testGenericPulsar() throws Exception {
assumeTrue(TEST_PULSAR_DOCKER_IMAGE_NAME != null && !TEST_PULSAR_DOCKER_IMAGE_NAME.isEmpty());
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PriorityGrowableArrayBlockingQueue.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PriorityGrowableArrayBlockingQueue.java
new file mode 100644
index 00000000..6ff61c44
--- /dev/null
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PriorityGrowableArrayBlockingQueue.java
@@ -0,0 +1,155 @@
+package com.datastax.oss.pulsar.jms;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+public class PriorityGrowableArrayBlockingQueue extends GrowableArrayBlockingQueue {
+
+ static int getPriority(Message m) {
+ String jmsPriority = m.getProperty("JMSPriority");
+ if (jmsPriority == null || jmsPriority.isEmpty()) {
+ return PulsarMessage.DEFAULT_PRIORITY;
+ }
+ try {
+ return Integer.parseInt(jmsPriority);
+ } catch (NumberFormatException err) {
+ return PulsarMessage.DEFAULT_PRIORITY;
+ }
+ }
+
+ private final PriorityBlockingQueue queue;
+ private final AtomicBoolean terminated = new AtomicBoolean(false);
+ public PriorityGrowableArrayBlockingQueue() {
+ this(10);
+ }
+
+ public PriorityGrowableArrayBlockingQueue(int initialCapacity) {
+ queue = new PriorityBlockingQueue<>(initialCapacity, new Comparator() {
+ @Override
+ public int compare(Message o1, Message o2) {
+ int priority1 = getPriority(o1);
+ int priority2 = getPriority(o2);
+ return Integer.compare(priority2, priority1);
+ }
+ });
+ }
+
+ @Override
+ public Message remove() {
+ return queue.remove();
+ }
+
+ @Override
+ public Message poll() {
+ return queue.poll();
+ }
+
+ @Override
+ public Message element() {
+ return queue.element();
+ }
+
+ @Override
+ public Message peek() {
+ return queue.peek();
+ }
+
+ @Override
+ public boolean offer(Message e) {
+ return queue.offer(e);
+ }
+
+ @Override
+ public void put(Message e) {
+ queue.put(e);
+ }
+
+ @Override
+ public boolean add(Message e) {
+ return queue.add(e);
+ }
+
+ @Override
+ public boolean offer(Message e, long timeout, TimeUnit unit) {
+ return queue.offer(e, timeout, unit);
+ }
+
+ @Override
+ public Message take() throws InterruptedException {
+ return queue.take();
+ }
+
+ @Override
+ public Message poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return queue.remainingCapacity();
+ }
+
+ @Override
+ public int drainTo(Collection super Message> c) {
+ return queue.drainTo(c);
+ }
+
+ @Override
+ public int drainTo(Collection super Message> c, int maxElements) {
+ return queue.drainTo(c, maxElements);
+ }
+
+ @Override
+ public void clear() {
+ queue.clear();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return queue.remove(o);
+ }
+
+ @Override
+ public int size() {
+ return queue.size();
+ }
+
+ @Override
+ public Iterator iterator() {
+ return queue.iterator();
+ }
+
+ @Override
+ public List toList() {
+ List list = new ArrayList<>(size());
+ forEach(list::add);
+ return list;
+ }
+
+ @Override
+ public void forEach(Consumer super Message> action) {
+ queue.forEach(action);
+ }
+
+ @Override
+ public String toString() {
+ return queue.toString();
+ }
+
+ @Override
+ public void terminate(@Nullable Consumer itemAfterTerminatedHandler) {
+ terminated.set(true);
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return terminated.get();
+ }
+}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java
index 2ac93832..f45bceb4 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java
@@ -95,6 +95,7 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
@Slf4j
public class PulsarConnectionFactory
@@ -1339,47 +1340,42 @@ private static void replaceIncomingMessageList(Consumer c) {
incomingMessages.setAccessible(true);
Object oldQueue = incomingMessages.get(consumerBase);
+ BlockingQueue newQueue;
if (oldQueue.getClass().isAssignableFrom(PriorityBlockingQueue.class)) {
- BlockingQueue newQueue =
+ newQueue =
new PriorityBlockingQueue(
10,
new Comparator() {
@Override
public int compare(Message o1, Message o2) {
- int priority1 = getPriority(o1);
- int priority2 = getPriority(o2);
+ int priority1 = PriorityGrowableArrayBlockingQueue.getPriority(o1);
+ int priority2 = PriorityGrowableArrayBlockingQueue.getPriority(o2);
return Integer.compare(priority2, priority1);
}
});
- // drain messages that could have been pre-fetched (the Consumer is paused, so this should
- // not
- // happen)
- ((BlockingQueue) oldQueue).drainTo(newQueue);
-
- incomingMessages.set(c, newQueue);
+ } else if (oldQueue.getClass().isAssignableFrom(PriorityGrowableArrayBlockingQueue.class)) {
+ newQueue = new PriorityGrowableArrayBlockingQueue();
} else {
- log.debug(
- "Field incomingMessages is not a PriorityBlockingQueue, it is a {}."
- + "We cannot apply priority to the messages in the local buffer.",
+ log.warn(
+ "Field incomingMessages is not a PriorityBlockingQueue/GrowableArrayBlockingQueue, it is a {}."
+ + " We cannot apply priority to the messages in the local buffer.",
oldQueue.getClass().getName());
+ return;
}
+
+ // drain messages that could have been pre-fetched (the Consumer is paused, so this should
+ // not
+ // happen)
+ ((BlockingQueue) oldQueue).drainTo(newQueue);
+
+ incomingMessages.set(c, newQueue);
} catch (Exception err) {
throw new RuntimeException(err);
}
}
- private static int getPriority(Message m) {
- String jmsPriority = m.getProperty("JMSPriority");
- if (jmsPriority == null || jmsPriority.isEmpty()) {
- return PulsarMessage.DEFAULT_PRIORITY;
- }
- try {
- return Integer.parseInt(jmsPriority);
- } catch (NumberFormatException err) {
- return PulsarMessage.DEFAULT_PRIORITY;
- }
- }
+
public String downloadServerSideFilter(
String fullQualifiedTopicName, String subscriptionName, SubscriptionMode subscriptionMode)
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java
index 98299327..d4794f05 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java
@@ -108,7 +108,6 @@ static String ACKNOWLEDGE_MODE_TO_STRING(int mode) {
private final int sessionMode;
private final boolean transacted;
private final boolean emulateTransactions;
- private final boolean enableJMSPriority;
// this is to emulate QueueSession/TopicSession
private boolean allowQueueOperations = true;
private boolean allowTopicOperations = true;
@@ -163,7 +162,6 @@ static String ACKNOWLEDGE_MODE_TO_STRING(int mode) {
this.transacted = sessionMode == Session.SESSION_TRANSACTED;
this.overrideConsumerConfiguration = overrideConsumerConfiguration;
PulsarConnectionFactory factory = getFactory();
- this.enableJMSPriority = factory.isEnableJMSPriority();
this.useDedicatedListenerThread = factory.getSessionListenersThreads() <= 0;
if (transacted && factory.isTransactionsStickyPartitions()) {
generateNewTransactionStickyKey();
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java
index aab9dfcc..a20e9ad0 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java
@@ -23,6 +23,8 @@
import com.datastax.oss.pulsar.jms.utils.PulsarCluster;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+
+import java.lang.reflect.Field;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Enumeration;
@@ -32,17 +34,20 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.jms.*;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -346,12 +351,12 @@ private static void verifyPriorities(List received) throws JMSException
for (int priority : received) {
if (priority == LOW_PRIORITY && foundHighPriority) {
- log.info("received priority {} after {}", priority, count);
+ log.info("received priority {} (low) after {} messages and one high priority", priority, count);
foundLowPriorityAfterHighPriority = true;
break;
}
if (priority == HIGH_PRIORITY) {
- log.info("received priority {} after {}", priority, count);
+ log.info("received priority {} (high) after {} messages", priority, count);
foundHighPriority = true;
}
count++;
@@ -504,4 +509,104 @@ public void basicPriorityJMSContextTest() throws Exception {
}
}
}
+
+ @ParameterizedTest(name = "mapping {0}")
+ @ValueSource(strings = {"linear", "non-linear"})
+ public void testConsumerPriorityQueue(String mapping) throws Exception {
+
+
+ final int numMessages = 500;
+ Map properties = new HashMap<>();
+ properties.put("webServiceUrl", cluster.getAddress());
+ properties.put("jms.enableJMSPriority", true);
+ properties.put("jms.priorityMapping", mapping);
+ properties.put(
+ "producerConfig", ImmutableMap.of("blockIfQueueFull", true, "batchingEnabled", false));
+ properties.put("consumerConfig", ImmutableMap.of("receiverQueueSize", numMessages));
+ log.info("running basicPriorityBigBacklogTest with {}", properties);
+
+ try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
+ try (Connection connection = factory.createConnection()) {
+ assertTrue(factory.isEnableJMSPriority());
+ assertEquals(mapping.equals("linear"), factory.isPriorityUseLinearMapping());
+ connection.start();
+ try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);) {
+ Queue destination = session.createQueue("test-" + UUID.randomUUID());
+
+ cluster
+ .getService()
+ .getAdminClient()
+ .topics()
+ .createPartitionedTopic(factory.getPulsarTopicName(destination), 10);
+
+ int numHighPriority = 100;
+
+ try (MessageProducer producer = session.createProducer(destination);) {
+ List> handles = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage textMessage = session.createTextMessage("foo-" + i);
+ if (i < numMessages - numHighPriority) {
+ // the first messages are lower priority
+ producer.setPriority(LOW_PRIORITY);
+ } else {
+ producer.setPriority(HIGH_PRIORITY);
+ }
+
+ CompletableFuture> handle = new CompletableFuture<>();
+ producer.send(
+ textMessage,
+ new CompletionListener() {
+ @Override
+ public void onCompletion(Message message) {
+ handle.complete(null);
+ }
+
+ @Override
+ public void onException(Message message, Exception e) {
+ handle.completeExceptionally(e);
+ }
+ });
+ handles.add(handle);
+ }
+ FutureUtil.waitForAll(handles).get();
+ }
+
+ try (MessageConsumer consumer1 = session.createConsumer(destination);) {
+ List received = new ArrayList<>();
+
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage msg = (TextMessage) consumer1.receive();
+ if (i == 0) {
+ // await all messages in the consumer receive queue
+ ConsumerBase> consumerBase = ((PulsarMessageConsumer) consumer1).getConsumer();
+ Field incomingMessages = ConsumerBase.class.getDeclaredField("incomingMessages");
+ incomingMessages.setAccessible(true);
+ Object queue = incomingMessages.get(consumerBase);
+ Awaitility.await()
+ .until(() -> ((BlockingQueue) queue).size() == numMessages - 1);
+ }
+ received.add(msg.getJMSPriority());
+
+ }
+
+ assertNull(consumer1.receiveNoWait());
+ assertEquals(numMessages, received.size());
+ int firstMessagePriority = 0;
+ for (int i = 0; i < received.size(); i++) {
+ if (i == 0) {
+ firstMessagePriority = received.get(i);
+ continue;
+ }
+ final int lastNHigh = numHighPriority + (firstMessagePriority == HIGH_PRIORITY ? 0 : 1);
+ if (i < lastNHigh) {
+ assertEquals(HIGH_PRIORITY, received.get(i));
+ } else {
+ assertEquals(LOW_PRIORITY, received.get(i));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/tck-executor/start_pulsar.sh b/tck-executor/start_pulsar.sh
index fc168ad3..a02b89a5 100755
--- a/tck-executor/start_pulsar.sh
+++ b/tck-executor/start_pulsar.sh
@@ -2,7 +2,7 @@
set -x -e
-IMAGENAME=${PULSAR_IMAGE_NAME:-apachepulsar/pulsar:3.0.0}
+IMAGENAME=${PULSAR_IMAGE_NAME:-apachepulsar/pulsar:3.2.2}
HERE=$(dirname $0)
HERE=$(realpath "$HERE")