From c088bbea3d6c297744bc36bc2a6099aa7240d44f Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 6 May 2024 10:32:14 +0200 Subject: [PATCH] Remove file --- pulsar-jms-integration-tests/pom.xml | 4 +- pulsar-jms/pom.xml | 8 +- .../oss/pulsar/jms/PriorityExample.java | 134 ------------------ 3 files changed, 6 insertions(+), 140 deletions(-) delete mode 100644 pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityExample.java diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 8ac6fce5..dff7fc1c 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -103,8 +103,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 00136c60..d6f93692 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -128,10 +128,10 @@ copy filters - - - - + + + + diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityExample.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityExample.java deleted file mode 100644 index be2749cf..00000000 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityExample.java +++ /dev/null @@ -1,134 +0,0 @@ -package com.datastax.oss.pulsar.jms; - -import com.google.common.collect.ImmutableMap; -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.JMSConsumer; -import javax.jms.JMSContext; -import javax.jms.JMSProducer; -import javax.jms.Message; -import javax.jms.Queue; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.impl.ConsumerBase; -import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; -import org.apache.pulsar.common.policies.data.BacklogQuota; - -public class PriorityExample -{ - - - public static void main(String[] args) throws Exception - { - Map producerConfig = new HashMap<>(); - producerConfig.put("batchingEnabled", false); - Map consumerConfig = new HashMap<>(); - - // Observation: - // receiverQueueSize = 128 - // 128 stay in the queue for prio 0 - // at every re-fill round we get 64 prio 9 messages and stop refilling for 9 - // at every re-fill round we get 64 prio 0 messages and stop refilling for 0 - - consumerConfig.put("receiverQueueSize", "128"); - //consumerConfig.put("maxTotalReceiverQueueSizeAcrossPartitions", "64"); - try (PulsarConnectionFactory factory = new PulsarConnectionFactory( - ImmutableMap.of("jms.useServerSideFiltering", "true", "jms.clientId", - "test", "producerConfig", producerConfig, - "consumerConfig", consumerConfig, - "jms.emulateTransactions", true, - "jms.enableJMSPriority", "true"));) { - - Queue topic1; - try (JMSContext jmsContext = factory.createContext()) { - topic1 = jmsContext.createQueue("mytopic"); - factory.getPulsarAdmin().namespaces().setBacklogQuota("public/default", BacklogQuota - .builder() - .limitSize(-1) - .limitTime(-1) - .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold) - .build()); - try { - factory.getPulsarAdmin().topics().deletePartitionedTopic(topic1.getQueueName()); - } catch (PulsarAdminException.NotFoundException notFoundException) { - - } - factory.getPulsarAdmin().topics().createPartitionedTopic(topic1.getQueueName(), 10); - } - - try (JMSContext jmsContext = factory.createContext(JMSContext.CLIENT_ACKNOWLEDGE); - ) - { - try (JMSConsumer consumer = jmsContext.createConsumer(topic1, null);){ - } - - int numMessages = 20000; - for (int i = 0; i < numMessages; i++) { - JMSProducer producer = jmsContext.createProducer(); - if (i % 2 == 0) { - producer.setPriority(0); - } else { - producer.setPriority(9); - } - producer.setPriority(i % 10); - String text = "text-"+i+"-"+producer.getPriority(); - producer.send(topic1, text); - //System.out.println("Sent message "+ i + " with priority: " + producer.getPriority() + " and body: " + text); - if (i % 1000 == 0) { - System.out.println("Sent " + i + " messages"); - } - } - - // give time to Prom to see the backlog - Thread.sleep(60000); - - int lastPrio = -1; - int groupSize = 0; - try (JMSConsumer consumer = jmsContext.createConsumer(topic1, null);) { - for (int i = 0; i < numMessages; i++) { - - - - Message receive = consumer.receive(); - if (lastPrio != receive.getJMSPriority() && i > 0) - { - System.out.println("************************************************************************+"); - System.out.println("AFTER "+groupSize +" with prio "+lastPrio); - dumpInternalQueue(consumer); - groupSize = 0; - } - lastPrio = receive.getJMSPriority(); - groupSize++; - System.out.println("Received priority: " + receive.getJMSPriority() + " and body: " - + receive.getBody(String.class)); - Thread.sleep(10); - receive.acknowledge(); - } - System.out.println("************************************************************************+"); - System.out.println("AFTER "+groupSize +" with prio "+lastPrio); - dumpInternalQueue(consumer); - } - } - - - } - } - - private static void dumpInternalQueue(JMSConsumer consumer) throws Exception { - PulsarMessageConsumer pulsarMessageConsumer = ((PulsarJMSConsumer) consumer).asPulsarMessageConsumer(); - ConsumerBase consumer1 = pulsarMessageConsumer.getConsumer(); - Field incomingMessages = ConsumerBase.class.getDeclaredField("incomingMessages"); - incomingMessages.setAccessible(true); - - Field pausedConsumers = MultiTopicsConsumerImpl.class.getDeclaredField("pausedConsumers"); - pausedConsumers.setAccessible(true); - Object o = pausedConsumers.get(consumer1); - - MessagePriorityGrowableArrayBlockingQueue oldQueue = - (MessagePriorityGrowableArrayBlockingQueue) incomingMessages.get(consumer1); - System.out.println("Contents of the internal queue: "+ oldQueue.getPriorityStats()+" paused "+o); - - } - -} \ No newline at end of file