From c4bcb03c3b060311bf8048a35d539370285164b5 Mon Sep 17 00:00:00 2001 From: Cathal Golden Date: Mon, 16 May 2022 12:51:55 +0200 Subject: [PATCH 1/5] add server timeout functionality plus docker test --- DotPulsar.sln | 7 ++ .../Abstractions/IPulsarClientBuilder.cs | 10 ++ src/DotPulsar/Internal/Connection.cs | 13 +- src/DotPulsar/Internal/ConnectionPool.cs | 6 +- src/DotPulsar/Internal/DotPulsarMeter.cs | 4 + src/DotPulsar/Internal/PingPongHandler.cs | 32 ++++- src/DotPulsar/Internal/PulsarClientBuilder.cs | 11 +- .../DotPulsar.Consumer.csproj | 26 ++++ tests/DotPulsar.Consumer/Program.cs | 95 +++++++++++++++ tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 1 + .../DotPulsar.Tests/IntegrationCollection.cs | 3 + tests/DotPulsar.Tests/IntegrationFixture.cs | 106 +++++++++++------ tests/DotPulsar.Tests/KeepAliveFixture.cs | 16 +++ tests/DotPulsar.Tests/KeepAliveTests.cs | 112 ++++++++++++++++++ tests/DotPulsar.Tests/ProducerTests.cs | 1 + 15 files changed, 399 insertions(+), 44 deletions(-) create mode 100644 tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj create mode 100644 tests/DotPulsar.Consumer/Program.cs create mode 100644 tests/DotPulsar.Tests/KeepAliveFixture.cs create mode 100644 tests/DotPulsar.Tests/KeepAliveTests.cs diff --git a/DotPulsar.sln b/DotPulsar.sln index 35c269044..7c68759ec 100644 --- a/DotPulsar.sln +++ b/DotPulsar.sln @@ -26,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Processing", "samples\Processing\Processing.csproj", "{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Consumer", "tests\DotPulsar.Consumer\DotPulsar.Consumer.csproj", "{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -56,6 +58,10 @@ Global {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Debug|Any CPU.Build.0 = Debug|Any CPU {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Release|Any CPU.ActiveCfg = Release|Any CPU {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Release|Any CPU.Build.0 = Release|Any CPU + {36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}.Debug|Any CPU.Build.0 = Debug|Any CPU + {36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}.Release|Any CPU.ActiveCfg = Release|Any CPU + {36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -66,6 +72,7 @@ Global {14934BED-A222-47B2-A58A-CFC4AAB89B49} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} {6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} + {36E6E6EF-A471-4AE4-B696-1C9DAAFA2770} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789} diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs index 9cf41e0e8..62da0ab30 100644 --- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs +++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs @@ -53,6 +53,16 @@ public interface IPulsarClientBuilder /// IPulsarClientBuilder KeepAliveInterval(TimeSpan interval); + /// + /// The maximum amount of time to wait without receiving any message from the server at + /// which point the connection is assumed to be dead or the server is not responding. + /// As we are sending pings the server should respond to those at a minimum within this specified timeout period. + /// Once this happens the connection will be torn down and all consumers/producers will enter + /// the disconnected state and attempt to reconnect + /// The default is 60 seconds. + /// + IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval); + /// /// Set the listener name. This is optional. /// diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs index adb1d34d0..70a492c39 100644 --- a/src/DotPulsar/Internal/Connection.cs +++ b/src/DotPulsar/Internal/Connection.cs @@ -33,11 +33,15 @@ public sealed class Connection : IConnection private readonly IAuthentication? _authentication; private int _isDisposed; - public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, IAuthentication? authentication) + public Connection( + IPulsarStream stream, + TimeSpan keepAliveInterval, + TimeSpan serverResponseTimeout, + IAuthentication? authentication) { _lock = new AsyncLock(); _channelManager = new ChannelManager(); - _pingPongHandler = new PingPongHandler(this, keepAliveInterval); + _pingPongHandler = new PingPongHandler(this, keepAliveInterval, serverResponseTimeout); _stream = stream; _authentication = authentication; } @@ -294,6 +298,11 @@ private async Task Send(BaseCommand command, CancellationToken cancellationToken } public async Task ProcessIncommingFrames(CancellationToken cancellationToken) + { + await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding); + } + + public async Task ProcessIncommingFramesImpl(CancellationToken cancellationToken) { await Task.Yield(); diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs index 62fb53d7f..5485a682d 100644 --- a/src/DotPulsar/Internal/ConnectionPool.cs +++ b/src/DotPulsar/Internal/ConnectionPool.cs @@ -38,6 +38,7 @@ public sealed class ConnectionPool : IConnectionPool private readonly string? _listenerName; private readonly TimeSpan _keepAliveInterval; private readonly IAuthentication? _authentication; + private readonly TimeSpan _serverResponseTimeout; public ConnectionPool( CommandConnect commandConnect, @@ -47,6 +48,7 @@ public ConnectionPool( TimeSpan closeInactiveConnectionsInterval, string? listenerName, TimeSpan keepAliveInterval, + TimeSpan serverResponseTimeout, IAuthentication? authentication) { _lock = new AsyncLock(); @@ -59,6 +61,7 @@ public ConnectionPool( _cancellationTokenSource = new CancellationTokenSource(); _closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token); _keepAliveInterval = keepAliveInterval; + _serverResponseTimeout = serverResponseTimeout; _authentication = authentication; } @@ -159,7 +162,8 @@ private async ValueTask GetConnection(PulsarUrl url, CancellationTok private async Task EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken) { var stream = await _connector.Connect(url.Physical).ConfigureAwait(false); - var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _authentication); + var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _serverResponseTimeout, + _authentication); DotPulsarMeter.ConnectionCreated(); _connections[url] = connection; _ = connection.ProcessIncommingFrames(_cancellationTokenSource.Token).ContinueWith(t => DisposeConnection(url)); diff --git a/src/DotPulsar/Internal/DotPulsarMeter.cs b/src/DotPulsar/Internal/DotPulsarMeter.cs index e56d6a2d2..4056f6715 100644 --- a/src/DotPulsar/Internal/DotPulsarMeter.cs +++ b/src/DotPulsar/Internal/DotPulsarMeter.cs @@ -29,6 +29,7 @@ public static class DotPulsarMeter private static int _numberOfReaders; private static int _numberOfConsumers; private static int _numberOfProducers; + private static int _numberOfServerTimeouts; #pragma warning restore IDE0044 #pragma warning restore IDE0079 private static readonly Histogram _producerSendDuration; @@ -42,6 +43,7 @@ static DotPulsarMeter() _ = Meter.CreateObservableGauge("dotpulsar.reader.count", GetNumberOfReaders, "{readers}", "Number of readers"); _ = Meter.CreateObservableGauge("dotpulsar.consumer.count", GetNumberOfConsumers, "{consumers}", "Number of consumers"); _ = Meter.CreateObservableGauge("dotpulsar.producer.count", GetNumberOfProducers, "{producers}", "Number of producers"); + _ = Meter.CreateObservableGauge("dotpulsar.server.timeout.count", GetNumberOfProducers, "{servertimeout}", "Number of times server stopped responding"); _producerSendDuration = Meter.CreateHistogram("dotpulsar.producer.send.duration", "ms", "Measures the duration for sending a message"); _consumerProcessDuration = Meter.CreateHistogram("dotpulsar.consumer.process.duration", "ms", "Measures the duration for processing a message"); } @@ -54,6 +56,7 @@ static DotPulsarMeter() public static void ConnectionCreated() => Interlocked.Increment(ref _numberOfConnections); public static void ConnectionDisposed() => Interlocked.Decrement(ref _numberOfConnections); + public static void ServerTimedout() => Interlocked.Decrement(ref _numberOfServerTimeouts); private static int GetNumberOfConnections() => Volatile.Read(ref _numberOfConnections); public static void ReaderCreated() => Interlocked.Increment(ref _numberOfReaders); @@ -67,6 +70,7 @@ static DotPulsarMeter() public static void ProducerCreated() => Interlocked.Increment(ref _numberOfProducers); public static void ProducerDisposed() => Interlocked.Decrement(ref _numberOfProducers); private static int GetNumberOfProducers() => Volatile.Read(ref _numberOfProducers); + private static int GetNumberOfServerTimeouts() => Volatile.Read(ref _numberOfServerTimeouts); public static bool MessageSentEnabled => _producerSendDuration.Enabled; diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs index 43910956e..4f18dbf49 100644 --- a/src/DotPulsar/Internal/PingPongHandler.cs +++ b/src/DotPulsar/Internal/PingPongHandler.cs @@ -25,29 +25,37 @@ public sealed class PingPongHandler : IAsyncDisposable { private readonly IConnection _connection; private readonly TimeSpan _keepAliveInterval; + private readonly TimeSpan _serverResponseTimeout; private readonly Timer _timer; private readonly CommandPing _ping; private readonly CommandPong _pong; private long _lastCommand; + private readonly TaskCompletionSource _serverNotRespondingTcs; - public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval) + public PingPongHandler(IConnection connection, + TimeSpan keepAliveInterval, + TimeSpan serverResponseTimeout) { _connection = connection; _keepAliveInterval = keepAliveInterval; + _serverResponseTimeout = serverResponseTimeout; _timer = new Timer(Watch); _timer.Change(_keepAliveInterval, TimeSpan.Zero); _ping = new CommandPing(); _pong = new CommandPong(); _lastCommand = Stopwatch.GetTimestamp(); + _serverNotRespondingTcs = new TaskCompletionSource(); } + public Task ServerNotResponding => _serverNotRespondingTcs.Task; + public bool Incoming(BaseCommand.Type commandType) { Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp()); if (commandType == BaseCommand.Type.Ping) { - Task.Factory.StartNew(() => SendPong()); + Task.Factory.StartNew(SendPong); return true; } @@ -61,13 +69,25 @@ private void Watch(object? state) var lastCommand = Interlocked.Read(ref _lastCommand); var now = Stopwatch.GetTimestamp(); var elapsed = TimeSpan.FromSeconds((now - lastCommand) / Stopwatch.Frequency); - if (elapsed >= _keepAliveInterval) + + if (elapsed > _serverResponseTimeout) { - Task.Factory.StartNew(() => SendPing()); - _timer.Change(_keepAliveInterval, TimeSpan.Zero); + DotPulsarMeter.ServerTimedout(); + _serverNotRespondingTcs.SetResult(new object()); } else - _timer.Change(_keepAliveInterval.Subtract(elapsed), TimeSpan.Zero); + { + if (elapsed >= _keepAliveInterval) + { + Task.Factory.StartNew(() => SendPing()); + _timer.Change(_keepAliveInterval, TimeSpan.Zero); + } + else + { + var result = _keepAliveInterval.Subtract(elapsed); + _timer.Change(result, TimeSpan.Zero); + } + } } catch { diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs index 92e3fbb4a..5550fcdbb 100644 --- a/src/DotPulsar/Internal/PulsarClientBuilder.cs +++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs @@ -27,6 +27,7 @@ public sealed class PulsarClientBuilder : IPulsarClientBuilder private readonly List _exceptionHandlers; private EncryptionPolicy? _encryptionPolicy; private TimeSpan _keepAliveInterval; + private TimeSpan _serverResponseTimeout; private string? _listenerName; private TimeSpan _retryInterval; private Uri _serviceUrl; @@ -51,6 +52,7 @@ public PulsarClientBuilder() _exceptionHandlers = new List(); _keepAliveInterval = TimeSpan.FromSeconds(30); + _serverResponseTimeout = TimeSpan.FromSeconds(60); _retryInterval = TimeSpan.FromSeconds(3); _serviceUrl = new Uri($"{Constants.PulsarScheme}://localhost:{Constants.DefaultPulsarPort}"); _clientCertificates = new X509Certificate2Collection(); @@ -96,6 +98,12 @@ public IPulsarClientBuilder KeepAliveInterval(TimeSpan interval) return this; } + public IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval) + { + _serverResponseTimeout = interval; + return this; + } + public IPulsarClientBuilder ListenerName(string listenerName) { _listenerName = listenerName; @@ -165,7 +173,8 @@ public IPulsarClient Build() var exceptionHandlers = new List(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) }; var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers); - var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName, _keepAliveInterval, _authentication); + + var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName, _keepAliveInterval, _serverResponseTimeout, _authentication); var processManager = new ProcessManager(connectionPool); return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl); diff --git a/tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj b/tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj new file mode 100644 index 000000000..426203539 --- /dev/null +++ b/tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj @@ -0,0 +1,26 @@ + + + + net6.0 + enable + Exe + enable + true + embedded + linux-x64 + + + + + + + + + + + + + + + diff --git a/tests/DotPulsar.Consumer/Program.cs b/tests/DotPulsar.Consumer/Program.cs new file mode 100644 index 000000000..ee68da258 --- /dev/null +++ b/tests/DotPulsar.Consumer/Program.cs @@ -0,0 +1,95 @@ +namespace DotPulsar.Consumer; + +using Abstractions; +using Extensions; +using Internal; +using System.Net.Sockets; + +public static class Program +{ + public const string ConnectedAwaitingDisconnection = "Successfully connected, awaiting disconnection"; + public const string DisconnectedAwaitingReconnection = "Successfully disconnected, awaiting reconnection"; + public const string Reconnected = "Successfully reconnected"; + + public static async Task Main(string[] args) + { + Console.WriteLine($"Running with url {args[0]}"); + var client = CreateClient(args[0], args[1]); + + var testRunId = Guid.NewGuid().ToString("N"); + + var topic = $"persistent://public/default/consumer-tests-{testRunId}"; + await using var consumer = client.NewConsumer(Schema.ByteArray) + .ConsumerName($"consumer-{testRunId}") + .InitialPosition(SubscriptionInitialPosition.Earliest) + .SubscriptionName($"subscription-{testRunId}") + .StateChangedHandler(changed => Console.WriteLine($"Consumer state changed to {changed.ConsumerState}")) + .Topic(topic) + .Create(); + + var timeout = Task.Delay(TimeSpan.FromSeconds(30)); + var result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Active).AsTask()); + + if (result == timeout) + { + Console.WriteLine("Timed out waiting for active status"); + return -1; + } + + Console.WriteLine(ConnectedAwaitingDisconnection); + timeout = Task.Delay(TimeSpan.FromSeconds(30)); + result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Disconnected).AsTask()); + + if (result == timeout) + { + Console.WriteLine("Timed out waiting for disconnected status"); + return -2; + } + + Console.WriteLine(DisconnectedAwaitingReconnection); + timeout = Task.Delay(TimeSpan.FromSeconds(30)); + result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Active).AsTask()); + + if (result == timeout) + { + Console.WriteLine("Timed out waiting for reconnected status"); + return -3; + } + + Console.WriteLine(Reconnected); + return 0; + } + + private static IPulsarClient CreateClient(string url, string token) + => PulsarClient + .Builder() + .Authentication(AuthenticationFactory.Token(_ => ValueTask.FromResult(token))) + .KeepAliveInterval(TimeSpan.FromSeconds(5)) + .ServerResponseTimeout(TimeSpan.FromSeconds(10)) + .ExceptionHandler(new TestExceptionHandler()) + .ServiceUrl(new Uri(url)) + .Build(); +} + +internal class TestExceptionHandler : IHandleException +{ + private readonly IHandleException _inner = new DefaultExceptionHandler(TimeSpan.FromSeconds(5)); + + public ValueTask OnException(ExceptionContext exceptionContext) + { + Console.WriteLine("Exception occurred: {0}", exceptionContext.Exception.Message); + + // This occurs when reconnecting the docker network, it takes some time for the alias to be reestablished + if (exceptionContext.Exception is SocketException se && se.Message.Contains("Name or service not known")) + { + exceptionContext.ExceptionHandled = true; + exceptionContext.Result = FaultAction.Retry; + } + else + { + _inner.OnException(exceptionContext); + } + + return ValueTask.CompletedTask; + } +} diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 8caf66a9e..2e9d7f10c 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -23,6 +23,7 @@ + diff --git a/tests/DotPulsar.Tests/IntegrationCollection.cs b/tests/DotPulsar.Tests/IntegrationCollection.cs index 40880f621..45f4e40c8 100644 --- a/tests/DotPulsar.Tests/IntegrationCollection.cs +++ b/tests/DotPulsar.Tests/IntegrationCollection.cs @@ -18,3 +18,6 @@ namespace DotPulsar.Tests; [CollectionDefinition("Integration")] public class IntegrationCollection : ICollectionFixture { } + +[CollectionDefinition("KeepAlive")] +public class KeepAliveCollection : ICollectionFixture { } diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs b/tests/DotPulsar.Tests/IntegrationFixture.cs index d9a2ad769..d7f492b4f 100644 --- a/tests/DotPulsar.Tests/IntegrationFixture.cs +++ b/tests/DotPulsar.Tests/IntegrationFixture.cs @@ -15,9 +15,11 @@ namespace DotPulsar.Tests; using Ductus.FluentDocker.Builders; +using Ductus.FluentDocker.Commands; using Ductus.FluentDocker.Services; using Ductus.FluentDocker.Services.Extensions; using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -32,56 +34,78 @@ public class IntegrationFixture : IAsyncLifetime private const int Port = 6650; private readonly IMessageSink _messageSink; - private readonly IContainerService _cluster; + private IContainerService? _cluster; + private INetworkService? _network; + + protected virtual string[] EnvironmentVariables => new[] + { + $"PULSAR_PREFIX_tokenSecretKey=file://{SecretKeyPath}", + "PULSAR_PREFIX_authenticationRefreshCheckSeconds=5", + $"superUserRoles={UserName}", + "authenticationEnabled=true", + "authorizationEnabled=true", + "authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken", + "authenticateOriginalAuthData=false", + $"brokerClientAuthenticationPlugin={AuthenticationPlugin}", + $"CLIENT_PREFIX_authPlugin={AuthenticationPlugin}", + }; + + protected virtual bool IncludeNetwork => false; public IntegrationFixture(IMessageSink messageSink) { _messageSink = messageSink; - - var environmentVariables = new[] - { - $"PULSAR_PREFIX_tokenSecretKey=file://{SecretKeyPath}", - "PULSAR_PREFIX_authenticationRefreshCheckSeconds=5", - $"superUserRoles={UserName}", - "authenticationEnabled=true", - "authorizationEnabled=true", - "authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken", - "authenticateOriginalAuthData=false", - $"brokerClientAuthenticationPlugin={AuthenticationPlugin}", - $"CLIENT_PREFIX_authPlugin={AuthenticationPlugin}", - }; - - var arguments = "\"" + - $"bin/pulsar tokens create-secret-key --output {SecretKeyPath} && " + - $"export brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create --secret-key {SecretKeyPath} --subject {UserName}) && " + - "export CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && " + - "bin/apply-config-from-env.py conf/standalone.conf && " + - "bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ conf/client.conf && " + - "bin/pulsar standalone --no-functions-worker" - + "\""; - - _cluster = new Builder() - .UseContainer() - .UseImage("apachepulsar/pulsar:2.9.2") - .WithEnvironment(environmentVariables) - .ExposePort(Port) - .Command("/bin/bash -c", arguments) - .Build(); - + var instanceId = Guid.NewGuid().ToString(); + PulsarContainerName = $"dotpulsar-pulsar.{instanceId}"; + NetworkAlias = $"dotpulsar-network.{instanceId}"; ServiceUrl = new Uri("pulsar://localhost:6650"); } public Uri ServiceUrl { get; private set; } + public string NetworkAlias { get; } + public string PulsarContainerName { get; } public Task DisposeAsync() { - _cluster.Dispose(); + _cluster?.Dispose(); + _network?.Dispose(); return Task.CompletedTask; } public Task InitializeAsync() { - _cluster.StateChange += (sender, args) => _messageSink.OnMessage(new DiagnosticMessage($"The Pulsar cluster changed state to: {args.State}")); + var arguments = "\"" + + $"bin/pulsar tokens create-secret-key --output {SecretKeyPath} && " + + $"export brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create --secret-key {SecretKeyPath} --subject {UserName}) && " + + "export CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && " + + "bin/apply-config-from-env.py conf/standalone.conf && " + + "bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ conf/client.conf && " + + "bin/pulsar standalone --no-functions-worker" + + "\""; + + var docker= new Hosts().Discover().Single(); + + if (IncludeNetwork) + { + _network = docker.CreateNetwork(NetworkAlias, removeOnDispose:true); + } + + var builder = new Builder() + .UseContainer() + .WithName(PulsarContainerName) + .UseImage("apachepulsar/pulsar:2.9.2") + .WithEnvironment(EnvironmentVariables) + .WithHostName("pulsar") + .ExposePort(Port) + .Command("/bin/bash -c", arguments); + + if (IncludeNetwork) + { + builder = builder.UseNetwork(NetworkAlias); + } + + _cluster = builder.Build(); + _cluster.StateChange += (_, args) => _messageSink.OnMessage(new DiagnosticMessage($"The Pulsar cluster changed state to: {args.State}")); _cluster.Start(); _cluster.WaitForMessageInLogs("Successfully updated the policies on namespace public/default", int.MaxValue); var endpoint = _cluster.ToHostExposedEndpoint($"{Port}/tcp"); @@ -114,4 +138,18 @@ public void CreatePartitionedTopic(string topic, int numberOfPartitions) if (!result.Success) throw new Exception($"Could not create the partitioned topic: {result.Error}"); } + + public void DisconnectBroker() + { + var hosts = new Hosts().Discover(); + var docker = hosts.Single(); + docker.Host.NetworkDisconnect(PulsarContainerName, NetworkAlias, true); + } + + public void ReconnectBroker() + { + var hosts = new Hosts().Discover(); + var docker = hosts.Single(); + docker.Host.NetworkConnect(PulsarContainerName, NetworkAlias, new[] { "pulsar" }); + } } diff --git a/tests/DotPulsar.Tests/KeepAliveFixture.cs b/tests/DotPulsar.Tests/KeepAliveFixture.cs new file mode 100644 index 000000000..c0a80a409 --- /dev/null +++ b/tests/DotPulsar.Tests/KeepAliveFixture.cs @@ -0,0 +1,16 @@ +namespace DotPulsar.Tests; + +using System.Linq; +using Xunit.Abstractions; + +public class KeepAliveFixture : IntegrationFixture +{ + public KeepAliveFixture(IMessageSink messageSink) : base(messageSink) + { + + } + + protected override bool IncludeNetwork => true; + + protected override string[] EnvironmentVariables => base.EnvironmentVariables.Concat(new[] { "keepAliveIntervalSeconds=5" }).ToArray(); +} diff --git a/tests/DotPulsar.Tests/KeepAliveTests.cs b/tests/DotPulsar.Tests/KeepAliveTests.cs new file mode 100644 index 000000000..42b7e5f2b --- /dev/null +++ b/tests/DotPulsar.Tests/KeepAliveTests.cs @@ -0,0 +1,112 @@ +namespace DotPulsar.Tests; + +using Abstractions; +using Extensions; +using Consumer; +using Ductus.FluentDocker.Builders; +using Ductus.FluentDocker.Model.Builders; +using Ductus.FluentDocker.Services.Extensions; +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +[Collection("KeepAlive"), Trait("Category", "KeepAlive")] +public class KeepAliveTests +{ + private readonly KeepAliveFixture _fixture; + private readonly ITestOutputHelper _testOutputHelper; + + public KeepAliveTests(KeepAliveFixture fixture, ITestOutputHelper outputHelper) + { + _fixture = fixture; + _testOutputHelper = outputHelper; + } + + [Fact] + public async Task TestNetworkDisconnection() + { + var token = _fixture.CreateToken(Timeout.InfiniteTimeSpan); + + using var consumer = new Builder() + .UseContainer() + .UseImage("mcr.microsoft.com/dotnet/aspnet:6.0-bullseye-slim") + .UseNetwork(_fixture.NetworkAlias) + .Mount(Path.Combine(Environment.CurrentDirectory, "linux-x64"), "/app", MountType.ReadOnly) + .Command("/app/DotPulsar.Consumer", "pulsar://pulsar:6650", token) + .Build() + .Start(); + + var reconnected = false; + var logsTask = Task.Run(() => + { + var logs = consumer.Logs(true); + + while (!logs.IsFinished) + { + var line = logs.TryRead(45_000); // Do a read with timeout + + if (line != null) + { + _testOutputHelper.WriteLine(line); + } + } + + _testOutputHelper.WriteLine("Logs completed"); + }); + + try + { + consumer.WaitForMessageInLogs(Program.ConnectedAwaitingDisconnection, 30_000); + _testOutputHelper.WriteLine("Severing network"); + _fixture.DisconnectBroker(); + + consumer.WaitForMessageInLogs(Program.DisconnectedAwaitingReconnection, 30_000); + _testOutputHelper.WriteLine("Reconnecting network"); + _fixture.ReconnectBroker(); + reconnected = true; + consumer.WaitForMessageInLogs(Program.Reconnected, 30_000); + } + finally + { + if (!reconnected) + { + _fixture.ReconnectBroker(); + await AssertReconnection(); + } + + consumer.Dispose(); + } + + await logsTask; + } + + private async Task AssertReconnection() + { + await using var client = CreateClient(); + string topicName = $"round-robin-partitioned-{Guid.NewGuid():N}"; + await using var producer = client.NewProducer(Schema.String) + .Topic(topicName) + .Create(); + + var timeout = Task.Delay(TimeSpan.FromSeconds(30)); + var result = await Task.WhenAny(timeout, producer.StateChangedTo(ProducerState.Connected).AsTask()); + + await producer.Send("message"); + + if (result == timeout) + { + throw new Exception("Timeout waiting for active status"); + } + } + + private IPulsarClient CreateClient() + => PulsarClient + .Builder() + .Authentication(AuthenticationFactory.Token(ct => ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan)))) + .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: {ec.Exception}")) + .ServiceUrl(_fixture.ServiceUrl) + .Build(); +} diff --git a/tests/DotPulsar.Tests/ProducerTests.cs b/tests/DotPulsar.Tests/ProducerTests.cs index 1f63ad216..c24d21316 100644 --- a/tests/DotPulsar.Tests/ProducerTests.cs +++ b/tests/DotPulsar.Tests/ProducerTests.cs @@ -152,6 +152,7 @@ private IPulsarClient CreateClient() => PulsarClient .Builder() .Authentication(AuthenticationFactory.Token(ct => ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan)))) + .KeepAliveInterval(TimeSpan.FromSeconds(5)) .ExceptionHandler(ec => _testOutputHelper.WriteLine($"Exception: {ec.Exception}")) .ServiceUrl(_fixture.ServiceUrl) .Build(); From 52146b25900a74d2f77e2a1796e5742afa8a0f5e Mon Sep 17 00:00:00 2001 From: Cathal Golden Date: Mon, 16 May 2022 12:54:03 +0200 Subject: [PATCH 2/5] move to KeepAliveFixture --- tests/DotPulsar.Tests/IntegrationFixture.cs | 15 --------------- tests/DotPulsar.Tests/KeepAliveFixture.cs | 16 ++++++++++++++++ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs b/tests/DotPulsar.Tests/IntegrationFixture.cs index d7f492b4f..3d90c7033 100644 --- a/tests/DotPulsar.Tests/IntegrationFixture.cs +++ b/tests/DotPulsar.Tests/IntegrationFixture.cs @@ -15,7 +15,6 @@ namespace DotPulsar.Tests; using Ductus.FluentDocker.Builders; -using Ductus.FluentDocker.Commands; using Ductus.FluentDocker.Services; using Ductus.FluentDocker.Services.Extensions; using System; @@ -138,18 +137,4 @@ public void CreatePartitionedTopic(string topic, int numberOfPartitions) if (!result.Success) throw new Exception($"Could not create the partitioned topic: {result.Error}"); } - - public void DisconnectBroker() - { - var hosts = new Hosts().Discover(); - var docker = hosts.Single(); - docker.Host.NetworkDisconnect(PulsarContainerName, NetworkAlias, true); - } - - public void ReconnectBroker() - { - var hosts = new Hosts().Discover(); - var docker = hosts.Single(); - docker.Host.NetworkConnect(PulsarContainerName, NetworkAlias, new[] { "pulsar" }); - } } diff --git a/tests/DotPulsar.Tests/KeepAliveFixture.cs b/tests/DotPulsar.Tests/KeepAliveFixture.cs index c0a80a409..ea9ba81bc 100644 --- a/tests/DotPulsar.Tests/KeepAliveFixture.cs +++ b/tests/DotPulsar.Tests/KeepAliveFixture.cs @@ -1,5 +1,7 @@ namespace DotPulsar.Tests; +using Ductus.FluentDocker.Commands; +using Ductus.FluentDocker.Services; using System.Linq; using Xunit.Abstractions; @@ -13,4 +15,18 @@ public KeepAliveFixture(IMessageSink messageSink) : base(messageSink) protected override bool IncludeNetwork => true; protected override string[] EnvironmentVariables => base.EnvironmentVariables.Concat(new[] { "keepAliveIntervalSeconds=5" }).ToArray(); + + public void DisconnectBroker() + { + var hosts = new Hosts().Discover(); + var docker = hosts.Single(); + docker.Host.NetworkDisconnect(PulsarContainerName, NetworkAlias, true); + } + + public void ReconnectBroker() + { + var hosts = new Hosts().Discover(); + var docker = hosts.Single(); + docker.Host.NetworkConnect(PulsarContainerName, NetworkAlias, new[] { "pulsar" }); + } } From 88310e58dc82e2766268e44605adf0787c0bb546 Mon Sep 17 00:00:00 2001 From: Cathal Golden Date: Mon, 16 May 2022 13:15:26 +0200 Subject: [PATCH 3/5] disable collection paralleization --- tests/DotPulsar.Tests/xunit.runner.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/DotPulsar.Tests/xunit.runner.json b/tests/DotPulsar.Tests/xunit.runner.json index 6c0d1e49b..017bc2212 100644 --- a/tests/DotPulsar.Tests/xunit.runner.json +++ b/tests/DotPulsar.Tests/xunit.runner.json @@ -1,4 +1,5 @@ { "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json", - "diagnosticMessages": true + "diagnosticMessages": true, + "parallelizeTestCollections": false } From d42b29c38c0dc488348afec549c20bf4bfd2b1e5 Mon Sep 17 00:00:00 2001 From: Cathal Golden Date: Mon, 16 May 2022 13:18:33 +0200 Subject: [PATCH 4/5] don't use SolutionDir --- tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj b/tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj index 426203539..14885e2b0 100644 --- a/tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj +++ b/tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj @@ -15,7 +15,7 @@ - From d3858d963a0c6f91b4aeb6eee21ee0eb6a17a9e2 Mon Sep 17 00:00:00 2001 From: Cathal Golden Date: Fri, 3 Jun 2022 12:19:54 +0200 Subject: [PATCH 5/5] PR comments --- src/DotPulsar/Internal/Connection.cs | 2 +- src/DotPulsar/Internal/PingPongHandler.cs | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs index 70a492c39..249b125c0 100644 --- a/src/DotPulsar/Internal/Connection.cs +++ b/src/DotPulsar/Internal/Connection.cs @@ -299,7 +299,7 @@ private async Task Send(BaseCommand command, CancellationToken cancellationToken public async Task ProcessIncommingFrames(CancellationToken cancellationToken) { - await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding); + await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding).ConfigureAwait(false); } public async Task ProcessIncommingFramesImpl(CancellationToken cancellationToken) diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs index 4f18dbf49..b4481229b 100644 --- a/src/DotPulsar/Internal/PingPongHandler.cs +++ b/src/DotPulsar/Internal/PingPongHandler.cs @@ -74,19 +74,18 @@ private void Watch(object? state) { DotPulsarMeter.ServerTimedout(); _serverNotRespondingTcs.SetResult(new object()); + return; + } + + if (elapsed >= _keepAliveInterval) + { + Task.Factory.StartNew(() => SendPing()); + _timer.Change(_keepAliveInterval, TimeSpan.Zero); } else { - if (elapsed >= _keepAliveInterval) - { - Task.Factory.StartNew(() => SendPing()); - _timer.Change(_keepAliveInterval, TimeSpan.Zero); - } - else - { - var result = _keepAliveInterval.Subtract(elapsed); - _timer.Change(result, TimeSpan.Zero); - } + var result = _keepAliveInterval.Subtract(elapsed); + _timer.Change(result, TimeSpan.Zero); } } catch