diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs index 976f54bd0..78b165f53 100644 --- a/src/DotPulsar/Abstractions/IConsumer.cs +++ b/src/DotPulsar/Abstractions/IConsumer.cs @@ -14,6 +14,7 @@ namespace DotPulsar.Abstractions { + using DotPulsar.Internal.PulsarApi; using System; using System.Collections.Generic; using System.Threading; @@ -59,6 +60,11 @@ public interface IConsumer : IGetLastMessageId, IReceive, ISeek, IState ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken = default); + /// + /// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged. + /// + ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken); + /// /// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged. /// diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs index 849648765..9523281a9 100644 --- a/src/DotPulsar/ConsumerOptions.cs +++ b/src/DotPulsar/ConsumerOptions.cs @@ -15,6 +15,7 @@ namespace DotPulsar { using DotPulsar.Abstractions; + using System; /// /// The consumer building options. @@ -104,5 +105,16 @@ public ConsumerOptions(string subscriptionName, string topic) /// Set the topic for this consumer. This is required. /// public string Topic { get; set; } + + /// + /// Delay to wait before redelivering messages that failed to be processed. + /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout. + /// + public TimeSpan NegativeAcknowledgementRedeliveryDelay { get; set; } + + /// + /// Timeout of unacked messages + /// + public TimeSpan AcknowledgementTimeout { get; set; } } } diff --git a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs new file mode 100644 index 000000000..a6a52f602 --- /dev/null +++ b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal.Abstractions +{ + using DotPulsar.Internal.PulsarApi; + using System; + + public interface IMessageQueue : IDequeue, IDisposable + { + void Acknowledge(MessageIdData messageId); + + void NegativeAcknowledge(MessageIdData messageId); + } +} diff --git a/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs b/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs new file mode 100644 index 000000000..0a294e07e --- /dev/null +++ b/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal.Abstractions +{ + using DotPulsar.Abstractions; + using DotPulsar.Internal.PulsarApi; + using System; + using System.Threading; + using System.Threading.Tasks; + + public interface IUnacknowledgedMessageTracker : IDisposable + { + void Add(MessageIdData messageId); + + void Acknowledge(MessageIdData messageId); + + Task Start(IConsumer consumer, CancellationToken cancellationToken = default); + } +} diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs index c29db020b..8abfe83ee 100644 --- a/src/DotPulsar/Internal/Consumer.cs +++ b/src/DotPulsar/Internal/Consumer.cs @@ -103,7 +103,7 @@ public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancel public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken) => await Acknowledge(messageId, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false); - public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken) + public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken) { ThrowIfDisposed(); @@ -112,6 +112,9 @@ public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable me await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false); } + public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken) + => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data), cancellationToken).ConfigureAwait(false); + public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken) => await RedeliverUnacknowledgedMessages(Enumerable.Empty(), cancellationToken).ConfigureAwait(false); diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs index de109dd4d..9f97c52fa 100644 --- a/src/DotPulsar/Internal/ConsumerChannel.cs +++ b/src/DotPulsar/Internal/ConsumerChannel.cs @@ -24,7 +24,7 @@ namespace DotPulsar.Internal public sealed class ConsumerChannel : IConsumerChannel { private readonly ulong _id; - private readonly AsyncQueue _queue; + private readonly MessageQueue _queue; private readonly IConnection _connection; private readonly BatchHandler _batchHandler; private readonly CommandFlow _cachedCommandFlow; @@ -35,7 +35,7 @@ public sealed class ConsumerChannel : IConsumerChannel public ConsumerChannel( ulong id, uint messagePrefetchCount, - AsyncQueue queue, + MessageQueue queue, IConnection connection, BatchHandler batchHandler) { @@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken) } command.ConsumerId = _id; + + _queue.Acknowledge(messageId); + await _connection.Send(command, cancellationToken).ConfigureAwait(false); } diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs index 5268264d5..37756097f 100644 --- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs +++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs @@ -28,20 +28,25 @@ public sealed class ConsumerChannelFactory : IConsumerChannelFactory private readonly IExecute _executor; private readonly CommandSubscribe _subscribe; private readonly uint _messagePrefetchCount; + private readonly int _ackTimeoutMillis; + private readonly int _negativeAckRedeliveryDelayMicros; private readonly BatchHandler _batchHandler; + private readonly IUnacknowledgedMessageTracker _tracker; public ConsumerChannelFactory( Guid correlationId, IRegisterEvent eventRegister, IConnectionPool connectionPool, IExecute executor, - ConsumerOptions options) + ConsumerOptions options, + IUnacknowledgedMessageTracker tracker) { _correlationId = correlationId; _eventRegister = eventRegister; _connectionPool = connectionPool; _executor = executor; _messagePrefetchCount = options.MessagePrefetchCount; + _tracker = tracker; _subscribe = new CommandSubscribe { @@ -64,9 +69,10 @@ private async ValueTask GetChannel(CancellationToken cancellat { var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false); var messageQueue = new AsyncQueue(); + var consumerMessageQueue = new MessageQueue(messageQueue, _tracker); var channel = new Channel(_correlationId, _eventRegister, messageQueue); var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false); - return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler); + return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler); } } } diff --git a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs new file mode 100644 index 000000000..65ff97f09 --- /dev/null +++ b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal +{ + using Abstractions; + using PulsarApi; + using DotPulsar.Abstractions; + using System.Threading; + using System.Threading.Tasks; + + public sealed class InactiveUnackedMessageTracker : IUnacknowledgedMessageTracker + { + + public void Acknowledge(MessageIdData messageId) + { + return; + } + + public void Add(MessageIdData messageId) + { + return; + } + + public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask; + + public void Dispose() + { + return; + } + } +} diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs new file mode 100644 index 000000000..4f4ec5715 --- /dev/null +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal +{ + using Abstractions; + using PulsarApi; + using System; + using System.Threading; + using System.Threading.Tasks; + + public sealed class MessageQueue : IMessageQueue + { + private readonly AsyncQueue _queue; + private readonly IUnacknowledgedMessageTracker _tracker; + + public MessageQueue(AsyncQueue queue, IUnacknowledgedMessageTracker tracker) + { + _queue = queue; + _tracker = tracker; + } + + public async ValueTask Dequeue(CancellationToken cancellationToken = default) + { + var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); + _tracker.Add(message.MessageId); + return message; + } + + public void Acknowledge(MessageIdData obj) => _tracker.Acknowledge(obj); + + public void NegativeAcknowledge(MessageIdData obj) + { + throw new NotImplementedException(); + } + + public void Dispose() + { + _queue.Dispose(); + _tracker.Dispose(); + } + } +} diff --git a/src/DotPulsar/Internal/PulsarApi/MessageIdData.cs b/src/DotPulsar/Internal/PulsarApi/MessageIdData.cs new file mode 100644 index 000000000..af25f2e37 --- /dev/null +++ b/src/DotPulsar/Internal/PulsarApi/MessageIdData.cs @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal.PulsarApi +{ + using System; + + public partial class MessageIdData : IEquatable, IComparable + { + public int CompareTo(MessageIdData? other) + { + if (other is null) + return 1; + + var result = LedgerId.CompareTo(other.LedgerId); + if (result != 0) + return result; + + result = EntryId.CompareTo(other.EntryId); + if (result != 0) + return result; + + result = Partition.CompareTo(other.Partition); + if (result != 0) + return result; + + return BatchIndex.CompareTo(other.BatchIndex); + } + + public bool Equals(MessageIdData? other) + => other is not null && LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex; + } +} diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs index 58b31e157..f4a6ef9af 100644 --- a/src/DotPulsar/Internal/ReaderChannelFactory.cs +++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs @@ -63,9 +63,11 @@ private async ValueTask GetChannel(CancellationToken cancellat { var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false); var messageQueue = new AsyncQueue(); + var tracker = new InactiveUnackedMessageTracker(); // No tracker for reader since readers don't ack. + var consumerMessageQueue = new MessageQueue(messageQueue, tracker); var channel = new Channel(_correlationId, _eventRegister, messageQueue); var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false); - return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler); + return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler); } } } diff --git a/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs b/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs new file mode 100644 index 000000000..df531a609 --- /dev/null +++ b/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using PulsarApi; + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + + public readonly struct AwaitingAck + { + public MessageIdData MessageId { get; } + public long Timestamp { get; } + + public AwaitingAck(MessageIdData messageId) + { + MessageId = messageId; + Timestamp = Stopwatch.GetTimestamp(); + } + + public TimeSpan Elapsed => TimeSpan.FromTicks( + (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond)); + } + + public sealed class UnackedMessageTracker : IUnacknowledgedMessageTracker + { + private readonly TimeSpan _ackTimeout; + private readonly TimeSpan _pollingTimeout; + private readonly ConcurrentQueue _awaitingAcks; + private readonly List _acked; + private readonly CancellationTokenSource _cancellationTokenSource; + + public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout) + { + _ackTimeout = ackTimeout; + _pollingTimeout = pollingTimeout; + _awaitingAcks = new ConcurrentQueue(); + _acked = new List(); + _cancellationTokenSource = new CancellationTokenSource(); + } + + public void Add(MessageIdData messageId) + { + _awaitingAcks.Enqueue(new AwaitingAck(messageId)); + } + + public void Acknowledge(MessageIdData messageId) + { + // We only need to store the highest cumulative ack we see (if there is one) + // and the MessageIds not included by that cumulative ack. + _acked.Add(messageId); + } + + public async Task Start(IConsumer consumer, CancellationToken cancellationToken = default) + { + await Task.Yield(); + + CancellationToken token = + CancellationTokenSource.CreateLinkedTokenSource( + _cancellationTokenSource.Token, cancellationToken).Token; + + while (!token.IsCancellationRequested) + { + var messages = CheckUnackedMessages(); + + if (messages.Count() > 0) + await consumer.RedeliverUnacknowledgedMessages(messages, token); + + await Task.Delay(_pollingTimeout, token); + } + } + + private IEnumerable CheckUnackedMessages() + { + var result = new List(); + + while (_awaitingAcks.TryPeek(out AwaitingAck awaiting) + && awaiting.Elapsed > _ackTimeout) + { + if (_awaitingAcks.TryDequeue(out awaiting)) + { + if (!_acked.Contains(awaiting.MessageId)) + result.Add(awaiting.MessageId); + else + _acked.Remove(awaiting.MessageId); + } + } + + return result; + } + + public void Dispose() + { + _cancellationTokenSource.Cancel(); + } + } +} diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs index bc3203994..9b9216a8b 100644 --- a/src/DotPulsar/MessageId.cs +++ b/src/DotPulsar/MessageId.cs @@ -70,25 +70,7 @@ public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex) /// public int BatchIndex { get; } - public int CompareTo(MessageId? other) - { - if (other is null) - return 1; - - var result = LedgerId.CompareTo(other.LedgerId); - if (result != 0) - return result; - - result = EntryId.CompareTo(other.EntryId); - if (result != 0) - return result; - - result = Partition.CompareTo(other.Partition); - if (result != 0) - return result; - - return BatchIndex.CompareTo(other.BatchIndex); - } + public int CompareTo(MessageId? other) => other is null ? 1 : Data.CompareTo(other.Data); public static bool operator >(MessageId x, MessageId y) => x is not null && x.CompareTo(y) >= 1; @@ -106,7 +88,7 @@ public override bool Equals(object? o) => o is MessageId id && Equals(id); public bool Equals(MessageId? other) - => other is not null && LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex; + => other is not null && Data.Equals(other.Data); public static bool operator ==(MessageId x, MessageId y) => ReferenceEquals(x, y) || (x is not null && x.Equals(y)); diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs index c18c7d8d3..62a3e489d 100644 --- a/src/DotPulsar/PulsarClient.cs +++ b/src/DotPulsar/PulsarClient.cs @@ -81,11 +81,15 @@ public IConsumer CreateConsumer(ConsumerOptions options) ThrowIfDisposed(); var correlationId = Guid.NewGuid(); var executor = new Executor(correlationId, _processManager, _exceptionHandler); - var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options); var stateManager = new StateManager(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted); var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager); if (options.StateChangedHandler is not null) _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler); + IUnacknowledgedMessageTracker unackedTracker = options.AcknowledgementTimeout == default + ? new InactiveUnackedMessageTracker() + : new UnackedMessageTracker(options.AcknowledgementTimeout, TimeSpan.FromSeconds(1)); + unackedTracker.Start(consumer); + var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, unackedTracker); var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover); _processManager.Add(process); process.Start(); diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 8d40129ac..66c0f3896 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -17,6 +17,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + diff --git a/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs b/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs new file mode 100644 index 000000000..793de3146 --- /dev/null +++ b/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Tests.Internal +{ + using DotPulsar.Internal; + using DotPulsar.Internal.PulsarApi; + using FluentAssertions; + using System; + using System.Diagnostics; + using System.Threading.Tasks; + using Xunit; + + public class AwaitingAckTests + { + [Fact] + public async void Elapsed_GivenTimeElapsed_ShoulEqualCorrectElapsedTicks() + { + //Arrange + var messageId = new MessageIdData(); + var sw = Stopwatch.StartNew(); + + //Act + var awaiting = new AwaitingAck(messageId); + await Task.Delay(TimeSpan.FromMilliseconds(123)); + sw.Stop(); + + //Assert + awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1); + } + } +} diff --git a/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs b/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs new file mode 100644 index 000000000..396472c92 --- /dev/null +++ b/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs @@ -0,0 +1,150 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Tests.Internal +{ + using AutoFixture; + using AutoFixture.AutoNSubstitute; + using DotPulsar.Abstractions; + using DotPulsar.Internal; + using DotPulsar.Internal.PulsarApi; + using NSubstitute; + using System; + using System.Collections.Generic; + using System.Linq; + using System.Linq.Expressions; + using System.Threading; + using System.Threading.Tasks; + using Xunit; + + public class UnacknowledgedMessageTrackerTests + { + [Fact] + public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = new MessageIdData(); + var cts = new CancellationTokenSource(); + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Is(EquivalentTo(new List() { messageId })), + Arg.Any()); + } + + [Fact] + public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = new MessageIdData(); + var cts = new CancellationTokenSource(); + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + var _ = Task.Delay(5).ContinueWith(_ => tracker.Acknowledge(messageId)); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .DidNotReceive() + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); + } + + [Fact] + public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = new MessageIdData(); + var cts = new CancellationTokenSource(); + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + + var _ = Task.Delay(15).ContinueWith(_ => tracker.Acknowledge(messageId)); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); + } + + [Fact] + public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliverOnlyOnce() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = new MessageIdData(); + var cts = new CancellationTokenSource(); + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(5)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(50); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); + } + + private Expression>> EquivalentTo(IEnumerable enumerable) => + x => IsEquivalentIEnumerable(enumerable, x); + + private bool IsEquivalentIEnumerable(IEnumerable a, IEnumerable b) => + a.Count() == b.Count() && a.Zip(b, (a_, b_) => a_.Equals(b_)).All(_ => _); + } +}