diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs index 723baffcd..c5fd66745 100644 --- a/src/DotPulsar/Internal/Producer.cs +++ b/src/DotPulsar/Internal/Producer.cs @@ -215,7 +215,7 @@ private async ValueTask ChoosePartitions(MessageMetadata metadata, Cancella public async ValueTask Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(); - var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)); + using var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)); ValueTask OnMessageSent(MessageId messageId) { @@ -227,21 +227,25 @@ ValueTask OnMessageSent(MessageId messageId) #endif } - try - { - await InternalSend(metadata, message, true, OnMessageSent, x => tcs.TrySetException(x), cancellationToken).ConfigureAwait(false); - return await tcs.Task.ConfigureAwait(false); - } - finally - { - registration.Dispose(); - } + await InternalSend(metadata, message, true, tcs, OnMessageSent, x => tcs.TrySetException(x), cancellationToken).ConfigureAwait(false); + return await tcs.Task.ConfigureAwait(false); } - public async ValueTask Enqueue(MessageMetadata metadata, TMessage message, Func? onMessageSent = default, CancellationToken cancellationToken = default) - => await InternalSend(metadata, message, false, onMessageSent, cancellationToken: cancellationToken).ConfigureAwait(false); - - private async ValueTask InternalSend(MessageMetadata metadata, TMessage message, bool sendOpCancelable, Func? onMessageSent = default, Action? onFailed = default, CancellationToken cancellationToken = default) + public async ValueTask Enqueue( + MessageMetadata metadata, + TMessage message, + Func? onMessageSent = default, + CancellationToken cancellationToken = default) + => await InternalSend(metadata, message, false, null, onMessageSent, cancellationToken: cancellationToken).ConfigureAwait(false); + + private async ValueTask InternalSend( + MessageMetadata metadata, + TMessage message, + bool sendOpCancelable, + TaskCompletionSource? tcs = default, + Func? onMessageSent = default, + Action? onFailed = default, + CancellationToken cancellationToken = default) { ThrowIfDisposed(); @@ -265,8 +269,9 @@ private async ValueTask InternalSend(MessageMetadata metadata, TMessage message, var subProducer = _producers[partition]; var data = _options.Schema.Encode(message); - var tcs = new TaskCompletionSource(); - await subProducer.Send(new SendOp(metadata.Metadata, data, tcs, sendOpCancelable ? cancellationToken : CancellationToken.None), cancellationToken).ConfigureAwait(false); + tcs ??= new TaskCompletionSource(); + var sendOp = new SendOp(metadata.Metadata, data, tcs, sendOpCancelable ? cancellationToken : CancellationToken.None); + await subProducer.Send(sendOp, cancellationToken).ConfigureAwait(false); _ = tcs.Task.ContinueWith(async task => { @@ -281,7 +286,15 @@ private async ValueTask InternalSend(MessageMetadata metadata, TMessage message, if (autoAssignSequenceId) metadata.SequenceId = 0; - onFailed?.Invoke(exception); + try + { + onFailed?.Invoke(exception); + } + catch + { + // Ignore + } + return; } diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs index 188d6b34d..6924a4e9f 100644 --- a/src/DotPulsar/Internal/SubProducer.cs +++ b/src/DotPulsar/Internal/SubProducer.cs @@ -118,50 +118,45 @@ internal async ValueTask WaitForSendQueueEmpty(CancellationToken cancellationTok private async Task MessageDispatcher(IProducerChannel channel, CancellationToken cancellationToken) { - var responseQueue = new AsyncQueue>(); - var responseProcessorTask = ResponseProcessor(responseQueue, cancellationToken); + using var responseQueue = new AsyncQueue>(); + var responseProcessorTask = Task.Run(async () => await ResponseProcessor(responseQueue, cancellationToken)); - try + _sendQueue.ResetCursor(); + + while (!cancellationToken.IsCancellationRequested) { - while (!cancellationToken.IsCancellationRequested) + var sendOp = await _sendQueue.NextItem(cancellationToken).ConfigureAwait(false); + + if (sendOp.CancellationToken.IsCancellationRequested) { - var sendOp = await _sendQueue.NextItem(cancellationToken).ConfigureAwait(false); + _sendQueue.RemoveCurrentItem(); + continue; + } - if (sendOp.CancellationToken.IsCancellationRequested) + var tcs = new TaskCompletionSource(); + _ = tcs.Task.ContinueWith(task => + { + try { - _sendQueue.RemoveCurrentItem(); - continue; + responseQueue.Enqueue(task); } - - var tcs = new TaskCompletionSource(); - _ = tcs.Task.ContinueWith(task => + catch { - try - { - responseQueue.Enqueue(task); - } - catch - { - // Ignore - } - }, TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously); - - // Use CancellationToken.None here because otherwise it will throw exceptions on all fault actions even retry. - var success = await _executor.TryExecuteOnce(() => channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), CancellationToken.None).ConfigureAwait(false); - - if (success) - continue; + // Ignore + } + }, TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously); + + // Use CancellationToken.None here because otherwise it will throw exceptions on all fault actions even retry. + var success = await _executor.TryExecuteOnce(() => channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), CancellationToken.None).ConfigureAwait(false); + if (!success) + { _eventRegister.Register(new ChannelDisconnected(_correlationId)); break; } - - await responseProcessorTask.ConfigureAwait(false); - } - finally - { - responseQueue.Dispose(); } + + await responseProcessorTask.ConfigureAwait(false); } private async ValueTask ResponseProcessor(IDequeue> responseQueue, CancellationToken cancellationToken) @@ -210,9 +205,10 @@ public async Task EstablishNewChannel(CancellationToken cancellationToken) { try { - if (_dispatcherCts is not null && !_dispatcherCts.IsCancellationRequested) + if (_dispatcherCts is not null) { - _dispatcherCts.Cancel(); + if (!_dispatcherCts.IsCancellationRequested) + _dispatcherCts.Cancel(); _dispatcherCts.Dispose(); } } @@ -236,15 +232,12 @@ public async Task EstablishNewChannel(CancellationToken cancellationToken) _channel = await _executor.Execute(() => _factory.Create(_topicEpoch, cancellationToken), cancellationToken).ConfigureAwait(false); } - public async Task ActivateChannel(ulong? topicEpoch, CancellationToken cancellationToken) + public Task ActivateChannel(ulong? topicEpoch, CancellationToken cancellationToken) { _topicEpoch ??= topicEpoch; _dispatcherCts = new CancellationTokenSource(); - await _executor.Execute(() => - { - _sendQueue.ResetCursor(); - _dispatcherTask = MessageDispatcher(_channel, _dispatcherCts.Token); - }, cancellationToken).ConfigureAwait(false); + _dispatcherTask = Task.Run(async () => await MessageDispatcher(_channel, _dispatcherCts.Token)); + return Task.CompletedTask; } public async ValueTask CloseChannel(CancellationToken cancellationToken) diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 010d0540d..ec8627ef6 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -26,7 +26,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs index e5d51d1cf..911c14738 100644 --- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs +++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs @@ -96,7 +96,7 @@ public async Task GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarl //Arrange await using var client = CreateClient(); await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token)); - var expected = new List() { MessageId.Earliest }; + var expected = new List { MessageId.Earliest }; //Act var actual = await consumer.GetLastMessageIds(_cts.Token); @@ -160,11 +160,11 @@ public async Task Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFau await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token)); - var receiveTask = consumer.Receive(_cts.Token).AsTask(); + var receiveTask = consumer.Receive(_cts.Token); semaphoreSlim.Release(); //Act - var exception = await Record.ExceptionAsync(() => receiveTask); + var exception = await Record.ExceptionAsync(receiveTask.AsTask); //Assert exception.Should().BeOfType(); @@ -186,14 +186,14 @@ public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFa await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token); //Act - var exception = await Record.ExceptionAsync(() => consumer.Receive(_cts.Token).AsTask()); + var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask); //Assert exception.Should().BeOfType(); } - [Fact(Skip = "Skip for now")] - public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain() + [Fact] + public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain() { //Arrange var topicName = await _fixture.CreateTopic(_cts.Token); @@ -210,7 +210,7 @@ public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_Sh await ProduceMessages(producer, 1, "test-message", _cts.Token); //Act - var exception = await Record.ExceptionAsync(async () => await receiveTask.AsTask()); + var exception = await Record.ExceptionAsync(receiveTask.AsTask); //Assert exception.Should().BeNull(); @@ -230,7 +230,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeA //Act await connectionDown.DisposeAsync(); await consumer.StateChangedTo(ConsumerState.Active, _cts.Token); - var exception = await Record.ExceptionAsync(() => consumer.Receive(_cts.Token).AsTask()); + var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask); //Assert exception.Should().BeNull(); @@ -245,7 +245,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispo await using var consumer = CreateConsumer(client, await _fixture.CreateTopic(_cts.Token)); //Act - var exception = await Record.ExceptionAsync(() => consumer.DisposeAsync().AsTask()); + var exception = await Record.ExceptionAsync(consumer.DisposeAsync().AsTask); //Assert exception.Should().BeNull(); @@ -262,7 +262,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAb //Act await using var connectionDown = await _fixture.DisableThePulsarConnection(); await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token); - var exception = await Record.ExceptionAsync(() => consumer.DisposeAsync().AsTask()); + var exception = await Record.ExceptionAsync(consumer.DisposeAsync().AsTask); //Assert exception.Should().BeNull(); @@ -285,7 +285,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBe await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token); } await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token); - var exception = await Record.ExceptionAsync(() => consumer.Receive(_cts.Token).AsTask()); + var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask); //Assert exception.Should().BeNull(); diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs index 7e0f8f4bc..39a8c8954 100644 --- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs +++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs @@ -268,8 +268,8 @@ public async Task Send_WhenProducingMessagesForFourPartitions_ShouldPartitionBeD foundNonNegativeOne.Should().Be(true); } - [Fact(Skip = "Skip for now")] - public async Task Connectivity_WhenConnectionIsInitiallyUpAndComesDown_ShouldBeAbleToSendWhileDown() + [Fact] + public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToSendWhileDown() { //Arrange var topicName = await _fixture.CreateTopic(_cts.Token); @@ -286,7 +286,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndComesDown_ShouldBeA await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token); //Act - var exception = await Record.ExceptionAsync(async () => await sendTask.AsTask()); + var exception = await Record.ExceptionAsync(sendTask.AsTask); //Assert exception.Should().BeNull(); @@ -301,7 +301,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispo var producer = CreateProducer(client, await _fixture.CreateTopic(_cts.Token)); //Act - var exception = await Record.ExceptionAsync(() => producer.DisposeAsync().AsTask()); + var exception = await Record.ExceptionAsync(producer.DisposeAsync().AsTask); //Assert exception.Should().BeNull(); @@ -318,7 +318,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeA //Act await connectionDown.DisposeAsync(); await producer.StateChangedTo(ProducerState.Connected, _cts.Token); - var exception = await Record.ExceptionAsync(() => producer.Send("test", _cts.Token).AsTask()); + var exception = await Record.ExceptionAsync(producer.Send("test", _cts.Token).AsTask); //Assert exception.Should().BeNull(); @@ -335,7 +335,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAb //Act await using var connectionDown = await _fixture.DisableThePulsarConnection(); await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token); - var exception = await Record.ExceptionAsync(() => producer.DisposeAsync().AsTask()); + var exception = await Record.ExceptionAsync(producer.DisposeAsync().AsTask); //Assert exception.Should().BeNull(); @@ -355,7 +355,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBe await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token); } await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token); - var exception = await Record.ExceptionAsync(() => producer.Send("test", _cts.Token).AsTask()); + var exception = await Record.ExceptionAsync(producer.Send("test", _cts.Token).AsTask); //Assert exception.Should().BeNull(); diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs b/tests/DotPulsar.Tests/Internal/ReaderTests.cs index 504b679c9..4b3129415 100644 --- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs +++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs @@ -39,7 +39,7 @@ public async Task GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarl //Arrange await using var client = CreateClient(); await using var reader = CreateReader(client, MessageId.Earliest, await _fixture.CreateTopic(_cts.Token)); - var expected = new List() { MessageId.Earliest }; + var expected = new List { MessageId.Earliest }; //Act var actual = await reader.GetLastMessageIds(_cts.Token); @@ -206,14 +206,14 @@ public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowReaderFaul await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token); //Act - var exception = await Record.ExceptionAsync(() => reader.Receive(_cts.Token).AsTask()); + var exception = await Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask); //Assert exception.Should().BeOfType(); } - [Fact(Skip = "Skip for now")] - public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain() + [Fact] + public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain() { //Arrange var topicName = await _fixture.CreateTopic(_cts.Token); @@ -230,7 +230,7 @@ public async Task Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_Sh await producer.Send("test-message", _cts.Token); //Act - var exception = await Record.ExceptionAsync(async () => await receiveTask.AsTask()); + var exception = await Record.ExceptionAsync(receiveTask.AsTask); //Assert exception.Should().BeNull(); @@ -250,7 +250,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDownAndComesUp_ShouldBeA //Act await connectionDown.DisposeAsync(); await reader.StateChangedTo(ReaderState.Connected, _cts.Token); - var exception = await Record.ExceptionAsync(() => reader.Receive(_cts.Token).AsTask()); + var exception = await Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask); //Assert exception.Should().BeNull(); @@ -265,7 +265,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyDown_ShouldBeAbleToDispo var reader = CreateReader(client, MessageId.Earliest, await _fixture.CreateTopic(_cts.Token)); //Act - var exception = await Record.ExceptionAsync(() => reader.DisposeAsync().AsTask()); + var exception = await Record.ExceptionAsync(reader.DisposeAsync().AsTask); //Assert exception.Should().BeNull(); @@ -282,7 +282,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAb //Act await using var connectionDown = await _fixture.DisableThePulsarConnection(); await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token); - var exception = await Record.ExceptionAsync(() => reader.DisposeAsync().AsTask()); + var exception = await Record.ExceptionAsync(reader.DisposeAsync().AsTask); //Assert exception.Should().BeNull(); @@ -305,7 +305,7 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBe await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token); } await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token); - var exception = await Record.ExceptionAsync(() => reader.Receive(_cts.Token).AsTask()); + var exception = await Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask); //Assert exception.Should().BeNull();