diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 0143e6c9..494fedcc 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -91,6 +91,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.impl.ConsumerBase; +import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.auth.AuthenticationToken; @@ -1370,6 +1371,26 @@ public int compare(Message o1, Message o2) { ((BlockingQueue) oldQueue).drainTo(newQueue); incomingMessages.set(c, newQueue); + + + Field consumersField = MultiTopicsConsumerImpl.class + .getDeclaredField("consumers"); + + consumersField.setAccessible(true); + + ConcurrentHashMap> consumers = (ConcurrentHashMap) consumersField + .get(consumerBase); + Method setCurrentReceiverQueueSizeMethod = ConsumerImpl.class.getDeclaredMethod("setCurrentReceiverQueueSize", int.class); + setCurrentReceiverQueueSizeMethod.setAccessible(true); + for (ConsumerImpl consumer : consumers.values()) { + String topic = consumer.getTopic(); + int last = topic.lastIndexOf('-'); + int prio = Integer.parseInt(topic.substring(last + 1)); + int size = (prio + 1) * 10; + log.info("Setting queue size for {} to {}", topic, size); + setCurrentReceiverQueueSizeMethod.invoke(consumer, size); + } + } catch (Exception err) { throw new RuntimeException(err); }