Is it possible to manually acknowledge messages via msgId? #18544
I couldn't find a relevant API for this, so I tried the following code:

```java
PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://127.0.0.1:8080")
        .build();
String topic = "persistent://public/default/topic-test-partition-1";
// Fetch the message stored at ledger 407, entry 5 through the admin API.
Message<byte[]> id = admin.topics().getMessageById(topic, 407, 5);

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://127.0.0.1:6650")
        .build();
Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/topic-test-partition-1")
        .subscriptionName("test")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .ackTimeout(0, TimeUnit.SECONDS)
        .subscribe();
// Acknowledge the message that was fetched through the admin API.
consumer.acknowledge(id);
```

It did not achieve the desired effect, and I can still query this message through this API:

```java
admin.topics().getInternalStats(topic);
```

```json
{"individuallyDeletedMessages":"[(407:4..407:5]]"}
```

Could someone please help explain this to me?
Replies: 1 comment 5 replies
Hi @crossoverJie,

I think this is the expected behavior: you acknowledged (mark delete) the message 407:5, so individuallyDeletedMessages records this message as deleted. Can you describe what your desired effect is?

By the way, you can just use MessageIdImpl to specify a message ID:

```java
consumer.acknowledge(new MessageIdImpl(ledgerId, entryId, partitionIndex));
```
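To make the point about individuallyDeletedMessages concrete, here is a minimal sketch that checks whether a position such as 407:5 falls inside the range string reported by the internal stats. It assumes each range is printed as `(ledger:entry..ledger:entry]`, with an exclusive lower bound and an inclusive upper bound, and that positions are ordered by ledger ID and then entry ID. The class `DeletedRangeCheck` and its `contains` method are hypothetical helpers written for illustration; they are not part of the Pulsar API.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Pulsar: interprets the
// individuallyDeletedMessages string shown in the question.
class DeletedRangeCheck {
    // Matches one range such as "(407:4..407:5]".
    // Assumption: lower bound is exclusive, upper bound is inclusive.
    private static final Pattern RANGE =
            Pattern.compile("\\((\\d+):(-?\\d+)\\.\\.(\\d+):(-?\\d+)\\]");

    /** Returns true if ledgerId:entryId lies inside any range in the string. */
    static boolean contains(String ranges, long ledgerId, long entryId) {
        Matcher m = RANGE.matcher(ranges);
        while (m.find()) {
            long lowLedger = Long.parseLong(m.group(1));
            long lowEntry = Long.parseLong(m.group(2));
            long highLedger = Long.parseLong(m.group(3));
            long highEntry = Long.parseLong(m.group(4));
            // Strictly above the lower bound (exclusive).
            boolean aboveLower = compare(ledgerId, entryId, lowLedger, lowEntry) > 0;
            // At or below the upper bound (inclusive).
            boolean atOrBelowUpper = compare(ledgerId, entryId, highLedger, highEntry) <= 0;
            if (aboveLower && atOrBelowUpper) {
                return true;
            }
        }
        return false;
    }

    // Orders positions by ledger ID first, then by entry ID (assumed ordering).
    private static int compare(long l1, long e1, long l2, long e2) {
        int c = Long.compare(l1, l2);
        return c != 0 ? c : Long.compare(e1, e2);
    }

    public static void main(String[] args) {
        String stats = "[(407:4..407:5]]";
        System.out.println(contains(stats, 407, 5)); // true: 407:5 is recorded as deleted
        System.out.println(contains(stats, 407, 4)); // false: the lower bound is exclusive
    }
}
```

Read this way, `(407:4..407:5]` covers exactly the entry 407:5, which matches the explanation above that the acknowledgment was recorded for that subscription.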