From 53c5b1bc17bc868858cdde3b5f337c3eb301831b Mon Sep 17 00:00:00 2001 From: Daniel Blankensteiner Date: Thu, 10 Oct 2024 12:52:34 +0200 Subject: [PATCH] Added error details on exceptions from the connector and fixed issue with deadlocked DisposeAsync on consumers, readers and producers --- CHANGELOG.md | 9 ++++- benchmarks/Compression/Compression.csproj | 2 +- samples/Processing/Processing.csproj | 2 +- src/DotPulsar/DotPulsar.csproj | 2 +- src/DotPulsar/Internal/AsyncLock.cs | 45 +++++++++++++++-------- src/DotPulsar/Internal/AsyncQueue.cs | 28 +++++++++++--- src/DotPulsar/Internal/Connector.cs | 14 ++++++- 7 files changed, 74 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a98ab709f..328db4d23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,11 +9,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Added - Multiple messages can now be acknowledged with Acknowledge(IEnumerable\ messageIds, CancellationToken cancellationToken) -- ProcessingOptions has a new ShutdownGracePeriod property for doing a graceful shutdown by allowing active tasks to finish +- ProcessingOptions has a new ShutdownGracePeriod property for doing a graceful shutdown by allowing active tasks to finish ### Changed -- Updated the Microsoft.Extensions.ObjectPool dependency from version 8.0.7 to 8.0.8 +- Updated the Microsoft.Extensions.ObjectPool dependency from version 8.0.7 to 8.0.10 +- 'SslPolicyErrors' are added to the 'Data' property of the exception thrown when failing to connect + +- ### Fixed + +- When disposing producers, consumers, or readers 'DisposeAsync' would sometimes hang ## [3.3.2] - 2024-08-07 diff --git a/benchmarks/Compression/Compression.csproj b/benchmarks/Compression/Compression.csproj index 41f8afb12..36a5f6b46 100644 --- a/benchmarks/Compression/Compression.csproj +++ b/benchmarks/Compression/Compression.csproj @@ -11,7 +11,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/samples/Processing/Processing.csproj b/samples/Processing/Processing.csproj index fbb25bd54..f2d243d9a 100644 --- a/samples/Processing/Processing.csproj +++ b/samples/Processing/Processing.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj index ab79788de..e072ab2cf 100644 --- a/src/DotPulsar/DotPulsar.csproj +++ b/src/DotPulsar/DotPulsar.csproj @@ -24,7 +24,7 @@ - + diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs index 2a488ace0..1134d7f9a 100644 --- a/src/DotPulsar/Internal/AsyncLock.cs +++ b/src/DotPulsar/Internal/AsyncLock.cs @@ -58,17 +58,22 @@ public Task Lock(CancellationToken cancellationToken) public async ValueTask DisposeAsync() { - lock (_pending) - { - if (Interlocked.Exchange(ref _isDisposed, 1) != 0) - return; + if (Interlocked.Exchange(ref _isDisposed, 1) != 0) + return; - foreach (var pending in _pending) - pending.Dispose(); + IEnumerable> pending; + lock (_pending) + { + pending = _pending.ToArray(); _pending.Clear(); } + foreach (var ccs in pending) + { + ccs.Dispose(); + } + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); //Wait for possible lock-holder to finish _semaphoreSlim.Release(); @@ -82,31 +87,41 @@ private void Cancel(LinkedListNode> node try { _pending.Remove(node); - node.Value.Dispose(); } catch { // Ignore } } + + try + { + node.Value.Dispose(); + } + catch + { + // Ignore + } } private void Release() { + LinkedListNode>? node; + lock (_pending) { - var node = _pending.First; + node = _pending.First; if (node is not null) - { - node.Value.SetResult(_releaser); - node.Value.Dispose(); _pending.RemoveFirst(); - return; - } - - if (_semaphoreSlim.CurrentCount == 0) + else if (_semaphoreSlim.CurrentCount == 0) _semaphoreSlim.Release(); } + + if (node is not null) + { + node.Value.SetResult(_releaser); + node.Value.Dispose(); + } } private void ThrowIfDisposed() diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs index e0560c0fc..067554200 100644 --- a/src/DotPulsar/Internal/AsyncQueue.cs +++ b/src/DotPulsar/Internal/AsyncQueue.cs @@ -33,20 +33,23 @@ public AsyncQueue() public void Enqueue(T item) { + LinkedListNode>? node; + lock (_lock) { ThrowIfDisposed(); - var node = _pendingDequeues.First; + node = _pendingDequeues.First; if (node is not null) { node.Value.SetResult(item); - node.Value.Dispose(); _pendingDequeues.RemoveFirst(); } else _queue.Enqueue(item); } + + node?.Value.Dispose(); } public ValueTask Dequeue(CancellationToken cancellationToken = default) @@ -72,14 +75,19 @@ public void Dispose() if (Interlocked.Exchange(ref _isDisposed, 1) != 0) return; + IEnumerable> pendingDequeues; + lock (_lock) { - foreach (var pendingDequeue in _pendingDequeues) - pendingDequeue.Dispose(); - + pendingDequeues = _pendingDequeues.ToArray(); _pendingDequeues.Clear(); _queue.Clear(); } + + foreach (var ccs in pendingDequeues) + { + ccs.Dispose(); + } } private void Cancel(LinkedListNode> node) @@ -88,7 +96,6 @@ private void Cancel(LinkedListNode> node) { try { - node.Value.Dispose(); _pendingDequeues.Remove(node); } catch @@ -96,6 +103,15 @@ private void Cancel(LinkedListNode> node) // ignored } } + + try + { + node.Value.Dispose(); + } + catch + { + // ignored + } } private void ThrowIfDisposed() diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs index 6c8964b9d..f68aeb819 100644 --- a/src/DotPulsar/Internal/Connector.cs +++ b/src/DotPulsar/Internal/Connector.cs @@ -118,10 +118,17 @@ private async Task EncryptStream(Stream stream, string host, Cancellatio private async Task EncryptStream(Stream stream, string host, CancellationToken cancellationToken) { SslStream? sslStream = null; + var policyErrors = SslPolicyErrors.None; + + bool Validate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) + { + policyErrors = sslPolicyErrors; + return ValidateServerCertificate(sender, certificate, chain, sslPolicyErrors); + } try { - sslStream = new SslStream(stream, false, ValidateServerCertificate, null); + sslStream = new SslStream(stream, false, Validate, null); var options = new SslClientAuthenticationOptions { TargetHost = host, @@ -132,13 +139,16 @@ private async Task EncryptStream(Stream stream, string host, Cancellatio await sslStream.AuthenticateAsClientAsync(options, cancellationToken).ConfigureAwait(false); return sslStream; } - catch + catch (Exception exception) { if (sslStream is null) await stream.DisposeAsync().ConfigureAwait(false); else await sslStream.DisposeAsync().ConfigureAwait(false); + if (policyErrors != SslPolicyErrors.None) + exception.Data.Add("SslPolicyErrors", policyErrors); + throw; } }