From 26cd957a56b6349f9e37bf76bd37f92a7a7e0970 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sun, 3 Jan 2021 18:12:42 +0100 Subject: [PATCH] Added test suite for unacked message tracker --- .../Abstractions/IUnackedMessageTracker.cs | 3 +- .../Internal/InactiveUnackedMessageTracker.cs | 3 +- .../Internal/ReaderChannelFactory.cs | 2 +- .../Internal/UnackedMessageTracker.cs | 26 ++- tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 + .../Internal/MessageAcksTrackerTests.cs | 164 +++++++++++++++++- 6 files changed, 178 insertions(+), 22 deletions(-) diff --git a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs b/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs index ca2b703c5..ea432d1d4 100644 --- a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs @@ -2,6 +2,7 @@ { using DotPulsar.Abstractions; using System.Threading.Tasks; + using System.Threading; using System; public interface IUnackedMessageTracker : IDisposable @@ -10,6 +11,6 @@ public interface IUnackedMessageTracker : IDisposable void Ack(MessageId messageId); - Task Start(IConsumer consumer); + Task Start(IConsumer consumer, CancellationToken cancellationToken = default); } } diff --git a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs index 07fd69f1b..9155ab5f6 100644 --- a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs @@ -1,5 +1,6 @@ namespace DotPulsar.Internal { + using System.Threading; using System.Threading.Tasks; using Abstractions; using DotPulsar.Abstractions; @@ -20,7 +21,7 @@ public void Add(MessageId messageId) return; } - public Task Start(IConsumer consumer) => Task.CompletedTask; + public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask; public void Dispose() { return; diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs index dcb5ea75a..718084aa4 100644 --- a/src/DotPulsar/Internal/ReaderChannelFactory.cs +++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs @@ -63,7 +63,7 @@ private async ValueTask GetChannel(CancellationToken cancellatio { var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false); var messageQueue = new AsyncQueue(); - var tracker = new InactiveMessageAcksTracker(); // No tracker for reader since readers don't ack. + var tracker = new InactiveUnackedMessageTracker(); // No tracker for reader since readers don't ack. var consumerMessageQueue = new MessageQueue(messageQueue, tracker); var channel = new Channel(_correlationId, _eventRegister, messageQueue); var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false); diff --git a/src/DotPulsar/Internal/UnackedMessageTracker.cs b/src/DotPulsar/Internal/UnackedMessageTracker.cs index 299dfa1e2..d52da34da 100644 --- a/src/DotPulsar/Internal/UnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnackedMessageTracker.cs @@ -13,18 +13,15 @@ public readonly struct AwaitingAck { public MessageId MessageId { get; } - public long Timestamp { get; } + public Stopwatch Stopwatch { get; } public AwaitingAck(MessageId messageId) { MessageId = messageId; - Timestamp = Stopwatch.GetTimestamp(); + Stopwatch = Stopwatch.StartNew(); } - public TimeSpan Elapsed => - TimeSpan.FromTicks( - (Stopwatch.GetTimestamp() - Timestamp) / - (Stopwatch.Frequency) * 1000); + public TimeSpan Elapsed => Stopwatch.Elapsed; } public sealed class UnackedMessageTracker : IUnackedMessageTracker @@ -57,29 +54,30 @@ public void Ack(MessageId messageId) _acked.Add(messageId); } - public Task Start(IConsumer consumer) + public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) { - var cancellationToken = _cancellationTokenSource.Token; + CancellationToken token = + CancellationTokenSource.CreateLinkedTokenSource( + _cancellationTokenSource.Token, cancellationToken).Token; return Task.Run(async () => { - while (!cancellationToken.IsCancellationRequested) + while (!token.IsCancellationRequested) { var messages = CheckUnackedMessages(); if (messages.Count() > 0) - await consumer.RedeliverUnacknowledgedMessages(messages, cancellationToken); + await consumer.RedeliverUnacknowledgedMessages(messages, token); - await Task.Delay(_pollingTimeout, cancellationToken); + await Task.Delay(_pollingTimeout, token); } - }, cancellationToken); + }, token); } private IEnumerable CheckUnackedMessages() { - AwaitingAck awaiting; var result = new List(); - while (_awaitingAcks.TryPeek(out awaiting) + while (_awaitingAcks.TryPeek(out AwaitingAck awaiting) && awaiting.Elapsed > _ackTimeout) { // Can I safely use Dequeue now instead of TryDequeue? diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 74cb6b9fc..accfbbfbf 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -17,6 +17,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + diff --git a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs index 8a06a76db..f915f1630 100644 --- a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs +++ b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs @@ -1,18 +1,172 @@ namespace DotPulsar.Tests.Internal { using DotPulsar.Internal; + using DotPulsar.Abstractions; using FluentAssertions; - using System.Buffers; - using System.Linq; using Xunit; + using System; + using AutoFixture; + using System.Threading; + using System.Threading.Tasks; + using System.Collections.Generic; + using System.Linq; + using System.Linq.Expressions; + using AutoFixture.AutoNSubstitute; + using NSubstitute; + using System.Diagnostics; - public class MessageAcksTrackerTests + public class UnackedMessageTrackerTests { [Fact] public void Test_Instance() { - var tracker = new MessageAcksTracker(1, 2, 3); - tracker.Should().BeOfType(); + var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1)); + tracker.Should().BeOfType(); + } + + + [Fact] + public async void Test_AwaitingAck_Elapsed() + { + //Arrange + var messageId = MessageId.Latest; + var sw = new Stopwatch(); + sw.Start(); + + //Act + var awaiting = new AwaitingAck(messageId); + await Task.Delay(TimeSpan.FromMilliseconds(123)); + sw.Stop(); + + //Assert + awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1); + } + + [Fact] + public async void Test_Start_Message() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = MessageId.Latest; + var cts = new CancellationTokenSource(); + + + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Is(EquivalentTo(new List() { messageId })), + Arg.Any()); + } + + [Fact] + public async void Test_Start_Message_Ack_In_Time() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = MessageId.Latest; + var cts = new CancellationTokenSource(); + + + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId)); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .DidNotReceive() + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); + } + + [Fact] + public async void Test_Start_Message_Ack_Too_Late() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = MessageId.Latest; + var cts = new CancellationTokenSource(); + + + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + + var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId)); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); } + + [Fact] + public async void Test_Start_Redeliver_Only_Cnce() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = MessageId.Latest; + var cts = new CancellationTokenSource(); + + + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(5)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(50); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); + } + + + private Expression>> EquivalentTo(IEnumerable enumerable) => + x => IsEquivalentIEnumerable(enumerable, x); + + + private bool IsEquivalentIEnumerable(IEnumerable a, IEnumerable b) => + a.Count() == b.Count() && a.Zip(b, (a_, b_) => a_.Equals(b_)).All(_ => _); } } \ No newline at end of file