diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 31d6aab9..8cbc0bca 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -1379,15 +1379,16 @@ public int compare(Message o1, Message o2) { } } - private static void setReceiverQueueSizeForJMSPriority(ConsumerBase consumerBase) throws Exception { + private static void setReceiverQueueSizeForJMSPriority(ConsumerBase consumerBase) + throws Exception { Field consumersField = MultiTopicsConsumerImpl.class.getDeclaredField("consumers"); consumersField.setAccessible(true); ConcurrentHashMap> consumers = - (ConcurrentHashMap) consumersField.get(consumerBase); + (ConcurrentHashMap) consumersField.get(consumerBase); Method setCurrentReceiverQueueSizeMethod = - ConsumerImpl.class.getDeclaredMethod("setCurrentReceiverQueueSize", int.class); + ConsumerImpl.class.getDeclaredMethod("setCurrentReceiverQueueSize", int.class); setCurrentReceiverQueueSizeMethod.setAccessible(true); // set the queue size for each consumer based on the partition index @@ -1395,7 +1396,7 @@ private static void setReceiverQueueSizeForJMSPriority(ConsumerBase consumerBase // this way the backlog is drained more quickly for the higher priority partitions int numConsumers = consumers.size(); int sumPriorities = - (numConsumers * (numConsumers + 1)) / 2; // 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + (numConsumers * (numConsumers + 1)) / 2; // 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 int receiverQueueSize = consumerBase.getCurrentReceiverQueueSize(); for (ConsumerImpl consumer : consumers.values()) {