Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for (optional) ack timeout and nack delay for consumers #67

Closed
wants to merge 30 commits into from

Conversation

dionjansen
Copy link
Contributor

Implements: #45 , #46

General comments

  • I've added a reference to Microsoft.Bcl.AsyncInterfaces to support the IDisposable interface in net5.0. I only had this issue in VSCode for the test and sample projects. This does not influence the tests or running samples on VSC (on mac).
  • This is still a draft mostly to get the discussion going.
  • I would like to add some more unit tests though this is a bit hard without a proper mocking framework, would you be open to introducing something like Moq to allow testing classes in a shallow way? Or do you rather have I follow a different unit test strategy?

Discussion

I think we can split up the discussion in 2 parts.

  1. The integration in the rest of the lib: where what is created, started and what is optional, general structure/ logic of the lib
  2. The implementation of the tracker: performance concerns, disposing

@blankensteiner I think firstly I would like to get some thoughts on the current implementation, I mostly focussed now on setting things up so I don't break existing processes:

Integration

  1. Right now I create a IMessageAcksTracker instance in the PulsarClient, though when clients do not have the consumer configured to use the tracking, a dummy InactiveMessageAcksTracker is passed. This instance is passed to the ConsumerChannelFactory and used when a channel is created. Does this setup makes sense to you?
  2. The ConsumerChannel now expects a MessageQueue which wraps the tracker and the AsyncQueue. The channel now informs the tracker through this queue when acking (and later nacking which I can add later) and the dequeue method automatically starts tracking messages received.
  3. When the tracker is started it runs indefinitely I find it hard to find a good place to start this thread, also since I need the consumer to call RedeliverUnacknowledgedMessages. Any ideas on how to improve this pattern? Perhaps a static method like some of the StateMonitor methods.
  4. I wonder how batched messages fit into all this, does a batch have a single message id? And should I be able to just redeliver that message id to release the batch back to the broker?

Implementation

  1. The MessageAcksTracker uses a polling mechanism to re-check for timed out messages (either due to being unacked for too long or the nack delay has been exceeded). Is this (generally) what you were thinking about too? Alternatively I could think of an approach where polling is done through a Timer. Or we could create individual Tasks for each added message to the tracker but I'm concerned of the overhead created by this.
  2. Atm I haven't considered the scenario yet that only a nack delay is configured and not an ack timeout in which case we will not have to track all dequeued messages.

@blankensteiner
Copy link
Contributor

Hi @dionjansen
Thanks for the PR!
I'll try and answer the best I can :-)

I've added a reference to Microsoft.Bcl.AsyncInterfaces to support the IDisposable interface in net5.0. I only had this issue in VSCode for the test and sample projects. This does not influence the tests or running samples on VSC (on mac).

We have tried adding support for Visual Studio Code before, but sadly it's just not a pleasant road to go down. I have no idea why Microsoft has created two IDE's for C# and doesn't ensure that they behave the same.
Visual Studio Code will create nonsense warnings and require unnecessary changes to the code-base and therefore it's not supported for developing DotPulsar. Visual Studio Community Edition and the commercial offerings and Rider are supported, so you have to use one of those.

I would like to add some more unit tests though this is a bit hard without a proper mocking framework, would you be open to introducing something like Moq to allow testing classes in a shallow way? Or do you rather have I follow a different unit test strategy?

Feel free to add one or more of these (if you need them):

  • AutoFixture
  • AutoFixture.AutoNSubstitute
  • AutoFixture.Xunit2
  • NSubstitute

Right now I create a IMessageAcksTracker instance in the PulsarClient, though when clients do not have the consumer configured to use the tracking, a dummy InactiveMessageAcksTracker is passed. This instance is passed to the ConsumerChannelFactory and used when a channel is created. Does this setup makes sense to you?

Yes.

When the tracker is started it runs indefinitely I find it hard to find a good place to start this thread, also since I need the consumer to call RedeliverUnacknowledgedMessages. Any ideas on how to improve this pattern? Perhaps a static method like some of the StateMonitor methods.

The question is if you actually need to consumer or just the consumer channel. The tracking should start and end together with the MessageQueue/ConsumerChannel.

I wonder how batched messages fit into all this, does a batch have a single message id? And should I be able to just redeliver that message id to release the batch back to the broker?

That's a really good question. Bookkeeper stores batched messages as one, so if you have a batched message consisting of 5 messages and you ack 4 of them, but the last times out and you ask the broker to redeliver, you will get the entire batch again. You could keep track of this, but it will hurt performance. I don't know what the other clients do here, but maybe you could test that?

The MessageAcksTracker uses a polling mechanism to re-check for timed out messages (either due to being unacked for too long or the nack delay has been exceeded). Is this (generally) what you were thinking about too? Alternatively I could think of an approach where polling is done through a Timer. Or we could create individual Tasks for each added message to the tracker but I'm concerned of the overhead created by this.

