Skip to content

Commit

Permalink
tweak received queue size depending on priority
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 4, 2024
1 parent d5e21b1 commit add756e
Showing 1 changed file with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
Expand Down Expand Up @@ -1370,6 +1371,26 @@ public int compare(Message o1, Message o2) {
((BlockingQueue<Message>) oldQueue).drainTo(newQueue);

incomingMessages.set(c, newQueue);


Field consumersField = MultiTopicsConsumerImpl.class
.getDeclaredField("consumers");

consumersField.setAccessible(true);

ConcurrentHashMap<String, ConsumerImpl<?>> consumers = (ConcurrentHashMap) consumersField
.get(consumerBase);
Method setCurrentReceiverQueueSizeMethod = ConsumerImpl.class.getDeclaredMethod("setCurrentReceiverQueueSize", int.class);
setCurrentReceiverQueueSizeMethod.setAccessible(true);
for (ConsumerImpl<?> consumer : consumers.values()) {
String topic = consumer.getTopic();
int last = topic.lastIndexOf('-');
int prio = Integer.parseInt(topic.substring(last + 1));
int size = (prio + 1) * 10;
log.info("Setting queue size for {} to {}", topic, size);
setCurrentReceiverQueueSizeMethod.invoke(consumer, size);
}

} catch (Exception err) {
throw new RuntimeException(err);
}
Expand Down

0 comments on commit add756e

Please sign in to comment.