Skip to content

Commit

Permalink
Upgrade Pulsar client (3.2.2) and improve JMS priority on consumer side
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Apr 8, 2024
1 parent 5c009c6 commit f0929a1
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 34 deletions.
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<jms.version>2.0.3</jms.version>
<!-- waiting for a Apache Pulsar 2.11.0 release, in the meantime we use DataStax Luna Streaming
that is a fork of Apache Pulsar -->
<pulsar.groupId>org.apache.pulsar</pulsar.groupId>
<pulsar.version>3.0.0</pulsar.version>
<pulsar.version>3.2.2</pulsar.version>
<activemq.version>5.16.1</activemq.version>
<hawtbuf.version>1.11</hawtbuf.version>
<curator.version>5.1.0</curator.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.auth.AuthenticationToken;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
Expand All @@ -52,6 +51,7 @@ public class DockerTest {
private static final String TEST_PULSAR_DOCKER_IMAGE_NAME =
System.getProperty("testPulsarDockerImageName");
public static final String LUNASTREAMING = "datastax/lunastreaming:2.10_4.4";
public static final String LUNASTREAMING_31 = "datastax/lunastreaming:3.1_3.0";

@TempDir Path tempDir;

Expand All @@ -75,6 +75,11 @@ public void testLunaStreaming210() throws Exception {
// waiting for Apache Pulsar 2.10.1, in the meantime we use Luna Streaming 2.10.0.x
test(LUNASTREAMING, false);
}
@Test
public void testLunaStreaming31() throws Exception {
test(LUNASTREAMING_31, false);
}


@Test
public void testPulsar292Transactions() throws Exception {
Expand All @@ -93,12 +98,12 @@ public void testPulsar211Transactions() throws Exception {

@Test
public void testPulsar3Transactions() throws Exception {
test("apachepulsar/pulsar:3.0.0", true);
test("apachepulsar/pulsar:3.2.2", true);
}

@Test
public void testNoAuthentication() throws Exception {
test("apachepulsar/pulsar:3.0.0", false, false, false);
test("apachepulsar/pulsar:3.2.2", false, false, false);
}

@Test
Expand All @@ -107,11 +112,22 @@ public void testLunaStreaming210Transactions() throws Exception {
test(LUNASTREAMING, true);
}

@Test
public void testLunaStreaming31Transactions() throws Exception {
// waiting for Apache Pulsar 2.10.1, in the meantime we use Luna Streaming 2.10.0.x
test(LUNASTREAMING_31, true);
}

@Test
public void testLunaStreaming210ServerSideSelectors() throws Exception {
test(LUNASTREAMING, false, true);
}

@Test
public void testLunaStreaming31ServerSideSelectors() throws Exception {
test(LUNASTREAMING_31, false, true);
}

@Test
public void testGenericPulsar() throws Exception {
assumeTrue(TEST_PULSAR_DOCKER_IMAGE_NAME != null && !TEST_PULSAR_DOCKER_IMAGE_NAME.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package com.datastax.oss.pulsar.jms;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;

import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class PriorityGrowableArrayBlockingQueue extends GrowableArrayBlockingQueue<Message> {

static int getPriority(Message m) {
String jmsPriority = m.getProperty("JMSPriority");
if (jmsPriority == null || jmsPriority.isEmpty()) {
return PulsarMessage.DEFAULT_PRIORITY;
}
try {
return Integer.parseInt(jmsPriority);
} catch (NumberFormatException err) {
return PulsarMessage.DEFAULT_PRIORITY;
}
}

private final PriorityBlockingQueue<Message> queue;
private final AtomicBoolean terminated = new AtomicBoolean(false);
public PriorityGrowableArrayBlockingQueue() {
this(10);
}

public PriorityGrowableArrayBlockingQueue(int initialCapacity) {
queue = new PriorityBlockingQueue<>(initialCapacity, new Comparator<Message>() {
@Override
public int compare(Message o1, Message o2) {
int priority1 = getPriority(o1);
int priority2 = getPriority(o2);
return Integer.compare(priority2, priority1);
}
});
}

@Override
public Message remove() {
return queue.remove();
}

@Override
public Message poll() {
return queue.poll();
}

@Override
public Message element() {
return queue.element();
}

@Override
public Message peek() {
return queue.peek();
}

@Override
public boolean offer(Message e) {
return queue.offer(e);
}

@Override
public void put(Message e) {
queue.put(e);
}

@Override
public boolean add(Message e) {
return queue.add(e);
}

@Override
public boolean offer(Message e, long timeout, TimeUnit unit) {
return queue.offer(e, timeout, unit);
}

@Override
public Message take() throws InterruptedException {
return queue.take();
}

@Override
public Message poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}

@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}

@Override
public int drainTo(Collection<? super Message> c) {
return queue.drainTo(c);
}

@Override
public int drainTo(Collection<? super Message> c, int maxElements) {
return queue.drainTo(c, maxElements);
}

@Override
public void clear() {
queue.clear();
}

@Override
public boolean remove(Object o) {
return queue.remove(o);
}

@Override
public int size() {
return queue.size();
}

@Override
public Iterator<Message> iterator() {
return queue.iterator();
}

@Override
public List<Message> toList() {
List<Message> list = new ArrayList<>(size());
forEach(list::add);
return list;
}

@Override
public void forEach(Consumer<? super Message> action) {
queue.forEach(action);
}

@Override
public String toString() {
return queue.toString();
}

@Override
public void terminate(@Nullable Consumer<Message> itemAfterTerminatedHandler) {
terminated.set(true);
}

@Override
public boolean isTerminated() {
return terminated.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;

@Slf4j
public class PulsarConnectionFactory
Expand Down Expand Up @@ -1339,47 +1340,42 @@ private static void replaceIncomingMessageList(Consumer c) {
incomingMessages.setAccessible(true);

Object oldQueue = incomingMessages.get(consumerBase);
BlockingQueue<Message> newQueue;
if (oldQueue.getClass().isAssignableFrom(PriorityBlockingQueue.class)) {
BlockingQueue<Message> newQueue =
newQueue =
new PriorityBlockingQueue<Message>(
10,
new Comparator<Message>() {
@Override
public int compare(Message o1, Message o2) {
int priority1 = getPriority(o1);
int priority2 = getPriority(o2);
int priority1 = PriorityGrowableArrayBlockingQueue.getPriority(o1);
int priority2 = PriorityGrowableArrayBlockingQueue.getPriority(o2);
return Integer.compare(priority2, priority1);
}
});

// drain messages that could have been pre-fetched (the Consumer is paused, so this should
// not
// happen)
((BlockingQueue<Message>) oldQueue).drainTo(newQueue);

incomingMessages.set(c, newQueue);
} else if (oldQueue.getClass().isAssignableFrom(PriorityGrowableArrayBlockingQueue.class)) {
newQueue = new PriorityGrowableArrayBlockingQueue();
} else {
log.debug(
"Field incomingMessages is not a PriorityBlockingQueue, it is a {}."
+ "We cannot apply priority to the messages in the local buffer.",
log.warn(
"Field incomingMessages is not a PriorityBlockingQueue/GrowableArrayBlockingQueue, it is a {}."
+ " We cannot apply priority to the messages in the local buffer.",
oldQueue.getClass().getName());
return;
}

// drain messages that could have been pre-fetched (the Consumer is paused, so this should
// not
// happen)
((BlockingQueue<Message>) oldQueue).drainTo(newQueue);

incomingMessages.set(c, newQueue);
} catch (Exception err) {
throw new RuntimeException(err);
}
}

private static int getPriority(Message m) {
String jmsPriority = m.getProperty("JMSPriority");
if (jmsPriority == null || jmsPriority.isEmpty()) {
return PulsarMessage.DEFAULT_PRIORITY;
}
try {
return Integer.parseInt(jmsPriority);
} catch (NumberFormatException err) {
return PulsarMessage.DEFAULT_PRIORITY;
}
}


public String downloadServerSideFilter(
String fullQualifiedTopicName, String subscriptionName, SubscriptionMode subscriptionMode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ static String ACKNOWLEDGE_MODE_TO_STRING(int mode) {
private final int sessionMode;
private final boolean transacted;
private final boolean emulateTransactions;
private final boolean enableJMSPriority;
// this is to emulate QueueSession/TopicSession
private boolean allowQueueOperations = true;
private boolean allowTopicOperations = true;
Expand Down Expand Up @@ -163,7 +162,6 @@ static String ACKNOWLEDGE_MODE_TO_STRING(int mode) {
this.transacted = sessionMode == Session.SESSION_TRANSACTED;
this.overrideConsumerConfiguration = overrideConsumerConfiguration;
PulsarConnectionFactory factory = getFactory();
this.enableJMSPriority = factory.isEnableJMSPriority();
this.useDedicatedListenerThread = factory.getSessionListenersThreads() <= 0;
if (transacted && factory.isTransactionsStickyPartitions()) {
generateNewTransactionStickyKey();
Expand Down
Loading

0 comments on commit f0929a1

Please sign in to comment.