Skip to content

Commit

Permalink
Added test suite for unacked message tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
dionjansen committed Jan 3, 2021
1 parent d759c06 commit 26cd957
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using DotPulsar.Abstractions;
using System.Threading.Tasks;
using System.Threading;
using System;

public interface IUnackedMessageTracker : IDisposable
Expand All @@ -10,6 +11,6 @@ public interface IUnackedMessageTracker : IDisposable

void Ack(MessageId messageId);

Task Start(IConsumer consumer);
Task Start(IConsumer consumer, CancellationToken cancellationToken = default);
}
}
3 changes: 2 additions & 1 deletion src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace DotPulsar.Internal
{
using System.Threading;
using System.Threading.Tasks;
using Abstractions;
using DotPulsar.Abstractions;
Expand All @@ -20,7 +21,7 @@ public void Add(MessageId messageId)
return;
}

public Task Start(IConsumer consumer) => Task.CompletedTask;
public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask;

public void Dispose() {
return;
Expand Down
2 changes: 1 addition & 1 deletion src/DotPulsar/Internal/ReaderChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private async ValueTask<IReaderChannel> GetChannel(CancellationToken cancellatio
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var tracker = new InactiveMessageAcksTracker(); // No tracker for reader since readers don't ack.
var tracker = new InactiveUnackedMessageTracker(); // No tracker for reader since readers don't ack.
var consumerMessageQueue = new MessageQueue(messageQueue, tracker);
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
Expand Down
26 changes: 12 additions & 14 deletions src/DotPulsar/Internal/UnackedMessageTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,15 @@
public readonly struct AwaitingAck
{
public MessageId MessageId { get; }
public long Timestamp { get; }
public Stopwatch Stopwatch { get; }

public AwaitingAck(MessageId messageId)
{
MessageId = messageId;
Timestamp = Stopwatch.GetTimestamp();
Stopwatch = Stopwatch.StartNew();
}

public TimeSpan Elapsed =>
TimeSpan.FromTicks(
(Stopwatch.GetTimestamp() - Timestamp) /
(Stopwatch.Frequency) * 1000);
public TimeSpan Elapsed => Stopwatch.Elapsed;
}

public sealed class UnackedMessageTracker : IUnackedMessageTracker
Expand Down Expand Up @@ -57,29 +54,30 @@ public void Ack(MessageId messageId)
_acked.Add(messageId);
}

public Task Start(IConsumer consumer)
public Task Start(IConsumer consumer, CancellationToken cancellationToken = default)
{
var cancellationToken = _cancellationTokenSource.Token;
CancellationToken token =
CancellationTokenSource.CreateLinkedTokenSource(
_cancellationTokenSource.Token, cancellationToken).Token;

return Task.Run(async () => {
while (!cancellationToken.IsCancellationRequested)
while (!token.IsCancellationRequested)
{
var messages = CheckUnackedMessages();

if (messages.Count() > 0)
await consumer.RedeliverUnacknowledgedMessages(messages, cancellationToken);
await consumer.RedeliverUnacknowledgedMessages(messages, token);

await Task.Delay(_pollingTimeout, cancellationToken);
await Task.Delay(_pollingTimeout, token);
}
}, cancellationToken);
}, token);
}

