Skip to content

Commit

Permalink
Improve JMSPriority handling on the consumer side (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored May 6, 2024
1 parent 40e9720 commit 68580e5
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 79 deletions.
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
8 changes: 4 additions & 4 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<mkdir dir="${project.build.outputDirectory}/interceptors" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<mkdir dir="${project.build.outputDirectory}/interceptors"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,147 +19,225 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;

public class MessagePriorityGrowableArrayBlockingQueue extends GrowableArrayBlockingQueue<Message> {
@Slf4j
public class MessagePriorityGrowableArrayBlockingQueue<T>
extends GrowableArrayBlockingQueue<Message<T>> {
private final PriorityBlockingQueue<MessageWithPriority<T>> queue;
private final AtomicBoolean terminated = new AtomicBoolean(false);

private volatile Consumer<Message<T>> itemAfterTerminatedHandler;
private final AtomicInteger[] numberMessagesByPriority = new AtomicInteger[10];

static int getPriority(Message m) {
Integer priority = PulsarMessage.readJMSPriority(m);
return priority == null ? PulsarMessage.DEFAULT_PRIORITY : priority;
@AllArgsConstructor
private static final class MessageWithPriority<T> {
final int priority;
final Message<T> message;
}

private final PriorityBlockingQueue<Message> queue;
private final AtomicBoolean terminated = new AtomicBoolean(false);
private static final Comparator<MessageWithPriority<?>> comparator =
(o1, o2) -> {
// ORDER BY priority DESC, messageId ASC
int priority1 = o1.priority;
int priority2 = o2.priority;
if (priority1 == priority2) {
// if priorities are equal, we want to sort by messageId
return o1.message.getMessageId().compareTo(o2.message.getMessageId());
}
return Integer.compare(priority2, priority1);
};

public MessagePriorityGrowableArrayBlockingQueue() {
this(10);
}

public MessagePriorityGrowableArrayBlockingQueue(int initialCapacity) {
queue =
new PriorityBlockingQueue<>(
initialCapacity,
new Comparator<Message>() {
@Override
public int compare(Message o1, Message o2) {
int priority1 = getPriority(o1);
int priority2 = getPriority(o2);
return Integer.compare(priority2, priority1);
}
});
queue = new PriorityBlockingQueue<>(initialCapacity, comparator);
for (int i = 0; i < 10; i++) {
numberMessagesByPriority[i] = new AtomicInteger();
}
}

@Override
public Message remove() {
return queue.remove();
public Message<T> remove() {
throw new UnsupportedOperationException();
}

@Override
public Message poll() {
return queue.poll();
public Message<T> poll() {
MessageWithPriority<T> pair = queue.poll();
if (pair == null) {
return null;
}
Message<T> result = pair.message;
int prio = pair.priority;
if (log.isDebugEnabled()) {
log.debug(
"polled message prio {} {} stats {}",
prio,
result.getMessageId(),
Arrays.toString(numberMessagesByPriority));
}
numberMessagesByPriority[prio].decrementAndGet();
return result;
}

@Override
public Message element() {
return queue.element();
public Message<T> element() {
throw new UnsupportedOperationException();
}

@Override
public Message peek() {
return queue.peek();
public Message<T> peek() {
MessageWithPriority<T> pair = queue.peek();
if (pair == null) {
return null;
}
Message<T> result = pair.message;
if (log.isDebugEnabled()) {
log.debug(
"peeking message: {} prio {}",
result.getMessageId(),
PulsarMessage.readJMSPriority(result));
}
return result;
}

@Override
public boolean offer(Message e) {
return queue.offer(e);
public boolean offer(Message<T> e) {
boolean result;
if (!this.terminated.get()) {
int prio = PulsarMessage.readJMSPriority(e);
numberMessagesByPriority[prio].incrementAndGet();
result = queue.offer(new MessageWithPriority(prio, e));
if (log.isDebugEnabled()) {
log.debug(
"offered message: {} prio {} stats {}",
e.getMessageId(),
prio,
Arrays.toString(numberMessagesByPriority));
}
} else {
if (log.isDebugEnabled()) {
log.debug("queue is terminated, not offering message: {}", e.getMessageId());
}
if (itemAfterTerminatedHandler != null) {
itemAfterTerminatedHandler.accept(e);
}
result = false;
}
return result;
}

@Override
public void put(Message e) {
queue.put(e);
public void put(Message<T> e) {
throw new UnsupportedOperationException();
}

@Override
public boolean add(Message e) {
return queue.add(e);
public boolean add(Message<T> e) {
throw new UnsupportedOperationException();
}

@Override
public boolean offer(Message e, long timeout, TimeUnit unit) {
return queue.offer(e, timeout, unit);
throw new UnsupportedOperationException();
}

@Override
public Message take() throws InterruptedException {
return queue.take();
public Message<T> take() throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public Message poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
public Message<T> poll(long timeout, TimeUnit unit) throws InterruptedException {
MessageWithPriority<T> pair = queue.poll(timeout, unit);
if (pair == null) {
return null;
}
Message<T> result = pair.message;
int prio = pair.priority;
if (log.isDebugEnabled()) {
log.debug(
"polled message (tm {} {}):prio {} {} stats {}",
timeout,
unit,
prio,
result.getMessageId(),
Arrays.toString(numberMessagesByPriority));
}
numberMessagesByPriority[prio].decrementAndGet();
return result;
}

@Override
public int remainingCapacity() {
return queue.remainingCapacity();
public void clear() {
queue.clear();
}

@Override
public int drainTo(Collection<? super Message> c) {
return queue.drainTo(c);
public int size() {
return queue.size();
}

@Override
public int drainTo(Collection<? super Message> c, int maxElements) {
return queue.drainTo(c, maxElements);
public void forEach(Consumer<? super Message<T>> action) {
queue.stream().sorted(comparator).forEach(x -> action.accept(x.message));
}

@Override
public void clear() {
queue.clear();
public String toString() {
return "queue:" + queue + ", stats:" + getPriorityStats() + ", terminated:" + terminated.get();
}

@Override
public boolean remove(Object o) {
return queue.remove(o);
public void terminate(Consumer<Message<T>> itemAfterTerminatedHandler) {
this.itemAfterTerminatedHandler = itemAfterTerminatedHandler;
terminated.set(true);
}

@Override
public int size() {
return queue.size();
public boolean isTerminated() {
return terminated.get();
}

@Override
public Iterator<Message> iterator() {
return queue.iterator();
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
public List<Message> toList() {
List<Message> list = new ArrayList<>(size());
forEach(list::add);
return list;
public int remainingCapacity() {
throw new UnsupportedOperationException();
}

@Override
public void forEach(Consumer<? super Message> action) {
queue.forEach(action);
public int drainTo(Collection<? super Message<T>> c) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return queue.toString();
public int drainTo(Collection<? super Message<T>> c, int maxElements) {
throw new UnsupportedOperationException();
}

@Override
public void terminate(Consumer<Message> itemAfterTerminatedHandler) {
terminated.set(true);
public Iterator<Message<T>> iterator() {
throw new UnsupportedOperationException();
}

@Override
public boolean isTerminated() {
return terminated.get();
public List<Message<T>> toList() {
throw new UnsupportedOperationException();
}

public String getPriorityStats() {
return Arrays.toString(numberMessagesByPriority);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

@Slf4j
Expand Down Expand Up @@ -1088,9 +1090,7 @@ Producer<byte[]> getProducerForDestination(Destination defaultDestination, boole
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {

Integer priority = PulsarMessage.readJMSPriority(msg);
int key =
priority == null ? PulsarMessage.DEFAULT_PRIORITY : priority;
int key = PulsarMessage.readJMSPriority(msg);
return Utils.mapPriorityToPartition(
key,
metadata.numPartitions(),
Expand Down Expand Up @@ -1346,8 +1346,8 @@ private static void replaceIncomingMessageList(Consumer c) {
new Comparator<Message>() {
@Override
public int compare(Message o1, Message o2) {
int priority1 = MessagePriorityGrowableArrayBlockingQueue.getPriority(o1);
int priority2 = MessagePriorityGrowableArrayBlockingQueue.getPriority(o2);
int priority1 = PulsarMessage.readJMSPriority(o1);
int priority2 = PulsarMessage.readJMSPriority(o2);
return Integer.compare(priority2, priority1);
}
});
Expand All @@ -1370,11 +1370,46 @@ public int compare(Message o1, Message o2) {
((BlockingQueue<Message>) oldQueue).drainTo(newQueue);

incomingMessages.set(c, newQueue);

if (consumerBase instanceof MultiTopicsConsumerImpl) {
setReceiverQueueSizeForJMSPriority(consumerBase);
}
} catch (Exception err) {
throw new RuntimeException(err);
}
}

private static void setReceiverQueueSizeForJMSPriority(ConsumerBase consumerBase) throws Exception {
Field consumersField = MultiTopicsConsumerImpl.class.getDeclaredField("consumers");

consumersField.setAccessible(true);

ConcurrentHashMap<String, ConsumerImpl<?>> consumers =
(ConcurrentHashMap) consumersField.get(consumerBase);
Method setCurrentReceiverQueueSizeMethod =
ConsumerImpl.class.getDeclaredMethod("setCurrentReceiverQueueSize", int.class);
setCurrentReceiverQueueSizeMethod.setAccessible(true);

// set the queue size for each consumer based on the partition index
// we set a higher number to the consumers for the higher priority partitions
// this way the backlog is drained more quickly for the higher priority partitions
int numConsumers = consumers.size();
int sumPriorities =
(numConsumers * (numConsumers + 1)) / 2; // 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10
int receiverQueueSize = consumerBase.getCurrentReceiverQueueSize();

for (ConsumerImpl<?> consumer : consumers.values()) {
String topic = consumer.getTopic();
int partitionIndex = TopicName.get(topic).getPartitionIndex();
// no need to map exactly the partition index to the priority
int prio = Math.max(partitionIndex, 0);
// the size is proportional to the priority (partition index)
int size = Math.max(1, (prio + 1) * receiverQueueSize / sumPriorities);
log.info("Setting receiverQueueSize={} for {} (to handle JMSPriority)", size, topic);
setCurrentReceiverQueueSizeMethod.invoke(consumer, size);
}
}

public String downloadServerSideFilter(
String fullQualifiedTopicName, String subscriptionName, SubscriptionMode subscriptionMode)
throws JMSException {
Expand Down
Loading

0 comments on commit 68580e5

Please sign in to comment.