From 71ef474d370cd28643968837707af140fb4e579d Mon Sep 17 00:00:00 2001 From: Daniel Blankensteiner Date: Wed, 3 Apr 2024 15:25:30 +0200 Subject: [PATCH] Updated NuGet packages and fixed consumers/readers sometimes throwing an exception if disposed right after being created --- benchmarks/Compression/Compression.csproj | 6 +-- src/DotPulsar/DotPulsar.csproj | 2 +- src/DotPulsar/Internal/Consumer.cs | 51 +++++++++++--------- src/DotPulsar/Internal/Reader.cs | 49 ++++++++++--------- tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 8 +-- 5 files changed, 61 insertions(+), 55 deletions(-) diff --git a/benchmarks/Compression/Compression.csproj b/benchmarks/Compression/Compression.csproj index 679db9082..f7d9f273a 100644 --- a/benchmarks/Compression/Compression.csproj +++ b/benchmarks/Compression/Compression.csproj @@ -10,15 +10,15 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + - + diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj index 62b31380c..23570d156 100644 --- a/src/DotPulsar/DotPulsar.csproj +++ b/src/DotPulsar/DotPulsar.csproj @@ -24,7 +24,7 @@ - + diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs index c799aac71..55da102ac 100644 --- a/src/DotPulsar/Internal/Consumer.cs +++ b/src/DotPulsar/Internal/Consumer.cs @@ -172,7 +172,10 @@ public async ValueTask DisposeAsync() await _processManager.DisposeAsync().ConfigureAwait(false); foreach (var subConsumer in _subConsumers) - await subConsumer.DisposeAsync().ConfigureAwait(false); + { + if (subConsumer is not null) + await subConsumer.DisposeAsync().ConfigureAwait(false); + } await _lock.DisposeAsync().ConfigureAwait(false); _state.SetState(ConsumerState.Closed); @@ -187,32 +190,32 @@ public async ValueTask> Receive(CancellationToken cancellatio var iterations = 0; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) - while (true) - { - iterations++; - _subConsumerIndex++; - if (_subConsumerIndex == _subConsumers.Length) - _subConsumerIndex = 0; - - var receiveTask = _receiveTasks[_subConsumerIndex]; - if (receiveTask == _emptyTaskCompletionSource.Task) + while (true) { - var receiveTaskValueTask = _subConsumers[_subConsumerIndex].Receive(cancellationToken); - if (receiveTaskValueTask.IsCompleted) - return receiveTaskValueTask.Result; - _receiveTasks[_subConsumerIndex] = receiveTaskValueTask.AsTask(); - } - else - { - if (receiveTask.IsCompleted) + iterations++; + _subConsumerIndex++; + if (_subConsumerIndex == _subConsumers.Length) + _subConsumerIndex = 0; + + var receiveTask = _receiveTasks[_subConsumerIndex]; + if (receiveTask == _emptyTaskCompletionSource.Task) { - _receiveTasks[_subConsumerIndex] = _emptyTaskCompletionSource.Task; - return receiveTask.Result; + var receiveTaskValueTask = _subConsumers[_subConsumerIndex].Receive(cancellationToken); + if (receiveTaskValueTask.IsCompleted) + return receiveTaskValueTask.Result; + _receiveTasks[_subConsumerIndex] = receiveTaskValueTask.AsTask(); } + else + { + if (receiveTask.IsCompleted) + { + _receiveTasks[_subConsumerIndex] = _emptyTaskCompletionSource.Task; + return receiveTask.Result; + } + } + if (iterations == _subConsumers.Length) + await Task.WhenAny(_receiveTasks).ConfigureAwait(false); } - if (iterations == _subConsumers.Length) - await Task.WhenAny(_receiveTasks).ConfigureAwait(false); - } } public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken) @@ -355,7 +358,7 @@ public async ValueTask> GetLastMessageIds(CancellationTok await Guard(cancellationToken).ConfigureAwait(false); if (!_isPartitionedTopic) - return new[] { await _subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false) }; + return [await _subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false)]; var getLastMessageIdsTasks = new List>(_numberOfPartitions); diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs index e1d88a440..910e0c0ce 100644 --- a/src/DotPulsar/Internal/Reader.cs +++ b/src/DotPulsar/Internal/Reader.cs @@ -188,32 +188,32 @@ public async ValueTask> Receive(CancellationToken cancellatio var iterations = 0; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) - while (true) - { - iterations++; - _subReaderIndex++; - if (_subReaderIndex == _subReaders.Length) - _subReaderIndex = 0; - - var receiveTask = _receiveTasks[_subReaderIndex]; - if (receiveTask == _emptyTaskCompletionSource.Task) - { - var receiveTaskValueTask = _subReaders[_subReaderIndex].Receive(cancellationToken); - if (receiveTaskValueTask.IsCompleted) - return receiveTaskValueTask.Result; - _receiveTasks[_subReaderIndex] = receiveTaskValueTask.AsTask(); - } - else + while (true) { - if (receiveTask.IsCompleted) + iterations++; + _subReaderIndex++; + if (_subReaderIndex == _subReaders.Length) + _subReaderIndex = 0; + + var receiveTask = _receiveTasks[_subReaderIndex]; + if (receiveTask == _emptyTaskCompletionSource.Task) + { + var receiveTaskValueTask = _subReaders[_subReaderIndex].Receive(cancellationToken); + if (receiveTaskValueTask.IsCompleted) + return receiveTaskValueTask.Result; + _receiveTasks[_subReaderIndex] = receiveTaskValueTask.AsTask(); + } + else { - _receiveTasks[_subReaderIndex] = _emptyTaskCompletionSource.Task; - return receiveTask.Result; + if (receiveTask.IsCompleted) + { + _receiveTasks[_subReaderIndex] = _emptyTaskCompletionSource.Task; + return receiveTask.Result; + } } + if (iterations == _subReaders.Length) + await Task.WhenAny(_receiveTasks).ConfigureAwait(false); } - if (iterations == _subReaders.Length) - await Task.WhenAny(_receiveTasks).ConfigureAwait(false); - } } public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken) @@ -265,7 +265,10 @@ public async ValueTask DisposeAsync() await _processManager.DisposeAsync().ConfigureAwait(false); foreach (var subReader in _subReaders) - await subReader.DisposeAsync().ConfigureAwait(false); + { + if (subReader is not null) + await subReader.DisposeAsync().ConfigureAwait(false); + } await _lock.DisposeAsync().ConfigureAwait(false); _state.SetState(ReaderState.Closed); diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 4e6221a0e..4bc2f4443 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -21,22 +21,22 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + all runtime; build; native; contentfiles; analyzers; buildtransitive - +