Skip to content

Commit

Permalink
[feat][client] PIP-374: Visibility of messages in receiverQueue for t…
Browse files Browse the repository at this point in the history
…he consumers (apache#23303)
  • Loading branch information
shibd authored Sep 17, 2024
1 parent 9ebd979 commit 2e98736
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 4 deletions.
4 changes: 2 additions & 2 deletions pip/pip-374.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ Since we added a default method onArrival() in interface, one who has provided t
<!--
Updated afterwards
-->
* Mailing List discussion thread:
* Mailing List voting thread:
* Mailing List discussion thread: https://lists.apache.org/thread/hcfpm4j6hpwxb2olfrro8g4dls35q8rx
* Mailing List voting thread: https://lists.apache.org/thread/wrr02s4cdzqmo1vonp92w6229qo0rv0z
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.KeyValue;
Expand Down Expand Up @@ -870,6 +871,101 @@ public void onPartitionsChange(String topicName, int partitions) {
Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
}

@Test(dataProvider = "topicPartition")
public void testConsumerInterceptorForOnArrive(int topicPartition) throws PulsarClientException,
InterruptedException, PulsarAdminException {
String topicName = "persistent://my-property/my-ns/on-arrive";
if (topicPartition > 0) {
admin.topics().createPartitionedTopic(topicName, topicPartition);
}

final int receiveQueueSize = 100;
final int totalNumOfMessages = receiveQueueSize * 2;

// The onArrival method is called for half of the receiveQueueSize messages before beforeConsume is called for all messages.
CountDownLatch latch = new CountDownLatch(receiveQueueSize / 2);
final AtomicInteger onArrivalCount = new AtomicInteger(0);
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
@Override
public void close() {}

@Override
public Message<String> onArrival(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
msg.getMessageBuilder().addProperty().setKey("onArrival").setValue("1");
latch.countDown();
onArrivalCount.incrementAndGet();
return msg;
}

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
return message;
}

@Override
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {

}

@Override
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {

}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
}

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-arrive")
.intercept(interceptor)
.receiverQueueSize(receiveQueueSize)
.subscribe();

for (int i = 0; i < totalNumOfMessages; i++) {
producer.send("Mock message");
}

// Not call receive message, just wait for onArrival interceptor.
latch.await();
Assert.assertEquals(latch.getCount(), 0);

for (int i = 0; i < totalNumOfMessages; i++) {
Message<String> message = consumer.receive();
MessageImpl<String> msgImpl;
if (message instanceof MessageImpl<String>) {
msgImpl = (MessageImpl<String>) message;
} else if (message instanceof TopicMessageImpl<String>) {
msgImpl = (MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage();
} else {
throw new ClassCastException("Message type is not expected");
}
boolean haveKey = false;
for (KeyValue keyValue : msgImpl.getMessageBuilder().getPropertiesList()) {
if ("onArrival".equals(keyValue.getKey())) {
haveKey = true;
}
}
Assert.assertTrue(haveKey);
}
Assert.assertEquals(totalNumOfMessages, onArrivalCount.get());

producer.close();
consumer.close();
}

private void produceAndConsume(int msgCount, Producer<byte[]> producer, Reader<byte[]> reader)
throws PulsarClientException {
for (int i = 0; i < msgCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,44 @@ public interface ConsumerInterceptor<T> extends AutoCloseable {
*/
void close();

/**
* This method is called when a message arrives in the consumer.
*
* <p>This method provides visibility into the messages that have been received
* by the consumer but have not yet been processed. This can be useful for
* monitoring the state of the consumer's receiver queue and understanding
* the consumer's processing rate.
*
* <p>The method is allowed to modify the message, in which case the modified
* message will be returned.
*
* <p>Any exception thrown by this method will be caught by the caller, logged,
* but not propagated to the client.
*
* <p>Since the consumer may run multiple interceptors, a particular
* interceptor's <tt>onArrival</tt> callback will be called in the order
* specified by {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The
* first interceptor in the list gets the consumed message, the following
* interceptor will be passed the message returned by the previous interceptor,
* and so on. Since interceptors are allowed to modify the message, interceptors
* may potentially get the messages already modified by other interceptors.
* However, building a pipeline of mutable interceptors that depend on the output
* of the previous interceptor is discouraged, because of potential side-effects
* caused by interceptors potentially failing to modify the message and throwing
* an exception. If one of the interceptors in the list throws an exception from
* <tt>onArrival</tt>, the exception is caught, logged, and the next interceptor
* is called with the message returned by the last successful interceptor in the
* list, or otherwise the original consumed message.
*
* @param consumer the consumer which contains the interceptor
* @param message the message that has arrived in the receiver queue
* @return the message that is either modified by the interceptor or the same
* message passed into the method
*/
default Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
return message;
}

/**
* This is called just before the message is returned by
* {@link Consumer#receive()}, {@link MessageListener#received(Consumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,14 @@ public String toString() {
+ '}';
}

protected Message<T> onArrival(Message<T> message) {
if (interceptors != null) {
return interceptors.onArrival(this, message);
} else {
return message;
}
}

protected Message<T> beforeConsume(Message<T> message) {
if (interceptors != null) {
return interceptors.beforeConsume(this, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,9 +1301,10 @@ private void executeNotifyCallback(final MessageImpl<T> message) {
increaseAvailablePermits(cnx());
return;
}
Message<T> interceptMsg = onArrival(message);
if (hasNextPendingReceive()) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingReceivedCallback(interceptMsg, null);
} else if (enqueueMessageAndCheckBatchReceive(interceptMsg) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,38 @@ public ConsumerInterceptors(List<ConsumerInterceptor<T>> interceptors) {
this.interceptors = interceptors;
}


/**
* This method is called when a message arrives in the consumer.
* <p>
* This method calls {@link ConsumerInterceptor#onArrival(Consumer, Message) method for each
* interceptor.
* <p>
* This method does not throw exceptions. If any of the interceptors in the chain throws an exception, it gets
* caught and logged, and next interceptor in int the chain is called with 'messages' returned by the previous
* successful interceptor beforeConsume call.
*
* @param consumer the consumer which contains the interceptors
* @param message message to be consume by the client.
* @return messages that are either modified by interceptors or same as messages passed to this method.
*/
public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
Message<T> interceptorMessage = message;
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptorMessage = interceptors.get(i).onArrival(consumer, interceptorMessage);
} catch (Throwable e) {
if (consumer != null) {
log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}",
consumer.getTopic(), consumer.getConsumerName(), e);
} else {
log.warn("Error executing interceptor beforeConsume callback", e);
}
}
}
return interceptorMessage;
}

/**
* This is called just before the message is returned by {@link Consumer#receive()},
* {@link MessageListener#received(Consumer, Message)} or the {@link java.util.concurrent.CompletableFuture}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,11 @@ private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) {
return new ConsumerInterceptors<T>(new ArrayList<>()) {

@Override
public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
return multiTopicInterceptors.onArrival(consumer, message);
}

@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
return message;
Expand Down

0 comments on commit 2e98736

Please sign in to comment.