I agree that a timer/task per message will hurt performance too much. Have one task for the entire Consumer/MessageQueue is the right solution. Waking up and looking at what needs to be redelivered. Here we need to find a thread-safe and performant way of storing and accessing this information.

Atm I haven't considered the scenario yet that only a nack delay is configured and not an ack timeout in which case we will not have to track all dequeued messages.

Just a boolean check on dequeue and ack? We also need to handle cumulative acknowledgment.

Consider this ackTimeout implementation. First ackTimeout should be giving as a TimeSpan (instead of a an int or long as milli- or micro-seconds).

When a message is dequeued, and if we have an ackTimeout, then we store the MessageId and StopWatch.GetTimestamp() in an "AwaitingAck" struct in a ConcurrencyQueue, let's call it "AwaitingAcks". Other suggestions for concurrent collections with fast insertion are welcome.

When a message is acknowledged, and if we have an ackTimeout, then we store the acks instead of removing them from "AwaitingAcks". If we want to remove them right away, then we need the "AwaitingAcks" collection to support both iteration and random deletion. We only need to store the highest cumulative ack we see (if there is one) and the MessageIds not included by that cumulative ack.

When the ackTracker wakes up and has calculated what ackTimeout is in StopWatch ticks (those are not the same as TimeSpan ticks). It will call StopWatch.GetTimestamp(). We will now TryPeek and Dequeue from "AwaitingAck" for as long as the tracker timestamp - AwaitingAck.Timestamp is larger than the calculated timeout.
If the MessageId is not acknowledged, it's added to a CommandRedeliverUnacknowledgedMessages (that we are reusing) and then send it (if MessageIds were added).
If the MessageId was acknowledged, then we can remove that MessageId from the AckedMessageIds.

Consider this nackTimeout implementation.

When a message is nacked, we added the MessageId(s) to a CommandRedeliverUnacknowledgedMessages that we are reusing (should our "RedeliverUnacknowledgedMessages" taking an enumerable of messageId actually have been called "NegativeAcknowledge"?).

When the nackTracker wakes up it will check if the CommandRedeliverUnacknowledgedMessages has MessageIds and if yes, then send it.

Writing such a detailed implementation description was not what I intended, but when I first get going.... :-D
Anyway, if it is unclear, then I can try and make those classes/structs and push them to master.

@blankensteiner blankensteiner added the enhancement New feature or request label Dec 22, 2020
@dionjansen
Copy link
Contributor Author

@blankensteiner I've tried to follow your suggestions and simplify the implementation a bit: I focussed just on the unacked message tracker. Perhaps it is anyway a good idea to separate these two mechanisms (nack tracking and unacked tracking) since they are configured independently of each other.

Feel free to add one or more of these (if you need them):

  • AutoFixture
  • AutoFixture.AutoNSubstitute
  • AutoFixture.Xunit2
  • NSubstitute

Added NSubstitute and AutoFixture.AutoNSubstitute. I started testing the unacked tracker with this but I'm not sure I'm using the correct pattern to verify if messages are being redelivered under different test conditions. Since I don't see any other unit tests that tests internal classes in this way I'm not sure if this strategy agrees with the rest of the lib, so let me know what you think.

The question is if you actually need to consumer or just the consumer channel. The tracking should start and end together with the MessageQueue/ConsumerChannel.

I am starting the thread now in the Pulsar client

IUnackedMessageTracker unackedTracker = options.AckTimeoutMillis > 0
? new UnackedMessageTracker(TimeSpan.FromMilliseconds(options.AckTimeoutMillis), TimeSpan.FromSeconds(1))
: new InactiveUnackedMessageTracker();
unackedTracker.Start(consumer);

The tracker is then passed to the channel factory, so it can be passed to a message queue that is passed to the channel when created. The tracker loop is stopped when disposed which occurs when the message queue is disposed, which in terms occurs when the channel is disposed. I'm still using IConsumer for start in order to avoid duplicate implementation of RedeliverUnacknowledgedMessages in the consumer.

That's a really good question. Bookkeeper stores batched messages as one, so if you have a batched message consisting of 5 messages and you ack 4 of them, but the last times out and you ask the broker to redeliver, you will get the entire batch again. You could keep track of this, but it will hurt performance. I don't know what the other clients do here, but maybe you could test that?

From what I can see in the ConsumerImpl in java in case of batch messages only one item is put in the tracker for a batch message using (ledgerId, entryId, partitionIndex). Then when acking it looks like the batch is again treated as a single message, but only if markAckForBatchMessage returns false which indicates not all messages in the batch have been acked yet. I don't really see from this implementation how they "keep track" of what to ack within the batch in this way, but there is a lot going one here that I can't make sense of. Does the adding removing mechanism I point out here to the tracker make any sense to you?

Consider this ackTimeout implementation. First ackTimeout should be giving as a TimeSpan (instead of a an int or long as milli- or micro-seconds).

done, I kept the configuration options of the consumer to milliseconds though.

