Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for (optional) ack timeout and nack delay for consumers #67

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4eb2b06
Added missing reference to Microsoft.Bcl.AsyncInterfaces in samples a…
dionjansen Dec 19, 2020
dfb864a
nack delay option
dionjansen Dec 20, 2020
370e1c8
ack timeout config option
dionjansen Dec 20, 2020
278005f
naive implementation tracker and message queue
dionjansen Dec 20, 2020
79bc906
use message package for queue
dionjansen Dec 20, 2020
546d9e9
use int for ms values
dionjansen Dec 21, 2020
5e3cea6
Non generic interface
dionjansen Dec 21, 2020
19931bc
Added basic implementation for tracker, integrated messagequeue in co…
dionjansen Dec 21, 2020
295eab8
Create conditional tracker in client
dionjansen Dec 21, 2020
2be5114
Inform queue when acking
dionjansen Dec 21, 2020
163ea45
Cleanup references for VSCode
dionjansen Dec 22, 2020
6fc4550
Merge branch 'master' into nack
dionjansen Dec 23, 2020
7da197e
Unacked tracker only
dionjansen Dec 23, 2020
d759c06
Cleanup
dionjansen Dec 23, 2020
26cd957
Added test suite for unacked message tracker
dionjansen Jan 3, 2021
f4725f5
Refactored elapsed calc
dionjansen Jan 3, 2021
51f4a98
cleanup
dionjansen Jan 3, 2021
0119ca9
Cleanup old tracker
dionjansen Jan 6, 2021
7ad89a2
Apache header for new files
dionjansen Jan 6, 2021
a7d5f02
Cleanup inactive message tracker
dionjansen Jan 6, 2021
98850bb
Cleanup stress tests deps
dionjansen Jan 6, 2021
2edfa35
Cleanup message queue and interface
dionjansen Jan 6, 2021
b78ac54
Formatting tracker
dionjansen Jan 6, 2021
b57668b
Fixed tests naming, separated tests for struct
dionjansen Jan 6, 2021
871dce6
Renamed options, use timespans instad of micros
dionjansen Jan 6, 2021
8841930
Moved IEquatable and IComparable into MessageIdData partial class
dionjansen Jan 15, 2021
ddfd374
Renamed ack to full acknowledge, cleaned up tests
dionjansen Jan 15, 2021
ad58271
Refactor use for MessageId to MessageIdData, added RedeliverUnacknowl…
dionjansen Jan 15, 2021
f02166b
Refactored Start method to not use unnecessary Task.run
dionjansen Jan 15, 2021
5771533
Merge branch 'master' into nack
dionjansen Feb 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/DotPulsar/ConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,16 @@ public ConsumerOptions(string subscriptionName, string topic)
/// Set the topic for this consumer. This is required.
/// </summary>
public string Topic { get; set; }

/// <summary>
/// Delay to wait before redelivering messages that failed to be processed.
/// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
/// </summary>
public int NegativeAckRedeliveryDelayMicros { get; set; }
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Timeout of unacked messages
/// </summary>
public int AckTimeoutMillis { get; set; }
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
}
}
13 changes: 13 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace DotPulsar.Internal.Abstractions
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using DotPulsar.Abstractions;
using System.Threading;
using System.Threading.Tasks;
public interface IMessageAcksTracker<T>
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
T Add(T message);
T Ack(T message);
T Nack(T message);
Task StartTracker(IConsumer consumer, CancellationToken cancellationToken);
}
}
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 10 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace DotPulsar.Internal.Abstractions
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using System;

public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
{
void Acknowledge(MessageId obj);
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
void NegativeAcknowledge(MessageId obj);
}
}
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 16 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace DotPulsar.Internal.Abstractions
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using DotPulsar.Abstractions;
using System.Threading.Tasks;
using System.Threading;
using System;

public interface IUnackedMessageTracker : IDisposable
{
void Add(MessageId messageId);

void Ack(MessageId messageId);
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

Task Start(IConsumer consumer, CancellationToken cancellationToken = default);
}
}
7 changes: 5 additions & 2 deletions src/DotPulsar/Internal/ConsumerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace DotPulsar.Internal
public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
{
private readonly ulong _id;
private readonly AsyncQueue<MessagePackage> _queue;
private readonly MessageQueue _queue;
private readonly IConnection _connection;
private readonly BatchHandler _batchHandler;
private readonly CommandFlow _cachedCommandFlow;
Expand All @@ -35,7 +35,7 @@ public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
public ConsumerChannel(
ulong id,
uint messagePrefetchCount,
AsyncQueue<MessagePackage> queue,
MessageQueue queue,
IConnection connection,
BatchHandler batchHandler)
{
Expand Down Expand Up @@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken)
}

command.ConsumerId = _id;

_queue.Acknowledge(new MessageId(messageId));
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

await _connection.Send(command, cancellationToken).ConfigureAwait(false);
}

Expand Down
10 changes: 8 additions & 2 deletions src/DotPulsar/Internal/ConsumerChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,25 @@ public sealed class ConsumerChannelFactory : IConsumerChannelFactory
private readonly IExecute _executor;
private readonly CommandSubscribe _subscribe;
private readonly uint _messagePrefetchCount;
private readonly int _ackTimeoutMillis;
private readonly int _negativeAckRedeliveryDelayMicros;
private readonly BatchHandler _batchHandler;
private readonly IUnackedMessageTracker _tracker;

public ConsumerChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
IExecute executor,
ConsumerOptions options)
ConsumerOptions options,
IUnackedMessageTracker tracker)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
_connectionPool = connectionPool;
_executor = executor;
_messagePrefetchCount = options.MessagePrefetchCount;
_tracker = tracker;

_subscribe = new CommandSubscribe
{
Expand All @@ -64,9 +69,10 @@ private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellat
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var consumerMessageQueue = new MessageQueue(messageQueue, _tracker);
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler);
}
}
}
30 changes: 30 additions & 0 deletions src/DotPulsar/Internal/InactiveMessageAcksTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace DotPulsar.Internal
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using Events;
using Microsoft.Extensions.ObjectPool;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;

public sealed class InactiveMessageAcksTracker : IMessageAcksTracker<MessageId>
{
public InactiveMessageAcksTracker() { }
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
{
await Task.Yield();
}

public MessageId Add(MessageId message) => message;
public MessageId Ack(MessageId message) => message;
public MessageId Nack(MessageId message) => message;
}
}
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
31 changes: 31 additions & 0 deletions src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace DotPulsar.Internal
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using System.Threading;
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
using System.Threading.Tasks;
using Abstractions;
using DotPulsar.Abstractions;

public class InactiveUnackedMessageTracker : IUnackedMessageTracker
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
public InactiveUnackedMessageTracker()
{
}

public void Ack(MessageId messageId)
{
return;
}

public void Add(MessageId messageId)
{
return;
}

public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask;

public void Dispose() {
return;
}

dionjansen marked this conversation as resolved.
Show resolved Hide resolved
}
}
103 changes: 103 additions & 0 deletions src/DotPulsar/Internal/MessageAcksTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
namespace DotPulsar.Internal
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using Events;
using Microsoft.Extensions.ObjectPool;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;

internal class Tracker
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Stopwatch _timer;
private int maxTimeoutMs;
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

public Tracker(int timeoutMs)
{
maxTimeoutMs = timeoutMs;
_timer = new Stopwatch();
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
_timer.Start();
}

public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs;

public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds;

public void Reset(int newTimeoutMs)
{
maxTimeoutMs = newTimeoutMs;
_timer.Restart();
}
}

// TODO add mechnism to stop tracker when disposed
public sealed class MessageAcksTracker : IMessageAcksTracker<MessageId>
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Dictionary<MessageId, Tracker> _trackers;
private readonly int _unackedTimeoutMs;
private readonly int _nackTimeoutMs;
private readonly int _trackerDelayMs;
public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs)
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
_unackedTimeoutMs = unackedTimeoutMs;
_nackTimeoutMs = nackTimeoutMs;
_trackerDelayMs = trackerDelayMs;
_trackers = new Dictionary<MessageId, Tracker>();
}

public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken)
{
await Task.Yield();

while (true)
{
await Task.Delay(_trackerDelayMs);

var messageIds = new List<MessageId>();
foreach (KeyValuePair<MessageId, Tracker> p in _trackers)
{
if (p.Value.IsTimedOut())
messageIds.Add(p.Key);
}

if (messageIds.Count() > 0)
await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false);

}
}
public MessageId Add(MessageId message)
{
if (!_trackers.ContainsKey(message))
{
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
_trackers.Add(message, new Tracker(_unackedTimeoutMs));
}

return message;
}
public MessageId Ack(MessageId message)
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
if (_trackers.ContainsKey(message))
_trackers.Remove(message);
return message;
}
public MessageId Nack(MessageId message)
{
if (_trackers.ContainsKey(message))
{
var timer = _trackers[message];
if (timer.msTillTimeout > _nackTimeoutMs)
timer.Reset(_nackTimeoutMs);
}
else
_trackers.Add(message, new Tracker(_nackTimeoutMs));
return message;
}
}
}
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
45 changes: 45 additions & 0 deletions src/DotPulsar/Internal/MessageQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
namespace DotPulsar.Internal
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using Events;
using Microsoft.Extensions.ObjectPool;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly AsyncQueue<MessagePackage> _queue;
private readonly IUnackedMessageTracker _tracker;
public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
_queue = queue;
_tracker = tracker;
}
public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
{
var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
_tracker.Add(new MessageId(message.MessageId));
return message;
}
public void Acknowledge(MessageId obj) => _tracker.Ack(obj);
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

public void NegativeAcknowledge(MessageId obj)
{
throw new NotImplementedException();
}

public void Dispose()
{
_queue.Dispose();
_tracker.Dispose();
}

dionjansen marked this conversation as resolved.
Show resolved Hide resolved
}
}
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 3 additions & 1 deletion src/DotPulsar/Internal/ReaderChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ private async ValueTask<IReaderChannel> GetChannel(CancellationToken cancellatio
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var tracker = new InactiveUnackedMessageTracker(); // No tracker for reader since readers don't ack.
var consumerMessageQueue = new MessageQueue(messageQueue, tracker);
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler);
}
}
}
Loading