Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for (optional) ack timeout and nack delay for consumers #67

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4eb2b06
Added missing reference to Microsoft.Bcl.AsyncInterfaces in samples a…
dionjansen Dec 19, 2020
dfb864a
nack delay option
dionjansen Dec 20, 2020
370e1c8
ack timeout config option
dionjansen Dec 20, 2020
278005f
naive implementation tracker and message queue
dionjansen Dec 20, 2020
79bc906
use message package for queue
dionjansen Dec 20, 2020
546d9e9
use int for ms values
dionjansen Dec 21, 2020
5e3cea6
Non generic interface
dionjansen Dec 21, 2020
19931bc
Added basic implementation for tracker, integrated messagequeue in co…
dionjansen Dec 21, 2020
295eab8
Create conditional tracker in client
dionjansen Dec 21, 2020
2be5114
Inform queue when acking
dionjansen Dec 21, 2020
163ea45
Cleanup references for VSCode
dionjansen Dec 22, 2020
6fc4550
Merge branch 'master' into nack
dionjansen Dec 23, 2020
7da197e
Unacked tracker only
dionjansen Dec 23, 2020
d759c06
Cleanup
dionjansen Dec 23, 2020
26cd957
Added test suite for unacked message tracker
dionjansen Jan 3, 2021
f4725f5
Refactored elapsed calc
dionjansen Jan 3, 2021
51f4a98
cleanup
dionjansen Jan 3, 2021
0119ca9
Cleanup old tracker
dionjansen Jan 6, 2021
7ad89a2
Apache header for new files
dionjansen Jan 6, 2021
a7d5f02
Cleanup inactive message tracker
dionjansen Jan 6, 2021
98850bb
Cleanup stress tests deps
dionjansen Jan 6, 2021
2edfa35
Cleanup message queue and interface
dionjansen Jan 6, 2021
b78ac54
Formatting tracker
dionjansen Jan 6, 2021
b57668b
Fixed tests naming, separated tests for struct
dionjansen Jan 6, 2021
871dce6
Renamed options, use timespans instad of micros
dionjansen Jan 6, 2021
8841930
Moved IEquatable and IComparable into MessageIdData partial class
dionjansen Jan 15, 2021
ddfd374
Renamed ack to full acknowledge, cleaned up tests
dionjansen Jan 15, 2021
ad58271
Refactor use for MessageId to MessageIdData, added RedeliverUnacknowl…
dionjansen Jan 15, 2021
f02166b
Refactored Start method to not use unnecessary Task.run
dionjansen Jan 15, 2021
5771533
Merge branch 'master' into nack
dionjansen Feb 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/DotPulsar/ConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
namespace DotPulsar
{
using DotPulsar.Abstractions;
using System;

/// <summary>
/// The consumer building options.
Expand Down Expand Up @@ -101,5 +102,16 @@ public ConsumerOptions(string subscriptionName, string topic)
/// Set the topic for this consumer. This is required.
/// </summary>
public string Topic { get; set; }

/// <summary>
/// Delay to wait before redelivering messages that failed to be processed.
/// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
/// </summary>
public TimeSpan NegativeAcknowledgementRedeliveryDelay { get; set; }

/// <summary>
/// Timeout of unacked messages
/// </summary>
public TimeSpan AcknowledgementTimeout { get; set; }
}
}
25 changes: 25 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using System;

public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
{
void Acknowledge(MessageId messageId);

void NegativeAcknowledge(MessageId messageId);
}
}
30 changes: 30 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Abstractions;
using System.Threading.Tasks;
using System.Threading;
using System;

public interface IUnackedMessageTracker : IDisposable
{
void Add(MessageId messageId);

void Ack(MessageId messageId);
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

Task Start(IConsumer consumer, CancellationToken cancellationToken = default);
}
}
7 changes: 5 additions & 2 deletions src/DotPulsar/Internal/ConsumerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace DotPulsar.Internal
public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
{
private readonly ulong _id;
private readonly AsyncQueue<MessagePackage> _queue;
private readonly MessageQueue _queue;
private readonly IConnection _connection;
private readonly BatchHandler _batchHandler;
private readonly CommandFlow _cachedCommandFlow;
Expand All @@ -35,7 +35,7 @@ public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
public ConsumerChannel(
ulong id,
uint messagePrefetchCount,
AsyncQueue<MessagePackage> queue,
MessageQueue queue,
IConnection connection,
BatchHandler batchHandler)
{
Expand Down Expand Up @@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken)
}

command.ConsumerId = _id;

_queue.Acknowledge(new MessageId(messageId));
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

await _connection.Send(command, cancellationToken).ConfigureAwait(false);
}

Expand Down
10 changes: 8 additions & 2 deletions src/DotPulsar/Internal/ConsumerChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,25 @@ public sealed class ConsumerChannelFactory : IConsumerChannelFactory
private readonly IExecute _executor;
private readonly CommandSubscribe _subscribe;
private readonly uint _messagePrefetchCount;
private readonly int _ackTimeoutMillis;
private readonly int _negativeAckRedeliveryDelayMicros;
private readonly BatchHandler _batchHandler;
private readonly IUnackedMessageTracker _tracker;

public ConsumerChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
IExecute executor,
ConsumerOptions options)
ConsumerOptions options,
IUnackedMessageTracker tracker)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
_connectionPool = connectionPool;
_executor = executor;
_messagePrefetchCount = options.MessagePrefetchCount;
_tracker = tracker;

_subscribe = new CommandSubscribe
{
Expand All @@ -64,9 +69,10 @@ private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellat
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var consumerMessageQueue = new MessageQueue(messageQueue, _tracker);
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler);
}
}
}
42 changes: 42 additions & 0 deletions src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal
{
using Abstractions;
using DotPulsar.Abstractions;
using System.Threading;
using System.Threading.Tasks;

public class InactiveUnackedMessageTracker : IUnackedMessageTracker
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{

public void Ack(MessageId messageId)
{
return;
}

public void Add(MessageId messageId)
{
return;
}

public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask;

public void Dispose()
{
return;
}
}
}
53 changes: 53 additions & 0 deletions src/DotPulsar/Internal/MessageQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
using Abstractions;
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly AsyncQueue<MessagePackage> _queue;
private readonly IUnackedMessageTracker _tracker;

public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
_queue = queue;
_tracker = tracker;
}

public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
{
var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
_tracker.Add(new MessageId(message.MessageId));
return message;
}

public void Acknowledge(MessageId obj) => _tracker.Ack(obj);
dionjansen marked this conversation as resolved.
Show resolved Hide resolved

public void NegativeAcknowledge(MessageId obj)
{
throw new NotImplementedException();
}

public void Dispose()
{
_queue.Dispose();
_tracker.Dispose();
}
}
}
4 changes: 3 additions & 1 deletion src/DotPulsar/Internal/ReaderChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ private async ValueTask<IReaderChannel> GetChannel(CancellationToken cancellatio
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var tracker = new InactiveUnackedMessageTracker(); // No tracker for reader since readers don't ack.
var consumerMessageQueue = new MessageQueue(messageQueue, tracker);
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler);
}
}
}
115 changes: 115 additions & 0 deletions src/DotPulsar/Internal/UnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal
{
using Abstractions;
using DotPulsar.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public readonly struct AwaitingAck
{
public MessageId MessageId { get; }
public long Timestamp { get; }

public AwaitingAck(MessageId messageId)
{
MessageId = messageId;
Timestamp = Stopwatch.GetTimestamp();
}

public TimeSpan Elapsed => TimeSpan.FromTicks(
(long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond));
}

public sealed class UnackedMessageTracker : IUnackedMessageTracker
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly TimeSpan _ackTimeout;
private readonly TimeSpan _pollingTimeout;
private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
private readonly List<MessageId> _acked;
private readonly CancellationTokenSource _cancellationTokenSource;

dionjansen marked this conversation as resolved.
Show resolved Hide resolved
public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
{
_ackTimeout = ackTimeout;
_pollingTimeout = pollingTimeout;
_awaitingAcks = new ConcurrentQueue<AwaitingAck>();
_acked = new List<MessageId>();
_cancellationTokenSource = new CancellationTokenSource();
}

public void Add(MessageId messageId)
{
_awaitingAcks.Enqueue(new AwaitingAck(messageId));
}

public void Ack(MessageId messageId)
{
// We only need to store the highest cumulative ack we see (if there is one)
// and the MessageIds not included by that cumulative ack.
_acked.Add(messageId);
}

public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
{
CancellationToken token =
CancellationTokenSource.CreateLinkedTokenSource(
_cancellationTokenSource.Token, cancellationToken).Token;

return Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
var messages = CheckUnackedMessages();

if (messages.Count() > 0)
await consumer.RedeliverUnacknowledgedMessages(messages, token);

await Task.Delay(_pollingTimeout, token);
}
}, token);
}

private IEnumerable<MessageId> CheckUnackedMessages()
{
var result = new List<MessageId>();

while (_awaitingAcks.TryPeek(out AwaitingAck awaiting)
&& awaiting.Elapsed > _ackTimeout)
{
if (_awaitingAcks.TryDequeue(out awaiting))
{
if (!_acked.Contains(awaiting.MessageId))
result.Add(awaiting.MessageId);
else
_acked.Remove(awaiting.MessageId);
}
}

return result;
}

public void Dispose()
{
this._cancellationTokenSource.Cancel();
dionjansen marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
6 changes: 5 additions & 1 deletion src/DotPulsar/PulsarClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ public IConsumer CreateConsumer(ConsumerOptions options)
ThrowIfDisposed();
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
IUnackedMessageTracker unackedTracker = options.AcknowledgementTimeout == default
? new InactiveUnackedMessageTracker()
: new UnackedMessageTracker(options.AcknowledgementTimeout, TimeSpan.FromSeconds(1));
unackedTracker.Start(consumer);
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, unackedTracker);
var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
Expand Down
Loading