When a message is dequeued, and if we have an ackTimeout, then we store the MessageId and StopWatch.GetTimestamp() in an "AwaitingAck" struct in a ConcurrencyQueue, let's call it "AwaitingAcks". Other suggestions for concurrent collections with fast insertion are welcome.

I was struggling a bit to create a comparable TimeSpan (since as you point out stopwatch ticks != timespan ticks). I followed this article and concluded:

public TimeSpan Elapsed => TimeSpan.FromTicks(
(long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond));

Not casting frequency explicitly to double results in considerable loss of accuracy.

When a message is acknowledged, and if we have an ackTimeout, then we store the acks instead of removing them from "AwaitingAcks". If we want to remove them right away, then we need the "AwaitingAcks" collection to support both iteration and random deletion. We only need to store the highest cumulative ack we see (if there is one) and the MessageIds not included by that cumulative ack.

How can I determine if there is a highest cumulative ack from a MessageId instance? And if we do this wouldn't we also need some kind of removal from the unacked list that removes until this highest value, like: removeMessagesTill?

When the ackTracker wakes up and has calculated what ackTimeout is in StopWatch ticks (those are not the same as TimeSpan ticks). It will call StopWatch.GetTimestamp(). We will now TryPeek and Dequeue from "AwaitingAck" for as long as the tracker timestamp - AwaitingAck.Timestamp is larger than the calculated timeout.
If the MessageId is not acknowledged, it's added to a CommandRedeliverUnacknowledgedMessages (that we are reusing) and then send it (if MessageIds were added).
If the MessageId was acknowledged, then we can remove that MessageId from the AckedMessageIds.

I tried to capture this in

while (_awaitingAcks.TryPeek(out AwaitingAck awaiting)
&& awaiting.Elapsed > _ackTimeout)
{
if (_awaitingAcks.TryDequeue(out awaiting))
{
if (!_acked.Contains(awaiting.MessageId))
result.Add(awaiting.MessageId);
else
_acked.Remove(awaiting.MessageId);
}
}
. The only thing I'm still unsure about is the accumulated acking.

Consider this nackTimeout implementation.

When a message is nacked, we added the MessageId(s) to a CommandRedeliverUnacknowledgedMessages that we are reusing (should our "RedeliverUnacknowledgedMessages" taking an enumerable of messageId actually have been called "NegativeAcknowledge"?).

When the nackTracker wakes up it will check if the CommandRedeliverUnacknowledgedMessages has MessageIds and if yes, then send it.

I will implement the nack tracker if you are happy with the unacked tracker as it stands (which might also be refactored into one single tracker if two trackers are a performance concern to you).

Writing such a detailed implementation description was not what I intended, but when I first get going.... :-D
Anyway, if it is unclear, then I can try and make those classes/structs and push them to master.

Let me know what you think, this is still a bit of a learning process for me both on the internals of this lib as well as C# / Pulsar details. Thanks in advance 👍 .

src/DotPulsar/ConsumerOptions.cs Outdated Show resolved Hide resolved
src/DotPulsar/ConsumerOptions.cs Outdated Show resolved Hide resolved
src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs Outdated Show resolved Hide resolved
src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs Outdated Show resolved Hide resolved
src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
@blankensteiner
Copy link
Contributor

Hi @dionjansen
Before doing a deep dive into the implementation, I have some comments on little things that can quickly be fixed :-)

@dionjansen
Copy link
Contributor Author

@blankensteiner thanks for the comments I managed to fix most of it with a view side comments/ questions (see above). Let me know what you think

src/DotPulsar/Internal/ConsumerChannel.cs Outdated Show resolved Hide resolved
src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs Outdated Show resolved Hide resolved
src/DotPulsar/Internal/UnackedMessageTracker.cs Outdated Show resolved Hide resolved
src/DotPulsar/Internal/UnackedMessageTracker.cs Outdated Show resolved Hide resolved
src/DotPulsar/Internal/MessageQueue.cs Outdated Show resolved Hide resolved
@dionjansen
Copy link
Contributor Author

@blankensteiner sorry for the delay, I addressed all your remarks, let me know what you think!

@jbvanzuylen
Copy link

@blankensteiner @dionjansen any idea when this work will be finished and merged? Looks like a lot of work has been done and reviewed but wondering what is missing to cross the finish line.

@blankensteiner
Copy link
Contributor

Hi @jbvanzuylen
Good question :-)
@dionjansen when you feel the PR is ready, poke me and I'll review it again :-)

@dionjansen
Copy link
Contributor Author

@jbvanzuylen @blankensteiner yes this dropped very far off my radar, unfortunately. I see quite a lot has changed since this implementation, I'll try to open a new PR from the latest version.

@dionjansen
Copy link
Contributor Author

@blankensteiner I reworked the implementation from scratch based on version 1.1.2 and opened a new PR #83. I've also added support for negative acknowledgement delays next to the unacked tracking.

Closing this PR since it's no longer needed.

CC: @jbvanzuylen

@dionjansen dionjansen closed this Jul 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants