Skip to content

Commit

Permalink
Make ready for release 3.1.1-rc.1
Browse files Browse the repository at this point in the history
Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang.
Updated NuGet packages
  • Loading branch information
blankensteiner committed Dec 6, 2023
1 parent a77adca commit f35e682
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.1.1-rc.1] - 2023-12-06

### Fixed

- Fixed a bug where disposing a disconnected consumer, reader or producer would cause a hang

## [3.1.0] - 2023-11-28

### Added
Expand Down
4 changes: 1 addition & 3 deletions src/DotPulsar/Internal/Abstractions/Process.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ public void Start()
public virtual async ValueTask DisposeAsync()
{
_cancellationTokenSource.Cancel();
if (_actionProcessorTask != null)
{
if (_actionProcessorTask is not null)
await _actionProcessorTask.ConfigureAwait(false);
}
}

public void Handle(IEvent e)
Expand Down
15 changes: 7 additions & 8 deletions src/DotPulsar/Internal/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ namespace DotPulsar.Internal;
using DotPulsar.Internal.PulsarApi;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -64,9 +63,9 @@ public async ValueTask DisposeAsync()
{
_cancellationTokenSource.Cancel();

foreach (var serviceUrl in _connections.Keys.ToArray())
foreach (var entry in _connections.ToArray())
{
await DisposeConnection(serviceUrl).ConfigureAwait(false);
await DisposeConnection(entry.Key, entry.Value).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -149,25 +148,25 @@ private async ValueTask<Connection> GetConnection(PulsarUrl url, CancellationTok

private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
{
var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
var stream = await _connector.Connect(url.Physical, cancellationToken).ConfigureAwait(false);

var commandConnect = _commandConnect;
if (url.ProxyThroughServiceUrl)
commandConnect = WithProxyToBroker(commandConnect, url.Logical);

var connection = Connection.Connect(new PulsarStream(stream), _authentication, _keepAliveInterval, _closeInactiveConnectionsInterval);
_ = connection.OnStateChangeFrom(ConnectionState.Connected).AsTask().ContinueWith(t => DisposeConnection(url));
_ = connection.OnStateChangeFrom(ConnectionState.Connected, CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url, connection));
var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Connected);
_connections[url] = connection;
connection.MaxMessageSize = response.Connected.MaxMessageSize;
return connection;
}

private async ValueTask DisposeConnection(PulsarUrl serviceUrl)
private async ValueTask DisposeConnection(PulsarUrl serviceUrl, Connection connection)
{
if (_connections.TryRemove(serviceUrl, out var connection) && connection is not null)
await connection.DisposeAsync().ConfigureAwait(false);
_connections.TryRemove(serviceUrl, out var _);
await connection.DisposeAsync().ConfigureAwait(false);
}

private static CommandConnect WithProxyToBroker(CommandConnect commandConnect, Uri logicalUrl)
Expand Down
46 changes: 39 additions & 7 deletions src/DotPulsar/Internal/Connector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace DotPulsar.Internal;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;

public sealed class Connector
Expand All @@ -45,7 +46,7 @@ public Connector(
_checkCertificateRevocation = checkCertificateRevocation;
}

public async Task<Stream> Connect(Uri serviceUrl)
public async Task<Stream> Connect(Uri serviceUrl, CancellationToken cancellationToken)
{
var scheme = serviceUrl.Scheme;
var host = serviceUrl.Host;
Expand All @@ -55,26 +56,33 @@ public async Task<Stream> Connect(Uri serviceUrl)
if (port == -1)
port = encrypt ? Constants.DefaultPulsarSSLPort : Constants.DefaultPulsarPort;

var stream = await GetStream(host, port).ConfigureAwait(false);
var stream = await GetStream(host, port, cancellationToken).ConfigureAwait(false);

if (encrypt)
stream = await EncryptStream(stream, host).ConfigureAwait(false);
stream = await EncryptStream(stream, host, cancellationToken).ConfigureAwait(false);

return stream;
}

private static async Task<Stream> GetStream(string host, int port)
private static async Task<Stream> GetStream(string host, int port, CancellationToken cancellationToken)
{
var tcpClient = new TcpClient();

try
{
var type = Uri.CheckHostName(host);

#if NETSTANDARD2_0 || NETSTANDARD2_1
if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6)
await tcpClient.ConnectAsync(IPAddress.Parse(host), port).ConfigureAwait(false);
else
await tcpClient.ConnectAsync(host, port).ConfigureAwait(false);
#else
if (type == UriHostNameType.IPv4 || type == UriHostNameType.IPv6)
await tcpClient.ConnectAsync(IPAddress.Parse(host), port, cancellationToken).ConfigureAwait(false);
else
await tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false);
#endif

return tcpClient.GetStream();
}
Expand All @@ -85,7 +93,8 @@ private static async Task<Stream> GetStream(string host, int port)
}
}

private async Task<Stream> EncryptStream(Stream stream, string host)
#if NETSTANDARD2_0
private async Task<Stream> EncryptStream(Stream stream, string host, CancellationToken _)
{
SslStream? sslStream = null;

Expand All @@ -97,20 +106,43 @@ private async Task<Stream> EncryptStream(Stream stream, string host)
}
catch
{
#if NETSTANDARD2_0
if (sslStream is null)
stream.Dispose();
else
sslStream.Dispose();

throw;
}
}
#else
private async Task<Stream> EncryptStream(Stream stream, string host, CancellationToken cancellationToken)
{
SslStream? sslStream = null;

try
{
sslStream = new SslStream(stream, false, ValidateServerCertificate, null);
var options = new SslClientAuthenticationOptions
{
TargetHost = host,
ClientCertificates = _clientCertificates,
EnabledSslProtocols = SslProtocols.None,
CertificateRevocationCheckMode = _checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck
};
await sslStream.AuthenticateAsClientAsync(options, cancellationToken).ConfigureAwait(false);
return sslStream;
}
catch
{
if (sslStream is null)
await stream.DisposeAsync().ConfigureAwait(false);
else
await sslStream.DisposeAsync().ConfigureAwait(false);
#endif

throw;
}
}
#endif

private bool ValidateServerCertificate(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors)
{
Expand Down
4 changes: 2 additions & 2 deletions tests/DotPulsar.Tests/DotPulsar.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.0" />
<PackageReference Include="AutoFixture.Xunit2" Version="4.18.0" />
<PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.1" />
<PackageReference Include="AutoFixture.Xunit2" Version="4.18.1" />
<PackageReference Include="DotNetZip" Version="1.16.0" />
<PackageReference Include="Ductus.FluentDocker" Version="2.10.59" />
<PackageReference Include="FluentAssertions" Version="6.12.0" />
Expand Down

0 comments on commit f35e682

Please sign in to comment.