diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/MessagePriorityGrowableArrayBlockingQueue.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/MessagePriorityGrowableArrayBlockingQueue.java index d30c97d0..217761e2 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/MessagePriorityGrowableArrayBlockingQueue.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/MessagePriorityGrowableArrayBlockingQueue.java @@ -222,5 +222,9 @@ public List toList() { throw new UnsupportedOperationException(); } + public String getPriorityStats() { + return Arrays.toString(numberMessagesByPrority); + } + } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityExample.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityExample.java index 6c2e1c7e..eabe9e0a 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityExample.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityExample.java @@ -12,6 +12,7 @@ import javax.jms.Queue; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ConsumerBase; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.common.policies.data.BacklogQuota; public class PriorityExample @@ -23,8 +24,15 @@ public static void main(String[] args) throws Exception Map producerConfig = new HashMap<>(); producerConfig.put("batchingEnabled", false); Map consumerConfig = new HashMap<>(); - consumerConfig.put("receiverQueueSize", "64"); - consumerConfig.put("maxTotalReceiverQueueSizeAcrossPartitions", "64"); + + // Observation: + // receiverQueueSize = 128 + // 128 stay in the queue for prio 0 + // at every re-fill round we get 64 prio 9 messages and stop refilling for 9 + // at every re-fill round we get 64 prio 0 messages and stop refilling for 0 + + consumerConfig.put("receiverQueueSize", "128"); + //consumerConfig.put("maxTotalReceiverQueueSizeAcrossPartitions", "64"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory( ImmutableMap.of("jms.useServerSideFiltering", "true", "jms.clientId", "test", "producerConfig", producerConfig, @@ -76,24 +84,27 @@ public static void main(String[] args) throws Exception try (JMSConsumer consumer = jmsContext.createConsumer(topic1, null);) { for (int i = 0; i < numMessages; i++) { - //dumpInternalQueue(consumer); + Message receive = consumer.receive(); if (lastPrio != receive.getJMSPriority() && i > 0) { System.out.println("************************************************************************+"); - System.out.println("AFTER "+groupSize); + System.out.println("AFTER "+groupSize +" with prio "+lastPrio); + dumpInternalQueue(consumer); groupSize = 0; } lastPrio = receive.getJMSPriority(); groupSize++; System.out.println("Received priority: " + receive.getJMSPriority() + " and body: " + receive.getBody(String.class)); - Thread.sleep(100); + Thread.sleep(10); receive.acknowledge(); } + System.out.println("************************************************************************+"); + System.out.println("AFTER "+groupSize +" with prio "+lastPrio); + dumpInternalQueue(consumer); } - } @@ -106,13 +117,14 @@ private static void dumpInternalQueue(JMSConsumer consumer) throws Exception { Field incomingMessages = ConsumerBase.class.getDeclaredField("incomingMessages"); incomingMessages.setAccessible(true); + Field pausedConsumers = MultiTopicsConsumerImpl.class.getDeclaredField("pausedConsumers"); + pausedConsumers.setAccessible(true); + Object o = pausedConsumers.get(consumer1); + MessagePriorityGrowableArrayBlockingQueue oldQueue = (MessagePriorityGrowableArrayBlockingQueue) incomingMessages.get(consumer1); - System.out.println("Contents of the internal queue: "); - AtomicInteger pos = new AtomicInteger(); - oldQueue.forEach(m -> { - System.out.println("#" + pos.getAndIncrement() + " Message with priority: " + m.getProperty("JMSPriority")); - }); + System.out.println("Contents of the internal queue: "+ oldQueue.getPriorityStats()+" paused "+o); + } } \ No newline at end of file