diff --git a/CHANGELOG.md b/CHANGELOG.md index 60a4562a8..18faebfa9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [3.1.1-rc.1] - 2023-12-06 + +### Fixed + +- Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang + ## [3.1.0] - 2023-11-28 ### Added diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs b/src/DotPulsar/Internal/Abstractions/Process.cs index a2c51582f..981653e6d 100644 --- a/src/DotPulsar/Internal/Abstractions/Process.cs +++ b/src/DotPulsar/Internal/Abstractions/Process.cs @@ -49,10 +49,8 @@ public void Start() public virtual async ValueTask DisposeAsync() { _cancellationTokenSource.Cancel(); - if (_actionProcessorTask != null) - { + if (_actionProcessorTask is not null) await _actionProcessorTask.ConfigureAwait(false); - } } public void Handle(IEvent e) diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs index 1cd7e7cf0..bf164ff54 100644 --- a/src/DotPulsar/Internal/ConnectionPool.cs +++ b/src/DotPulsar/Internal/ConnectionPool.cs @@ -21,7 +21,6 @@ namespace DotPulsar.Internal; using DotPulsar.Internal.PulsarApi; using System; using System.Collections.Concurrent; -using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -64,9 +63,9 @@ public async ValueTask DisposeAsync() { _cancellationTokenSource.Cancel(); - foreach (var serviceUrl in _connections.Keys.ToArray()) + foreach (var entry in _connections.ToArray()) { - await DisposeConnection(serviceUrl).ConfigureAwait(false); + await DisposeConnection(entry.Key, entry.Value).ConfigureAwait(false); } } @@ -149,14 +148,14 @@ private async ValueTask GetConnection(PulsarUrl url, CancellationTok private async Task EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken) { - var stream = await _connector.Connect(url.Physical).ConfigureAwait(false); + var stream = await _connector.Connect(url.Physical, cancellationToken).ConfigureAwait(false); var commandConnect = _commandConnect; if (url.ProxyThroughServiceUrl) commandConnect = WithProxyToBroker(commandConnect, url.Logical); var connection = Connection.Connect(new PulsarStream(stream), _authentication, _keepAliveInterval, _closeInactiveConnectionsInterval); - _ = connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t => DisposeConnection(url)); + _ = connection.OnStateChangeFrom(ConnectionState.Connected, CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url, connection)); var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false); response.Expect(BaseCommand.Type.Connected); _connections[url] = connection; @@ -164,10 +163,10 @@ private async Task EstablishNewConnection(PulsarUrl url, Cancellatio return connection; } - private async ValueTask DisposeConnection(PulsarUrl serviceUrl) + private async ValueTask DisposeConnection(PulsarUrl serviceUrl, Connection connection) { - if (_connections.TryRemove(serviceUrl, out var connection) && connection is not null) - await connection.DisposeAsync().ConfigureAwait(false); + _connections.TryRemove(serviceUrl, out var _); + await connection.DisposeAsync().ConfigureAwait(false); } private static CommandConnect WithProxyToBroker(CommandConnect commandConnect, Uri logicalUrl) diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs index 6af6bfbad..b4f4bdd13 100644 --- a/src/DotPulsar/Internal/Connector.cs +++ b/src/DotPulsar/Internal/Connector.cs @@ -21,6 +21,7 @@ namespace DotPulsar.Internal; using System.Net.Sockets; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; +using System.Threading; using System.Threading.Tasks; public sealed class Connector @@ -45,7 +46,7 @@ public Connector( _checkCertificateRevocation = checkCertificateRevocation; } - public async Task Connect(Uri serviceUrl) + public async Task Connect(Uri serviceUrl, CancellationToken cancellationToken) { var scheme = serviceUrl.Scheme; var host = serviceUrl.Host; @@ -55,15 +56,15 @@ public async Task Connect(Uri serviceUrl) if (port == -1) port = encrypt ? Constants.DefaultPulsarSSLPort : Constants.DefaultPulsarPort; - var stream = await GetStream(host, port).ConfigureAwait(false); + var stream = await GetStream(host, port, cancellationToken).ConfigureAwait(false); if (encrypt) - stream = await EncryptStream(stream, host).ConfigureAwait(false); + stream = await EncryptStream(stream, host, cancellationToken).ConfigureAwait(false); return stream; } - private static async Task GetStream(string host, int port) + private static async Task GetStream(string host, int port, CancellationToken cancellationToken) { var tcpClient = new TcpClient(); @@ -71,10 +72,17 @@ private static async Task GetStream(string host, int port) { var type = Uri.CheckHostName(host); +#if NETSTANDARD2_0 || NETSTANDARD2_1 if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6) await tcpClient.ConnectAsync(IPAddress.Parse(host), port).ConfigureAwait(false); else await tcpClient.ConnectAsync(host, port).ConfigureAwait(false); +#else + if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6) + await tcpClient.ConnectAsync(IPAddress.Parse(host), port, cancellationToken).ConfigureAwait(false); + else + await tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false); +#endif return tcpClient.GetStream(); } @@ -85,7 +93,8 @@ private static async Task GetStream(string host, int port) } } - private async Task EncryptStream(Stream stream, string host) +#if NETSTANDARD2_0 + private async Task EncryptStream(Stream stream, string host, CancellationToken _) { SslStream? sslStream = null; @@ -97,20 +106,43 @@ private async Task EncryptStream(Stream stream, string host) } catch { -#if NETSTANDARD2_0 if (sslStream is null) stream.Dispose(); else sslStream.Dispose(); + + throw; + } + } #else + private async Task EncryptStream(Stream stream, string host, CancellationToken cancellationToken) + { + SslStream? sslStream = null; + + try + { + sslStream = new SslStream(stream, false, ValidateServerCertificate, null); + var options = new SslClientAuthenticationOptions + { + TargetHost = host, + ClientCertificates = _clientCertificates, + EnabledSslProtocols = SslProtocols.None, + CertificateRevocationCheckMode = _checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck + }; + await sslStream.AuthenticateAsClientAsync(options, cancellationToken).ConfigureAwait(false); + return sslStream; + } + catch + { if (sslStream is null) await stream.DisposeAsync().ConfigureAwait(false); else await sslStream.DisposeAsync().ConfigureAwait(false); -#endif + throw; } } +#endif private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors) { diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 27d6c88f2..38717ad73 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -7,8 +7,8 @@ - - + +