Skip to content

Commit

Permalink
Format PulsarConnectionFactory.java
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 7, 2024
1 parent 77a8ce3 commit 9a48d2b
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1379,23 +1379,24 @@ public int compare(Message o1, Message o2) {
}
}

private static void setReceiverQueueSizeForJMSPriority(ConsumerBase consumerBase) throws Exception {
private static void setReceiverQueueSizeForJMSPriority(ConsumerBase consumerBase)
throws Exception {
Field consumersField = MultiTopicsConsumerImpl.class.getDeclaredField("consumers");

consumersField.setAccessible(true);

ConcurrentHashMap<String, ConsumerImpl<?>> consumers =
(ConcurrentHashMap) consumersField.get(consumerBase);
(ConcurrentHashMap) consumersField.get(consumerBase);
Method setCurrentReceiverQueueSizeMethod =
ConsumerImpl.class.getDeclaredMethod("setCurrentReceiverQueueSize", int.class);
ConsumerImpl.class.getDeclaredMethod("setCurrentReceiverQueueSize", int.class);
setCurrentReceiverQueueSizeMethod.setAccessible(true);

// set the queue size for each consumer based on the partition index
// we set a higher number to the consumers for the higher priority partitions
// this way the backlog is drained more quickly for the higher priority partitions
int numConsumers = consumers.size();
int sumPriorities =
(numConsumers * (numConsumers + 1)) / 2; // 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10
(numConsumers * (numConsumers + 1)) / 2; // 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10
int receiverQueueSize = consumerBase.getCurrentReceiverQueueSize();

for (ConsumerImpl<?> consumer : consumers.values()) {
Expand Down

0 comments on commit 9a48d2b

Please sign in to comment.