private IEnumerable<MessageId> CheckUnackedMessages()
{
AwaitingAck awaiting;
var result = new List<MessageId>();

while (_awaitingAcks.TryPeek(out awaiting)
while (_awaitingAcks.TryPeek(out AwaitingAck awaiting)
&& awaiting.Elapsed > _ackTimeout)
{
// Can I safely use Dequeue now instead of TryDequeue?
Expand Down
2 changes: 2 additions & 0 deletions tests/DotPulsar.Tests/DotPulsar.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="AutoFixture" Version="4.15.0" />
<PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.15.0" />
</ItemGroup>

<ItemGroup>
Expand Down
164 changes: 159 additions & 5 deletions tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,172 @@
namespace DotPulsar.Tests.Internal
{
using DotPulsar.Internal;
using DotPulsar.Abstractions;
using FluentAssertions;
using System.Buffers;
using System.Linq;
using Xunit;
using System;
using AutoFixture;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using AutoFixture.AutoNSubstitute;
using NSubstitute;
using System.Diagnostics;

public class MessageAcksTrackerTests
public class UnackedMessageTrackerTests
{
[Fact]
public void Test_Instance()
{
var tracker = new MessageAcksTracker(1, 2, 3);
tracker.Should().BeOfType<MessageAcksTracker>();
var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
tracker.Should().BeOfType<UnackedMessageTracker>();
}


[Fact]
public async void Test_AwaitingAck_Elapsed()
{
//Arrange
var messageId = MessageId.Latest;
var sw = new Stopwatch();
sw.Start();

//Act
var awaiting = new AwaitingAck(messageId);
await Task.Delay(TimeSpan.FromMilliseconds(123));
sw.Stop();

//Assert
awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1);
}

[Fact]
public async void Test_Start_Message()
{
//Arrange
var fixture = new Fixture();
fixture.Customize(new AutoNSubstituteCustomization());
var consumer = Substitute.For<IConsumer>();
var messageId = MessageId.Latest;
var cts = new CancellationTokenSource();


var tracker = new UnackedMessageTracker(
TimeSpan.FromMilliseconds(10),
TimeSpan.FromMilliseconds(1));

//Act
tracker.Add(messageId);
cts.CancelAfter(20);
try { await tracker.Start(consumer, cts.Token); }
catch (TaskCanceledException) { }

//Assert
await consumer
.Received(1)
.RedeliverUnacknowledgedMessages(
Arg.Is(EquivalentTo(new List<MessageId>() { messageId })),
Arg.Any<CancellationToken>());
}

[Fact]
public async void Test_Start_Message_Ack_In_Time()
{
//Arrange
var fixture = new Fixture();
fixture.Customize(new AutoNSubstituteCustomization());
var consumer = Substitute.For<IConsumer>();
var messageId = MessageId.Latest;
var cts = new CancellationTokenSource();


var tracker = new UnackedMessageTracker(
TimeSpan.FromMilliseconds(10),
TimeSpan.FromMilliseconds(1));

//Act
tracker.Add(messageId);
cts.CancelAfter(20);
var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId));
try { await tracker.Start(consumer, cts.Token); }
catch (TaskCanceledException) { }

//Assert
await consumer
.DidNotReceive()
.RedeliverUnacknowledgedMessages(
Arg.Any<IEnumerable<MessageId>>(),
Arg.Any<CancellationToken>());
}

[Fact]
public async void Test_Start_Message_Ack_Too_Late()
{
//Arrange
var fixture = new Fixture();
fixture.Customize(new AutoNSubstituteCustomization());
var consumer = Substitute.For<IConsumer>();
var messageId = MessageId.Latest;
var cts = new CancellationTokenSource();


var tracker = new UnackedMessageTracker(
TimeSpan.FromMilliseconds(10),
TimeSpan.FromMilliseconds(1));

//Act
tracker.Add(messageId);
cts.CancelAfter(20);

var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId));
try { await tracker.Start(consumer, cts.Token); }
catch (TaskCanceledException) { }

//Assert
await consumer
.Received(1)
.RedeliverUnacknowledgedMessages(
Arg.Any<IEnumerable<MessageId>>(),
Arg.Any<CancellationToken>());
}

[Fact]
public async void Test_Start_Redeliver_Only_Cnce()
{
//Arrange
var fixture = new Fixture();
fixture.Customize(new AutoNSubstituteCustomization());
var consumer = Substitute.For<IConsumer>();
var messageId = MessageId.Latest;
var cts = new CancellationTokenSource();


var tracker = new UnackedMessageTracker(
TimeSpan.FromMilliseconds(10),
TimeSpan.FromMilliseconds(5));

//Act
tracker.Add(messageId);
cts.CancelAfter(50);
try { await tracker.Start(consumer, cts.Token); }
catch (TaskCanceledException) { }

//Assert
await consumer
.Received(1)
.RedeliverUnacknowledgedMessages(
Arg.Any<IEnumerable<MessageId>>(),
Arg.Any<CancellationToken>());
}


private Expression<Predicate<IEnumerable<T>>> EquivalentTo<T>(IEnumerable<T> enumerable) =>
x => IsEquivalentIEnumerable(enumerable, x);


private bool IsEquivalentIEnumerable<T>(IEnumerable<T> a, IEnumerable<T> b) =>
a.Count() == b.Count() && a.Zip(b, (a_, b_) => a_.Equals(b_)).All(_ => _);
}
}

0 comments on commit 26cd957

Please sign in to comment.