Skip to content

Commit

Permalink
more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 4, 2024
1 parent 17068f6 commit d5e21b1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,5 +222,9 @@ public List<Message> toList() {
throw new UnsupportedOperationException();
}

public String getPriorityStats() {
return Arrays.toString(numberMessagesByPrority);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import javax.jms.Queue;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.common.policies.data.BacklogQuota;

public class PriorityExample
Expand All @@ -23,8 +24,15 @@ public static void main(String[] args) throws Exception
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("batchingEnabled", false);
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("receiverQueueSize", "64");
consumerConfig.put("maxTotalReceiverQueueSizeAcrossPartitions", "64");

// Observation:
// receiverQueueSize = 128
// 128 stay in the queue for prio 0
// at every re-fill round we get 64 prio 9 messages and stop refilling for 9
// at every re-fill round we get 64 prio 0 messages and stop refilling for 0

consumerConfig.put("receiverQueueSize", "128");
//consumerConfig.put("maxTotalReceiverQueueSizeAcrossPartitions", "64");
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(
ImmutableMap.of("jms.useServerSideFiltering", "true", "jms.clientId",
"test", "producerConfig", producerConfig,
Expand Down Expand Up @@ -76,24 +84,27 @@ public static void main(String[] args) throws Exception
try (JMSConsumer consumer = jmsContext.createConsumer(topic1, null);) {
for (int i = 0; i < numMessages; i++) {

//dumpInternalQueue(consumer);


Message receive = consumer.receive();
if (lastPrio != receive.getJMSPriority() && i > 0)
{
System.out.println("************************************************************************+");
System.out.println("AFTER "+groupSize);
System.out.println("AFTER "+groupSize +" with prio "+lastPrio);
dumpInternalQueue(consumer);
groupSize = 0;
}
lastPrio = receive.getJMSPriority();
groupSize++;
System.out.println("Received priority: " + receive.getJMSPriority() + " and body: "
+ receive.getBody(String.class));
Thread.sleep(100);
Thread.sleep(10);
receive.acknowledge();
}
System.out.println("************************************************************************+");
System.out.println("AFTER "+groupSize +" with prio "+lastPrio);
dumpInternalQueue(consumer);
}

}


Expand All @@ -106,13 +117,14 @@ private static void dumpInternalQueue(JMSConsumer consumer) throws Exception {
Field incomingMessages = ConsumerBase.class.getDeclaredField("incomingMessages");
incomingMessages.setAccessible(true);

Field pausedConsumers = MultiTopicsConsumerImpl.class.getDeclaredField("pausedConsumers");
pausedConsumers.setAccessible(true);
Object o = pausedConsumers.get(consumer1);

MessagePriorityGrowableArrayBlockingQueue oldQueue =
(MessagePriorityGrowableArrayBlockingQueue) incomingMessages.get(consumer1);
System.out.println("Contents of the internal queue: ");
AtomicInteger pos = new AtomicInteger();
oldQueue.forEach(m -> {
System.out.println("#" + pos.getAndIncrement() + " Message with priority: " + m.getProperty("JMSPriority"));
});
System.out.println("Contents of the internal queue: "+ oldQueue.getPriorityStats()+" paused "+o);

}

}

0 comments on commit d5e21b1

Please sign in to comment.