Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 6, 2024
1 parent c088bbe commit aee5b24
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@

@Slf4j
public class MessagePriorityGrowableArrayBlockingQueue extends GrowableArrayBlockingQueue<Message> {

static int getPriority(Message m) {
return PulsarMessage.readJMSPriority(m, PulsarMessage.DEFAULT_PRIORITY);
}

private final PriorityBlockingQueue<Pair<Integer, Message>> queue;
private final AtomicBoolean terminated = new AtomicBoolean(false);

Expand All @@ -41,11 +36,12 @@ static int getPriority(Message m) {

private static final Comparator<Pair<Integer, Message>> comparator =
(o1, o2) -> {
// ORDER BY priority DESC, messageId ASC
int priority1 = o1.getLeft();
int priority2 = o2.getLeft();
if (priority1 == priority2) {
// if priorities are equal, we want to sort by messageId
return o2.getRight().getMessageId().compareTo(o1.getRight().getMessageId());
return o1.getRight().getMessageId().compareTo(o2.getRight().getMessageId());
}
return Integer.compare(priority2, priority1);
};
Expand Down Expand Up @@ -93,7 +89,7 @@ public Message peek() {
return null;
}
Message result = pair.getRight();
log.info("peeking message: {} prio {}", result.getMessageId(), getPriority(result));
log.info("peeking message: {} prio {}", result.getMessageId(), PulsarMessage.readJMSPriority(result));
return result;
}

Expand All @@ -102,12 +98,12 @@ public boolean offer(Message e) {

boolean result;
if (!this.terminated.get()) {
int prio = getPriority(e);
int prio = PulsarMessage.readJMSPriority(e);
numberMessagesByPrority[prio].incrementAndGet();
result = queue.offer(Pair.of(prio, e));
if (log.isDebugEnabled()) {
log.debug("offered message: {} prio {} stats {}",
e.getMessageId(), getPriority(e), Arrays.toString(numberMessagesByPrority));
e.getMessageId(), prio, Arrays.toString(numberMessagesByPrority));
}
} else {
log.info("queue is terminated, not offering message: {}", e.getMessageId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,8 +1091,7 @@ Producer<byte[]> getProducerForDestination(Destination defaultDestination, boole
public int choosePartition(Message<?> msg, TopicMetadata metadata) {

int key =
PulsarMessage.readJMSPriority(
msg, PulsarMessage.DEFAULT_PRIORITY);
PulsarMessage.readJMSPriority(msg);
return Utils.mapPriorityToPartition(
key,
metadata.numPartitions(),
Expand Down Expand Up @@ -1348,8 +1347,8 @@ private static void replaceIncomingMessageList(Consumer c) {
new Comparator<Message>() {
@Override
public int compare(Message o1, Message o2) {
int priority1 = MessagePriorityGrowableArrayBlockingQueue.getPriority(o1);
int priority2 = MessagePriorityGrowableArrayBlockingQueue.getPriority(o2);
int priority1 = PulsarMessage.readJMSPriority(o1);
int priority2 = PulsarMessage.readJMSPriority(o2);
return Integer.compare(priority2, priority1);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ protected PulsarMessage applyMessage(
this.correlationId = Base64.getDecoder().decode(msg.getProperty("JMSCorrelationID"));
}
if (msg.hasProperty("JMSPriority")) {
this.jmsPriority = readJMSPriority(msg, Message.DEFAULT_PRIORITY);
this.jmsPriority = readJMSPriority(msg);
}
if (msg.hasProperty("JMSDeliveryMode")) {
try {
Expand Down Expand Up @@ -1478,14 +1478,17 @@ public org.apache.pulsar.client.api.Message<?> getReceivedPulsarMessage() {
return receivedPulsarMessage;
}

public static int readJMSPriority(org.apache.pulsar.client.api.Message<?> msg, int defaultValue) {
public static int readJMSPriority(org.apache.pulsar.client.api.Message<?> msg) {
if (msg.hasProperty("JMSPriority")) {
try {
return Integer.parseInt(msg.getProperty("JMSPriority"));
int value = Integer.parseInt(msg.getProperty("JMSPriority"));
if (value < 0 || value >= 10) {
return PulsarMessage.DEFAULT_PRIORITY;
}
} catch (NumberFormatException err) {
// cannot decode priority, not a big deal as it is not supported in Pulsar
}
}
return defaultValue;
return PulsarMessage.DEFAULT_PRIORITY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private static void test(int... priorities) {
}

private static int getPriority(Message m) {
return PulsarMessage.readJMSPriority(m, PulsarMessage.DEFAULT_PRIORITY);
return PulsarMessage.readJMSPriority(m);
}

private static Message messageWithPriority(int priority) {
Expand Down

0 comments on commit aee5b24

Please sign in to comment.