Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve JMSPriority handling on the consumer side #139

Merged
merged 13 commits into from
May 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -15,113 +15,38 @@
*/
package com.datastax.oss.pulsar.jms;

import com.google.common.collect.Streams;
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;

@Slf4j
public class MessagePriorityGrowableArrayBlockingQueue extends GrowableArrayBlockingQueue<Message> {

static int getPriority(Message m) {
return PulsarMessage.readJMSPriority(m, PulsarMessage.DEFAULT_PRIORITY);
}

// for drainTo only, needs .add() method and that's it
private static class ForwardingCollection implements Collection<Pair<Integer, Message>> {

private final Collection<? super Message> c;

public ForwardingCollection(Collection<? super Message> c) {
this.c = c;
}

@Override
public int size() {
throw new IllegalStateException("Not supported");
}

@Override
public boolean isEmpty() {
throw new IllegalStateException("Not supported");
}

@Override
public boolean contains(Object o) {
throw new IllegalStateException("Not supported");
}

@Override
public Iterator<Pair<Integer, Message>> iterator() {
throw new IllegalStateException("Not supported");
}

@Override
public Object[] toArray() {
throw new IllegalStateException("Not supported");
}

@Override
public <T> T[] toArray(T[] a) {
throw new IllegalStateException("Not supported");
}

@Override
public boolean add(Pair<Integer, Message> message) {
return c.add(message.getRight());
}

@Override
public boolean remove(Object o) {
throw new IllegalStateException("Not supported");
}

@Override
public boolean containsAll(Collection<?> c) {
throw new IllegalStateException("Not supported");
}

@Override
public boolean addAll(Collection<? extends Pair<Integer, Message>> c) {
throw new IllegalStateException("Not supported");
}

@Override
public boolean removeAll(Collection<?> c) {
throw new IllegalStateException("Not supported");
}

@Override
public boolean retainAll(Collection<?> c) {
throw new IllegalStateException("Not supported");
}

@Override
public void clear() {}

@Override
public boolean equals(Object o) {
// shouldn't be used, but we need to keep spotbugs happy
return this == o || c.equals(o);
}

@Override
public int hashCode() {
return c.hashCode();
}
}

private final PriorityBlockingQueue<Pair<Integer, Message>> queue;
private final AtomicBoolean terminated = new AtomicBoolean(false);

private volatile Consumer<Message> itemAfterTerminatedHandler;
private final AtomicInteger[] numberMessagesByPrority = new AtomicInteger[10];

private static final Comparator<Pair<Integer, Message>> comparator =
(o1, o2) -> {
int priority1 = getPriority(o1.getRight());
int priority2 = getPriority(o2.getRight());
int priority1 = o1.getLeft();
int priority2 = o2.getLeft();
if (priority1 == priority2) {
// if priorities are equal, we want to sort by messageId
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
return o2.getRight().getMessageId().compareTo(o1.getRight().getMessageId());
}
return Integer.compare(priority2, priority1);
};

Expand All @@ -131,11 +56,14 @@ public MessagePriorityGrowableArrayBlockingQueue() {

public MessagePriorityGrowableArrayBlockingQueue(int initialCapacity) {
queue = new PriorityBlockingQueue<>(initialCapacity, comparator);
for (int i = 0; i < 10; i++) {
numberMessagesByPrority[i] = new AtomicInteger();
}
}

@Override
public Message remove() {
return queue.remove().getRight();
throw new UnsupportedOperationException();
}

@Override
Expand All @@ -144,12 +72,18 @@ public Message poll() {
if (pair == null) {
return null;
}
return pair.getRight();
Message result = pair.getRight();
int prio = pair.getLeft();
if (log.isDebugEnabled()) {
log.debug("polled message prio {} {} stats {}", prio, result.getMessageId(), Arrays.toString(numberMessagesByPrority));
}
numberMessagesByPrority[prio].decrementAndGet();
return result;
}

@Override
public Message element() {
return queue.element().getRight();
throw new UnsupportedOperationException();
}

@Override
Expand All @@ -158,32 +92,51 @@ public Message peek() {
if (pair == null) {
return null;
}
return pair.getRight();
Message result = pair.getRight();
log.info("peeking message: {} prio {}", result.getMessageId(), getPriority(result));
return result;
}

@Override
public boolean offer(Message e) {
return queue.offer(Pair.of(getPriority(e), e));

boolean result;
if (!this.terminated.get()) {
int prio = getPriority(e);
numberMessagesByPrority[prio].incrementAndGet();
result = queue.offer(Pair.of(prio, e));
if (log.isDebugEnabled()) {
log.debug("offered message: {} prio {} stats {}",
e.getMessageId(), getPriority(e), Arrays.toString(numberMessagesByPrority));
}
} else {
log.info("queue is terminated, not offering message: {}", e.getMessageId());
if (itemAfterTerminatedHandler != null) {
itemAfterTerminatedHandler.accept(e);
}
result = false;
}
return result;
}

@Override
public void put(Message e) {
queue.put(Pair.of(getPriority(e), e));
throw new UnsupportedOperationException();
}

@Override
public boolean add(Message e) {
return queue.add(Pair.of(getPriority(e), e));
throw new UnsupportedOperationException();
}

@Override
public boolean offer(Message e, long timeout, TimeUnit unit) {
return queue.offer(Pair.of(getPriority(e), e), timeout, unit);
throw new UnsupportedOperationException();
}

@Override
public Message take() throws InterruptedException {
return queue.take().getRight();
throw new UnsupportedOperationException();
}

@Override
Expand All @@ -192,73 +145,82 @@ public Message poll(long timeout, TimeUnit unit) throws InterruptedException {
if (pair == null) {
return null;
}
return pair.getRight();
Message result = pair.getRight();
int prio = pair.getLeft();
if (log.isDebugEnabled()) {
log.debug(
"polled message (tm {} {}):prio {} {} stats {}",
timeout,
unit,
prio,
result.getMessageId(),
Arrays.toString(numberMessagesByPrority));
}
numberMessagesByPrority[prio].decrementAndGet();
return result;
}

@Override
public int remainingCapacity() {
return queue.remainingCapacity();
public void clear() {
queue.clear();
}

@Override
public int drainTo(Collection<? super Message> c) {
return queue.drainTo(new ForwardingCollection(c));
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(Collection<? super Message> c, int maxElements) {
return queue.drainTo(new ForwardingCollection(c), maxElements);
public int size() {
return queue.size();
}

@Override
public void clear() {
queue.clear();
public void forEach(Consumer<? super Message> action) {
queue.stream().sorted(comparator).forEach(x -> action.accept(x.getRight()));
}

@Override
public boolean remove(Object o) {
for (Pair<Integer, Message> pair : queue) {
if (pair.getRight().equals(o)) {
return queue.remove(pair);
}
}
return false;
public String toString() {
return queue.toString();
}

@Override
public int size() {
return queue.size();
public void terminate(Consumer<Message> itemAfterTerminatedHandler) {
this.itemAfterTerminatedHandler = itemAfterTerminatedHandler;
terminated.set(true);
}

@Override
public Iterator<Message> iterator() {
return Streams.stream(queue.iterator()).sorted(comparator).map(Pair::getRight).iterator();
public boolean isTerminated() {
return terminated.get();
}

@Override
public List<Message> toList() {
List<Message> list = new ArrayList<>(size());
forEach(list::add);
return list;
public int remainingCapacity() {
throw new UnsupportedOperationException();
}

@Override
public void forEach(Consumer<? super Message> action) {
queue.stream().sorted(comparator).forEach(x -> action.accept(x.getRight()));
public int drainTo(Collection<? super Message> c) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return queue.toString();
public int drainTo(Collection<? super Message> c, int maxElements) {
throw new UnsupportedOperationException();
}


@Override
public void terminate(Consumer<Message> itemAfterTerminatedHandler) {
terminated.set(true);
public Iterator<Message> iterator() {
throw new UnsupportedOperationException();
}

@Override
public boolean isTerminated() {
return terminated.get();
public List<Message> toList() {
throw new UnsupportedOperationException();
}


}
Loading
Loading