Skip to content

Commit

Permalink
Fixed issue with producer not sending messages
Browse files Browse the repository at this point in the history
  • Loading branch information
blankensteiner committed Jan 16, 2024
1 parent a8ef06a commit 6bcd6e8
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 85 deletions.
47 changes: 30 additions & 17 deletions src/DotPulsar/Internal/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private async ValueTask<int> ChoosePartitions(MessageMetadata metadata, Cancella
public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<MessageId>();
var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
using var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));

ValueTask OnMessageSent(MessageId messageId)
{
Expand All @@ -227,21 +227,25 @@ ValueTask OnMessageSent(MessageId messageId)
#endif
}

try
{
await InternalSend(metadata, message, true, OnMessageSent, x => tcs.TrySetException(x), cancellationToken).ConfigureAwait(false);
return await tcs.Task.ConfigureAwait(false);
}
finally
{
registration.Dispose();
}
await InternalSend(metadata, message, true, tcs, OnMessageSent, x => tcs.TrySetException(x), cancellationToken).ConfigureAwait(false);
return await tcs.Task.ConfigureAwait(false);
}

public async ValueTask Enqueue(MessageMetadata metadata, TMessage message, Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken cancellationToken = default)
=> await InternalSend(metadata, message, false, onMessageSent, cancellationToken: cancellationToken).ConfigureAwait(false);

private async ValueTask InternalSend(MessageMetadata metadata, TMessage message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent = default, Action<Exception>? onFailed = default, CancellationToken cancellationToken = default)
public async ValueTask Enqueue(
MessageMetadata metadata,
TMessage message,
Func<MessageId, ValueTask>? onMessageSent = default,
CancellationToken cancellationToken = default)
=> await InternalSend(metadata, message, false, null, onMessageSent, cancellationToken: cancellationToken).ConfigureAwait(false);

private async ValueTask InternalSend(
MessageMetadata metadata,
TMessage message,
bool sendOpCancelable,
TaskCompletionSource<MessageId>? tcs = default,
Func<MessageId, ValueTask>? onMessageSent = default,
Action<Exception>? onFailed = default,
CancellationToken cancellationToken = default)
{
ThrowIfDisposed();

Expand All @@ -265,8 +269,9 @@ private async ValueTask InternalSend(MessageMetadata metadata, TMessage message,
var subProducer = _producers[partition];
var data = _options.Schema.Encode(message);

var tcs = new TaskCompletionSource<MessageId>();
await subProducer.Send(new SendOp(metadata.Metadata, data, tcs, sendOpCancelable ? cancellationToken : CancellationToken.None), cancellationToken).ConfigureAwait(false);
tcs ??= new TaskCompletionSource<MessageId>();
var sendOp = new SendOp(metadata.Metadata, data, tcs, sendOpCancelable ? cancellationToken : CancellationToken.None);
await subProducer.Send(sendOp, cancellationToken).ConfigureAwait(false);

_ = tcs.Task.ContinueWith(async task =>
{
Expand All @@ -281,7 +286,15 @@ private async ValueTask InternalSend(MessageMetadata metadata, TMessage message,
if (autoAssignSequenceId)
metadata.SequenceId = 0;

onFailed?.Invoke(exception);
try
{
onFailed?.Invoke(exception);
}
catch
{
// Ignore
}

return;
}

Expand Down
73 changes: 33 additions & 40 deletions src/DotPulsar/Internal/SubProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,50 +118,45 @@ internal async ValueTask WaitForSendQueueEmpty(CancellationToken cancellationTok

private async Task MessageDispatcher(IProducerChannel channel, CancellationToken cancellationToken)
{
var responseQueue = new AsyncQueue<Task<BaseCommand>>();
var responseProcessorTask = ResponseProcessor(responseQueue, cancellationToken);
using var responseQueue = new AsyncQueue<Task<BaseCommand>>();
var responseProcessorTask = Task.Run(async () => await ResponseProcessor(responseQueue, cancellationToken));

try
_sendQueue.ResetCursor();

while (!cancellationToken.IsCancellationRequested)
{
while (!cancellationToken.IsCancellationRequested)
var sendOp = await _sendQueue.NextItem(cancellationToken).ConfigureAwait(false);

if (sendOp.CancellationToken.IsCancellationRequested)
{
var sendOp = await _sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
_sendQueue.RemoveCurrentItem();
continue;
}

if (sendOp.CancellationToken.IsCancellationRequested)
var tcs = new TaskCompletionSource<BaseCommand>();
_ = tcs.Task.ContinueWith(task =>
{
try
{
_sendQueue.RemoveCurrentItem();
continue;
responseQueue.Enqueue(task);
}

var tcs = new TaskCompletionSource<BaseCommand>();
_ = tcs.Task.ContinueWith(task =>
catch
{
try
{
responseQueue.Enqueue(task);
}
catch
{
// Ignore
}
}, TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously);

// Use CancellationToken.None here because otherwise it will throw exceptions on all fault actions even retry.
var success = await _executor.TryExecuteOnce(() => channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), CancellationToken.None).ConfigureAwait(false);

if (success)
continue;
// Ignore
}
}, TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously);

// Use CancellationToken.None here because otherwise it will throw exceptions on all fault actions even retry.
var success = await _executor.TryExecuteOnce(() => channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), CancellationToken.None).ConfigureAwait(false);

if (!success)
{
_eventRegister.Register(new ChannelDisconnected(_correlationId));
break;
}

await responseProcessorTask.ConfigureAwait(false);
}
finally
{
responseQueue.Dispose();
}

await responseProcessorTask.ConfigureAwait(false);
}

private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>> responseQueue, CancellationToken cancellationToken)
Expand Down Expand Up @@ -210,9 +205,10 @@ public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
try
{
if (_dispatcherCts is not null && !_dispatcherCts.IsCancellationRequested)
if (_dispatcherCts is not null)
{
_dispatcherCts.Cancel();
if (!_dispatcherCts.IsCancellationRequested)
_dispatcherCts.Cancel();
_dispatcherCts.Dispose();
}
}
Expand All @@ -236,15 +232,12 @@ public async Task EstablishNewChannel(CancellationToken cancellationToken)
_channel = await _executor.Execute(() => _factory.Create(_topicEpoch, cancellationToken), cancellationToken).ConfigureAwait(false);
}

public async Task ActivateChannel(ulong? topicEpoch, CancellationToken cancellationToken)
public Task ActivateChannel(ulong? topicEpoch, CancellationToken cancellationToken)
{
_topicEpoch ??= topicEpoch;
_dispatcherCts = new CancellationTokenSource();
await _executor.Execute(() =>
{
_sendQueue.ResetCursor();
_dispatcherTask = MessageDispatcher(_channel, _dispatcherCts.Token);
}, cancellationToken).ConfigureAwait(false);
_dispatcherTask = Task.Run(async () => await MessageDispatcher(_channel, _dispatcherCts.Token));
return Task.CompletedTask;
}

public async ValueTask CloseChannel(CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion tests/DotPulsar.Tests/DotPulsar.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="Testcontainers" Version="3.7.0" />
<PackageReference Include="ToxiproxyNetCore" Version="1.0.35" />
<PackageReference Include="xunit" Version="2.6.5" />
<PackageReference Include="xunit" Version="2.6.6" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
22 changes: 11 additions & 11 deletions tests/DotPulsar.Tests/Internal/ConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public async Task GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarl
//Arrange
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));
var expected = new List<MessageId>() { MessageId.Earliest };
var expected = new List<MessageId> { MessageId.Earliest };

//Act
var actual = await consumer.GetLastMessageIds(_cts.Token);
Expand Down Expand Up @@ -160,11 +160,11 @@ public async Task Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFau

await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));

var receiveTask = consumer.Receive(_cts.Token).AsTask();
var receiveTask = consumer.Receive(_cts.Token);
semaphoreSlim.Release();

//Act
var exception = await Record.ExceptionAsync(() => receiveTask);
var exception = await Record.ExceptionAsync(receiveTask.AsTask);

//Assert
exception.Should().BeOfType<ConsumerFaultedException>();
Expand All @@ -186,14 +186,14 @@ public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFa
await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);

