Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add server timeout functionality plus docker test #104

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions DotPulsar.sln
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Processing", "samples\Processing\Processing.csproj", "{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Consumer", "tests\DotPulsar.Consumer\DotPulsar.Consumer.csproj", "{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's get the new tests into DotPulsar.Tests instead of creating new test projects.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need an executable that I can run in docker to connect to the server

EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -56,6 +58,10 @@ Global
{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}.Release|Any CPU.Build.0 = Release|Any CPU
{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}.Debug|Any CPU.Build.0 = Debug|Any CPU
{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}.Release|Any CPU.ActiveCfg = Release|Any CPU
{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -66,6 +72,7 @@ Global
{14934BED-A222-47B2-A58A-CFC4AAB89B49} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770} = {E1C932A9-6D4C-4DDF-8922-BE7B71F12F1C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
Expand Down
10 changes: 10 additions & 0 deletions src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ public interface IPulsarClientBuilder
/// </summary>
IPulsarClientBuilder KeepAliveInterval(TimeSpan interval);

/// <summary>
/// The maximum amount of time to wait without receiving any message from the server at
/// which point the connection is assumed to be dead or the server is not responding.
/// As we are sending pings the server should respond to those at a minimum within this specified timeout period.
/// Once this happens the connection will be torn down and all consumers/producers will enter
/// the disconnected state and attempt to reconnect
/// The default is 60 seconds.
/// </summary>
IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this also a configurable setting for other clients? If not, we could just hardcode it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't know about other clients but it seems it is configurable on the broker so the client setting should agree with that so I think it needs to be configurable here. Shame the value isn't returned from the server as part of the connection response


/// <summary>
/// Set the listener name. This is optional.
/// </summary>
Expand Down
13 changes: 11 additions & 2 deletions src/DotPulsar/Internal/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@ public sealed class Connection : IConnection
private readonly IAuthentication? _authentication;
private int _isDisposed;

public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, IAuthentication? authentication)
public Connection(
IPulsarStream stream,
TimeSpan keepAliveInterval,
TimeSpan serverResponseTimeout,
IAuthentication? authentication)
{
_lock = new AsyncLock();
_channelManager = new ChannelManager();
_pingPongHandler = new PingPongHandler(this, keepAliveInterval);
_pingPongHandler = new PingPongHandler(this, keepAliveInterval, serverResponseTimeout);
_stream = stream;
_authentication = authentication;
}
Expand Down Expand Up @@ -294,6 +298,11 @@ private async Task Send(BaseCommand command, CancellationToken cancellationToken
}

public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
{
await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a ConfigureAwait(false)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

public async Task ProcessIncommingFramesImpl(CancellationToken cancellationToken)
{
await Task.Yield();

Expand Down
6 changes: 5 additions & 1 deletion src/DotPulsar/Internal/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public sealed class ConnectionPool : IConnectionPool
private readonly string? _listenerName;
private readonly TimeSpan _keepAliveInterval;
private readonly IAuthentication? _authentication;
private readonly TimeSpan _serverResponseTimeout;

public ConnectionPool(
CommandConnect commandConnect,
Expand All @@ -47,6 +48,7 @@ public ConnectionPool(
TimeSpan closeInactiveConnectionsInterval,
string? listenerName,
TimeSpan keepAliveInterval,
TimeSpan serverResponseTimeout,
IAuthentication? authentication)
{
_lock = new AsyncLock();
Expand All @@ -59,6 +61,7 @@ public ConnectionPool(
_cancellationTokenSource = new CancellationTokenSource();
_closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
_keepAliveInterval = keepAliveInterval;
_serverResponseTimeout = serverResponseTimeout;
_authentication = authentication;
}

Expand Down Expand Up @@ -159,7 +162,8 @@ private async ValueTask<Connection> GetConnection(PulsarUrl url, CancellationTok
private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
{
var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _authentication);
var connection = new Connection(new PulsarStream(stream), _keepAliveInterval, _serverResponseTimeout,
_authentication);
DotPulsarMeter.ConnectionCreated();
_connections[url] = connection;
_ = connection.ProcessIncommingFrames(_cancellationTokenSource.Token).ContinueWith(t => DisposeConnection(url));
Expand Down
4 changes: 4 additions & 0 deletions src/DotPulsar/Internal/DotPulsarMeter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static class DotPulsarMeter
private static int _numberOfReaders;
private static int _numberOfConsumers;
private static int _numberOfProducers;
private static int _numberOfServerTimeouts;
#pragma warning restore IDE0044
#pragma warning restore IDE0079
private static readonly Histogram<double> _producerSendDuration;
Expand All @@ -42,6 +43,7 @@ static DotPulsarMeter()
_ = Meter.CreateObservableGauge("dotpulsar.reader.count", GetNumberOfReaders, "{readers}", "Number of readers");
_ = Meter.CreateObservableGauge("dotpulsar.consumer.count", GetNumberOfConsumers, "{consumers}", "Number of consumers");
_ = Meter.CreateObservableGauge("dotpulsar.producer.count", GetNumberOfProducers, "{producers}", "Number of producers");
_ = Meter.CreateObservableGauge("dotpulsar.server.timeout.count", GetNumberOfProducers, "{servertimeout}", "Number of times server stopped responding");
_producerSendDuration = Meter.CreateHistogram<double>("dotpulsar.producer.send.duration", "ms", "Measures the duration for sending a message");
_consumerProcessDuration = Meter.CreateHistogram<double>("dotpulsar.consumer.process.duration", "ms", "Measures the duration for processing a message");
}
Expand All @@ -54,6 +56,7 @@ static DotPulsarMeter()

public static void ConnectionCreated() => Interlocked.Increment(ref _numberOfConnections);
public static void ConnectionDisposed() => Interlocked.Decrement(ref _numberOfConnections);
public static void ServerTimedout() => Interlocked.Decrement(ref _numberOfServerTimeouts);
private static int GetNumberOfConnections() => Volatile.Read(ref _numberOfConnections);

public static void ReaderCreated() => Interlocked.Increment(ref _numberOfReaders);
Expand All @@ -67,6 +70,7 @@ static DotPulsarMeter()
public static void ProducerCreated() => Interlocked.Increment(ref _numberOfProducers);
public static void ProducerDisposed() => Interlocked.Decrement(ref _numberOfProducers);
private static int GetNumberOfProducers() => Volatile.Read(ref _numberOfProducers);
private static int GetNumberOfServerTimeouts() => Volatile.Read(ref _numberOfServerTimeouts);


public static bool MessageSentEnabled => _producerSendDuration.Enabled;
Expand Down
32 changes: 26 additions & 6 deletions src/DotPulsar/Internal/PingPongHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,37 @@ public sealed class PingPongHandler : IAsyncDisposable
{
private readonly IConnection _connection;
private readonly TimeSpan _keepAliveInterval;
private readonly TimeSpan _serverResponseTimeout;
private readonly Timer _timer;
private readonly CommandPing _ping;
private readonly CommandPong _pong;
private long _lastCommand;
private readonly TaskCompletionSource<object> _serverNotRespondingTcs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The non-generic TaskCompletionSource is a better fit since the object is never used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems it has been removed
StephenCleary/AsyncEx#176


public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval)
public PingPongHandler(IConnection connection,
TimeSpan keepAliveInterval,
TimeSpan serverResponseTimeout)
{
_connection = connection;
_keepAliveInterval = keepAliveInterval;
_serverResponseTimeout = serverResponseTimeout;
_timer = new Timer(Watch);
_timer.Change(_keepAliveInterval, TimeSpan.Zero);
_ping = new CommandPing();
_pong = new CommandPong();
_lastCommand = Stopwatch.GetTimestamp();
_serverNotRespondingTcs = new TaskCompletionSource<object>();
}

public Task ServerNotResponding => _serverNotRespondingTcs.Task;

public bool Incoming(BaseCommand.Type commandType)
{
Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp());

if (commandType == BaseCommand.Type.Ping)
{
Task.Factory.StartNew(() => SendPong());
Task.Factory.StartNew(SendPong);
return true;
}

Expand All @@ -61,13 +69,25 @@ private void Watch(object? state)
var lastCommand = Interlocked.Read(ref _lastCommand);
var now = Stopwatch.GetTimestamp();
var elapsed = TimeSpan.FromSeconds((now - lastCommand) / Stopwatch.Frequency);
if (elapsed >= _keepAliveInterval)

if (elapsed > _serverResponseTimeout)
{
Task.Factory.StartNew(() => SendPing());
_timer.Change(_keepAliveInterval, TimeSpan.Zero);
DotPulsarMeter.ServerTimedout();
_serverNotRespondingTcs.SetResult(new object());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might as well just return here instead of wrapping the following code in an else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
else
_timer.Change(_keepAliveInterval.Subtract(elapsed), TimeSpan.Zero);
{
if (elapsed >= _keepAliveInterval)
{
Task.Factory.StartNew(() => SendPing());
_timer.Change(_keepAliveInterval, TimeSpan.Zero);
}
else
{
var result = _keepAliveInterval.Subtract(elapsed);
_timer.Change(result, TimeSpan.Zero);
}
}
}
catch
{
Expand Down
11 changes: 10 additions & 1 deletion src/DotPulsar/Internal/PulsarClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public sealed class PulsarClientBuilder : IPulsarClientBuilder
private readonly List<IHandleException> _exceptionHandlers;
private EncryptionPolicy? _encryptionPolicy;
private TimeSpan _keepAliveInterval;
private TimeSpan _serverResponseTimeout;
private string? _listenerName;
private TimeSpan _retryInterval;
private Uri _serviceUrl;
Expand All @@ -51,6 +52,7 @@ public PulsarClientBuilder()

_exceptionHandlers = new List<IHandleException>();
_keepAliveInterval = TimeSpan.FromSeconds(30);
_serverResponseTimeout = TimeSpan.FromSeconds(60);
_retryInterval = TimeSpan.FromSeconds(3);
_serviceUrl = new Uri($"{Constants.PulsarScheme}://localhost:{Constants.DefaultPulsarPort}");
_clientCertificates = new X509Certificate2Collection();
Expand Down Expand Up @@ -96,6 +98,12 @@ public IPulsarClientBuilder KeepAliveInterval(TimeSpan interval)
return this;
}

public IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval)
{
_serverResponseTimeout = interval;
return this;
}

public IPulsarClientBuilder ListenerName(string listenerName)
{
_listenerName = listenerName;
Expand Down Expand Up @@ -165,7 +173,8 @@ public IPulsarClient Build()

var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName, _keepAliveInterval, _authentication);

var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName, _keepAliveInterval, _serverResponseTimeout, _authentication);
var processManager = new ProcessManager(connectionPool);

return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
Expand Down
26 changes: 26 additions & 0 deletions tests/DotPulsar.Consumer/DotPulsar.Consumer.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<OutputType>Exe</OutputType>
<Nullable>enable</Nullable>
<PublishSingleFile>true</PublishSingleFile>
<DebugType>embedded</DebugType>
<RuntimeIdentifier>linux-x64</RuntimeIdentifier>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
</ItemGroup>

<Target Name="PublishConsumer" AfterTargets="Build">
<Exec Command="dotnet publish $(ProjectDir) -r linux-x64 -o $(ProjectDir)..\DotPulsar.Tests\$(OutDir) --no-build"
ConsoleToMSBuild="true" IgnoreExitCode="true" IgnoreStandardErrorWarningFormat="true">
<Output TaskParameter="ConsoleOutput" PropertyName="ConsoleOutput" />
<Output TaskParameter="ExitCode" PropertyName="ExitCode" />
</Exec>

<Error Condition="'$(ExitCode)' != '0'" Text="$(ConsoleOutput)" />
</Target>
</Project>
95 changes: 95 additions & 0 deletions tests/DotPulsar.Consumer/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
namespace DotPulsar.Consumer;

using Abstractions;
using Extensions;
using Internal;
using System.Net.Sockets;

public static class Program
{
public const string ConnectedAwaitingDisconnection = "Successfully connected, awaiting disconnection";
public const string DisconnectedAwaitingReconnection = "Successfully disconnected, awaiting reconnection";
public const string Reconnected = "Successfully reconnected";

public static async Task<int> Main(string[] args)
{
Console.WriteLine($"Running with url {args[0]}");
var client = CreateClient(args[0], args[1]);

var testRunId = Guid.NewGuid().ToString("N");

var topic = $"persistent://public/default/consumer-tests-{testRunId}";
await using var consumer = client.NewConsumer(Schema.ByteArray)
.ConsumerName($"consumer-{testRunId}")
.InitialPosition(SubscriptionInitialPosition.Earliest)
.SubscriptionName($"subscription-{testRunId}")
.StateChangedHandler(changed => Console.WriteLine($"Consumer state changed to {changed.ConsumerState}"))
.Topic(topic)
.Create();

var timeout = Task.Delay(TimeSpan.FromSeconds(30));
var result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Active).AsTask());

if (result == timeout)
{
Console.WriteLine("Timed out waiting for active status");
return -1;
}

Console.WriteLine(ConnectedAwaitingDisconnection);
timeout = Task.Delay(TimeSpan.FromSeconds(30));
result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Disconnected).AsTask());

if (result == timeout)
{
Console.WriteLine("Timed out waiting for disconnected status");
return -2;
}

Console.WriteLine(DisconnectedAwaitingReconnection);
timeout = Task.Delay(TimeSpan.FromSeconds(30));
result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Active).AsTask());

if (result == timeout)
{
Console.WriteLine("Timed out waiting for reconnected status");
return -3;
}

Console.WriteLine(Reconnected);
return 0;
}

private static IPulsarClient CreateClient(string url, string token)
=> PulsarClient
.Builder()
.Authentication(AuthenticationFactory.Token(_ => ValueTask.FromResult(token)))
.KeepAliveInterval(TimeSpan.FromSeconds(5))
.ServerResponseTimeout(TimeSpan.FromSeconds(10))
.ExceptionHandler(new TestExceptionHandler())
.ServiceUrl(new Uri(url))
.Build();
}

internal class TestExceptionHandler : IHandleException
{
private readonly IHandleException _inner = new DefaultExceptionHandler(TimeSpan.FromSeconds(5));

public ValueTask OnException(ExceptionContext exceptionContext)
{
Console.WriteLine("Exception occurred: {0}", exceptionContext.Exception.Message);

// This occurs when reconnecting the docker network, it takes some time for the alias to be reestablished
if (exceptionContext.Exception is SocketException se && se.Message.Contains("Name or service not known"))
{
exceptionContext.ExceptionHandled = true;
exceptionContext.Result = FaultAction.Retry;
}
else
{
_inner.OnException(exceptionContext);
}

return ValueTask.CompletedTask;
}
}
1 change: 1 addition & 0 deletions tests/DotPulsar.Tests/DotPulsar.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

<ItemGroup>
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
<ProjectReference Include="..\DotPulsar.Consumer\DotPulsar.Consumer.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
3 changes: 3 additions & 0 deletions tests/DotPulsar.Tests/IntegrationCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ namespace DotPulsar.Tests;

[CollectionDefinition("Integration")]
public class IntegrationCollection : ICollectionFixture<IntegrationFixture> { }

[CollectionDefinition("KeepAlive")]
public class KeepAliveCollection : ICollectionFixture<KeepAliveFixture> { }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a new (standalone) cluster needed?
Seems we could solve this with just one cluster/integration fixture.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to use the existing cluster but once you start poking around with the network subsequent tests failed so I opted for a separate cluster

Loading