Skip to content

Commit

Permalink
Unacked tracker only
Browse files Browse the repository at this point in the history
  • Loading branch information
dionjansen committed Dec 23, 2020
1 parent 6fc4550 commit 7da197e
Show file tree
Hide file tree
Showing 8 changed files with 902 additions and 21 deletions.
9 changes: 5 additions & 4 deletions src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
namespace DotPulsar.Internal.Abstractions
{
using System.Threading.Tasks;
public interface IMessageQueue
using System;

public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
{
MessageId Acknowledge(MessageId obj);
MessageId NegativeAcknowledge(MessageId obj);
void Acknowledge(MessageId obj);
void NegativeAcknowledge(MessageId obj);
}
}
15 changes: 15 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Abstractions;
using System.Threading.Tasks;
using System;

public interface IUnackedMessageTracker : IDisposable
{
void Add(MessageId messageId);

void Ack(MessageId messageId);

Task Start(IConsumer consumer);
}
}
5 changes: 2 additions & 3 deletions src/DotPulsar/Internal/ConsumerChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public sealed class ConsumerChannelFactory : IConsumerChannelFactory
private readonly int _ackTimeoutMillis;
private readonly int _negativeAckRedeliveryDelayMicros;
private readonly BatchHandler _batchHandler;
private readonly IMessageAcksTracker<MessageId> _tracker;
private readonly IUnackedMessageTracker _tracker;

public ConsumerChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
IExecute executor,
ConsumerOptions options,
IMessageAcksTracker<MessageId> tracker)
IUnackedMessageTracker tracker)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
Expand Down Expand Up @@ -69,7 +69,6 @@ private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellat
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
// TODO perhaps start tracker here?
var consumerMessageQueue = new MessageQueue(messageQueue, _tracker);
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
Expand Down
30 changes: 30 additions & 0 deletions src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace DotPulsar.Internal
{
using System.Threading.Tasks;
using Abstractions;
using DotPulsar.Abstractions;

public class InactiveUnackedMessageTracker : IUnackedMessageTracker
{
public InactiveUnackedMessageTracker()
{
}

public void Ack(MessageId messageId)
{
return;
}

public void Add(MessageId messageId)
{
return;
}

public Task Start(IConsumer consumer) => Task.CompletedTask;

public void Dispose() {
return;
}

}
}
15 changes: 10 additions & 5 deletions src/DotPulsar/Internal/MessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace DotPulsar.Internal
public sealed class MessageQueue : IMessageQueue, IDequeue<MessagePackage>, IDisposable
{
private readonly AsyncQueue<MessagePackage> _queue;
private readonly IMessageAcksTracker<MessageId> _tracker;
public MessageQueue(AsyncQueue<MessagePackage> queue, IMessageAcksTracker<MessageId> tracker)
private readonly IUnackedMessageTracker _tracker;
public MessageQueue(AsyncQueue<MessagePackage> queue, IUnackedMessageTracker tracker)
{
_queue = queue;
_tracker = tracker;
Expand All @@ -28,13 +28,18 @@ public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationTok
_tracker.Add(new MessageId(message.MessageId));
return message;
}
public MessageId Acknowledge(MessageId obj) => _tracker.Ack(obj);
public MessageId NegativeAcknowledge(MessageId obj) => _tracker.Nack(obj);
public void Acknowledge(MessageId obj) => _tracker.Ack(obj);

public void NegativeAcknowledge(MessageId obj)
{
throw new NotImplementedException();
}

public void Dispose()
{
_queue.Dispose();
// TODO dispose tracker
_tracker.Dispose();
}

}
}
104 changes: 104 additions & 0 deletions src/DotPulsar/Internal/UnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
namespace DotPulsar.Internal
{
using Abstractions;
using DotPulsar.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public readonly struct AwaitingAck
{
public MessageId MessageId { get; }
public long Timestamp { get; }

public AwaitingAck(MessageId messageId)
{
MessageId = messageId;
Timestamp = Stopwatch.GetTimestamp();
}

public TimeSpan Elapsed =>
TimeSpan.FromTicks(
(Stopwatch.GetTimestamp() - Timestamp) /
(Stopwatch.Frequency) * 1000);
}

public sealed class UnackedMessageTracker : IUnackedMessageTracker
{
private readonly TimeSpan _ackTimeout;
private readonly TimeSpan _pollingTimeout;
private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks;
private readonly List<MessageId> _acked;
private readonly CancellationTokenSource _cancellationTokenSource;


public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout)
{
_ackTimeout = ackTimeout;
_pollingTimeout = pollingTimeout;
_awaitingAcks = new ConcurrentQueue<AwaitingAck>();
_acked = new List<MessageId>();
_cancellationTokenSource = new CancellationTokenSource();
}

public void Add(MessageId messageId)
{
_awaitingAcks.Enqueue(new AwaitingAck(messageId));
}

public void Ack(MessageId messageId)
{
// We only need to store the highest cumulative ack we see (if there is one)
// and the MessageIds not included by that cumulative ack.
_acked.Add(messageId);
}

public Task Start(IConsumer consumer)
{
var cancellationToken = _cancellationTokenSource.Token;

return Task.Run(async () => {
while (!cancellationToken.IsCancellationRequested)
{
var messages = CheckUnackedMessages();

if (messages.Count() > 0)
await consumer.RedeliverUnacknowledgedMessages(messages, cancellationToken);

await Task.Delay(_pollingTimeout, cancellationToken);
}
}, cancellationToken);
}

private IEnumerable<MessageId> CheckUnackedMessages()
{
AwaitingAck awaiting;
var result = new List<MessageId>();

while (_awaitingAcks.TryPeek(out awaiting)
&& awaiting.Elapsed > _ackTimeout)
{
// Can I safely use Dequeue now instead of TryDequeue?
if (_awaitingAcks.TryDequeue(out awaiting))
{
//If the MessageId is not acknowledged
if (!_acked.Contains(awaiting.MessageId))
result.Add(awaiting.MessageId);
else
_acked.Remove(awaiting.MessageId);
}
}

return result;
}

public void Dispose()
{
this._cancellationTokenSource.Cancel();
}
}
}
14 changes: 5 additions & 9 deletions src/DotPulsar/PulsarClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,15 @@ public IConsumer CreateConsumer(ConsumerOptions options)
ThrowIfDisposed();
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);

var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
IMessageAcksTracker<MessageId> tracker = new InactiveMessageAcksTracker();
if (options.AckTimeoutMillis > 0 || options.NegativeAckRedeliveryDelayMicros > 0)
// TODO polling interval from options
tracker = new MessageAcksTracker(options.AckTimeoutMillis, options.NegativeAckRedeliveryDelayMicros, 1000);
// TODO
// handle cancellation
_ = tracker.StartTracker(consumer, new CancellationTokenSource().Token);
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, tracker);
IUnackedMessageTracker unackedTracker = options.AckTimeoutMillis > 0
? new UnackedMessageTracker(TimeSpan.FromMilliseconds(options.AckTimeoutMillis), TimeSpan.FromSeconds(1))
: new InactiveUnackedMessageTracker();
unackedTracker.Start(consumer);
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, unackedTracker);
var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
Expand Down
Loading

0 comments on commit 7da197e

Please sign in to comment.