//Act
var exception = await Record.ExceptionAsync(() => consumer.Receive(_cts.Token).AsTask());
var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);

//Assert
exception.Should().BeOfType<ConsumerFaultedException>();
}

[Fact(Skip = "Skip for now")]
public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
[Fact]
public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
Expand All @@ -210,7 +210,7 @@ public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_Sh
await ProduceMessages(producer, 1, "test-message", _cts.Token);

//Act
var exception = await Record.ExceptionAsync(async () => await receiveTask.AsTask());
var exception = await Record.ExceptionAsync(receiveTask.AsTask);

//Assert
exception.Should().BeNull();
Expand All @@ -230,7 +230,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeA
//Act
await connectionDown.DisposeAsync();
await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
var exception = await Record.ExceptionAsync(() => consumer.Receive(_cts.Token).AsTask());
var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);

//Assert
exception.Should().BeNull();
Expand All @@ -245,7 +245,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispo
await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token));

//Act
var exception = await Record.ExceptionAsync(() => consumer.DisposeAsync().AsTask());
var exception = await Record.ExceptionAsync(consumer.DisposeAsync().AsTask);

//Assert
exception.Should().BeNull();
Expand All @@ -262,7 +262,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAb
//Act
await using var connectionDown = await _fixture.DisableThePulsarConnection();
await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
var exception = await Record.ExceptionAsync(() => consumer.DisposeAsync().AsTask());
var exception = await Record.ExceptionAsync(consumer.DisposeAsync().AsTask);

//Assert
exception.Should().BeNull();
Expand All @@ -285,7 +285,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBe
await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
}
await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
var exception = await Record.ExceptionAsync(() => consumer.Receive(_cts.Token).AsTask());
var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);

//Assert
exception.Should().BeNull();
Expand Down
14 changes: 7 additions & 7 deletions tests/DotPulsar.Tests/Internal/ProducerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ public async Task Send_WhenProducingMessagesForFourPartitions_ShouldPartitionBeD
foundNonNegativeOne.Should().Be(true);
}

[Fact(Skip = "Skip for now")]
public async Task Connectivity_WhenConnectionIsInitiallyUpAndComesDown_ShouldBeAbleToSendWhileDown()
[Fact]
public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToSendWhileDown()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
Expand All @@ -286,7 +286,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndComesDown_ShouldBeA
await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);

//Act
var exception = await Record.ExceptionAsync(async () => await sendTask.AsTask());
var exception = await Record.ExceptionAsync(sendTask.AsTask);

//Assert
exception.Should().BeNull();
Expand All @@ -301,7 +301,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispo
var producer = CreateProducer(client, await _fixture.CreateTopic(_cts.Token));

//Act
var exception = await Record.ExceptionAsync(() => producer.DisposeAsync().AsTask());
var exception = await Record.ExceptionAsync(producer.DisposeAsync().AsTask);

//Assert
exception.Should().BeNull();
Expand All @@ -318,7 +318,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeA
//Act
await connectionDown.DisposeAsync();
await producer.StateChangedTo(ProducerState.Connected, _cts.Token);
var exception = await Record.ExceptionAsync(() => producer.Send("test", _cts.Token).AsTask());
var exception = await Record.ExceptionAsync(producer.Send("test", _cts.Token).AsTask);

//Assert
exception.Should().BeNull();
Expand All @@ -335,7 +335,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAb
//Act
await using var connectionDown = await _fixture.DisableThePulsarConnection();
await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token);
var exception = await Record.ExceptionAsync(() => producer.DisposeAsync().AsTask());
var exception = await Record.ExceptionAsync(producer.DisposeAsync().AsTask);

//Assert
exception.Should().BeNull();
Expand All @@ -355,7 +355,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBe
await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token);
}
await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
var exception = await Record.ExceptionAsync(() => producer.Send("test", _cts.Token).AsTask());
var exception = await Record.ExceptionAsync(producer.Send("test", _cts.Token).AsTask);

//Assert
exception.Should().BeNull();
Expand Down
Loading

0 comments on commit 6bcd6e8

Please sign in to comment.