-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add server timeout functionality plus docker test #104
base: master
Are you sure you want to change the base?
Changes from 4 commits
c4bcb03
52146b2
88310e5
d42b29c
d3858d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,16 @@ public interface IPulsarClientBuilder | |
/// </summary> | ||
IPulsarClientBuilder KeepAliveInterval(TimeSpan interval); | ||
|
||
/// <summary> | ||
/// The maximum amount of time to wait without receiving any message from the server at | ||
/// which point the connection is assumed to be dead or the server is not responding. | ||
/// As we are sending pings the server should respond to those at a minimum within this specified timeout period. | ||
/// Once this happens the connection will be torn down and all consumers/producers will enter | ||
/// the disconnected state and attempt to reconnect | ||
/// The default is 60 seconds. | ||
/// </summary> | ||
IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this also a configurable setting for other clients? If not, we could just hardcode it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't know about other clients but it seems it is configurable on the broker so the client setting should agree with that so I think it needs to be configurable here. Shame the value isn't returned from the server as part of the connection response |
||
|
||
/// <summary> | ||
/// Set the listener name. This is optional. | ||
/// </summary> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,11 +33,15 @@ public sealed class Connection : IConnection | |
private readonly IAuthentication? _authentication; | ||
private int _isDisposed; | ||
|
||
public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, IAuthentication? authentication) | ||
public Connection( | ||
IPulsarStream stream, | ||
TimeSpan keepAliveInterval, | ||
TimeSpan serverResponseTimeout, | ||
IAuthentication? authentication) | ||
{ | ||
_lock = new AsyncLock(); | ||
_channelManager = new ChannelManager(); | ||
_pingPongHandler = new PingPongHandler(this, keepAliveInterval); | ||
_pingPongHandler = new PingPongHandler(this, keepAliveInterval, serverResponseTimeout); | ||
_stream = stream; | ||
_authentication = authentication; | ||
} | ||
|
@@ -294,6 +298,11 @@ private async Task Send(BaseCommand command, CancellationToken cancellationToken | |
} | ||
|
||
public async Task ProcessIncommingFrames(CancellationToken cancellationToken) | ||
{ | ||
await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs a ConfigureAwait(false) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
|
||
public async Task ProcessIncommingFramesImpl(CancellationToken cancellationToken) | ||
{ | ||
await Task.Yield(); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,29 +25,37 @@ public sealed class PingPongHandler : IAsyncDisposable | |
{ | ||
private readonly IConnection _connection; | ||
private readonly TimeSpan _keepAliveInterval; | ||
private readonly TimeSpan _serverResponseTimeout; | ||
private readonly Timer _timer; | ||
private readonly CommandPing _ping; | ||
private readonly CommandPong _pong; | ||
private long _lastCommand; | ||
private readonly TaskCompletionSource<object> _serverNotRespondingTcs; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The non-generic TaskCompletionSource is a better fit since the object is never used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems it has been removed |
||
|
||
public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval) | ||
public PingPongHandler(IConnection connection, | ||
TimeSpan keepAliveInterval, | ||
TimeSpan serverResponseTimeout) | ||
{ | ||
_connection = connection; | ||
_keepAliveInterval = keepAliveInterval; | ||
_serverResponseTimeout = serverResponseTimeout; | ||
_timer = new Timer(Watch); | ||
_timer.Change(_keepAliveInterval, TimeSpan.Zero); | ||
_ping = new CommandPing(); | ||
_pong = new CommandPong(); | ||
_lastCommand = Stopwatch.GetTimestamp(); | ||
_serverNotRespondingTcs = new TaskCompletionSource<object>(); | ||
} | ||
|
||
public Task ServerNotResponding => _serverNotRespondingTcs.Task; | ||
|
||
public bool Incoming(BaseCommand.Type commandType) | ||
{ | ||
Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp()); | ||
|
||
if (commandType == BaseCommand.Type.Ping) | ||
{ | ||
Task.Factory.StartNew(() => SendPong()); | ||
Task.Factory.StartNew(SendPong); | ||
return true; | ||
} | ||
|
||
|
@@ -61,13 +69,25 @@ private void Watch(object? state) | |
var lastCommand = Interlocked.Read(ref _lastCommand); | ||
var now = Stopwatch.GetTimestamp(); | ||
var elapsed = TimeSpan.FromSeconds((now - lastCommand) / Stopwatch.Frequency); | ||
if (elapsed >= _keepAliveInterval) | ||
|
||
if (elapsed > _serverResponseTimeout) | ||
{ | ||
Task.Factory.StartNew(() => SendPing()); | ||
_timer.Change(_keepAliveInterval, TimeSpan.Zero); | ||
DotPulsarMeter.ServerTimedout(); | ||
_serverNotRespondingTcs.SetResult(new object()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might as well just return here instead of wrapping the following code in an else. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
else | ||
_timer.Change(_keepAliveInterval.Subtract(elapsed), TimeSpan.Zero); | ||
{ | ||
if (elapsed >= _keepAliveInterval) | ||
{ | ||
Task.Factory.StartNew(() => SendPing()); | ||
_timer.Change(_keepAliveInterval, TimeSpan.Zero); | ||
} | ||
else | ||
{ | ||
var result = _keepAliveInterval.Subtract(elapsed); | ||
_timer.Change(result, TimeSpan.Zero); | ||
} | ||
} | ||
} | ||
catch | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<TargetFramework>net6.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<OutputType>Exe</OutputType> | ||
<Nullable>enable</Nullable> | ||
<PublishSingleFile>true</PublishSingleFile> | ||
<DebugType>embedded</DebugType> | ||
<RuntimeIdentifier>linux-x64</RuntimeIdentifier> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" /> | ||
</ItemGroup> | ||
|
||
<Target Name="PublishConsumer" AfterTargets="Build"> | ||
<Exec Command="dotnet publish $(ProjectDir) -r linux-x64 -o $(ProjectDir)..\DotPulsar.Tests\$(OutDir) --no-build" | ||
ConsoleToMSBuild="true" IgnoreExitCode="true" IgnoreStandardErrorWarningFormat="true"> | ||
<Output TaskParameter="ConsoleOutput" PropertyName="ConsoleOutput" /> | ||
<Output TaskParameter="ExitCode" PropertyName="ExitCode" /> | ||
</Exec> | ||
|
||
<Error Condition="'$(ExitCode)' != '0'" Text="$(ConsoleOutput)" /> | ||
</Target> | ||
</Project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
namespace DotPulsar.Consumer; | ||
|
||
using Abstractions; | ||
using Extensions; | ||
using Internal; | ||
using System.Net.Sockets; | ||
|
||
public static class Program | ||
{ | ||
public const string ConnectedAwaitingDisconnection = "Successfully connected, awaiting disconnection"; | ||
public const string DisconnectedAwaitingReconnection = "Successfully disconnected, awaiting reconnection"; | ||
public const string Reconnected = "Successfully reconnected"; | ||
|
||
public static async Task<int> Main(string[] args) | ||
{ | ||
Console.WriteLine($"Running with url {args[0]}"); | ||
var client = CreateClient(args[0], args[1]); | ||
|
||
var testRunId = Guid.NewGuid().ToString("N"); | ||
|
||
var topic = $"persistent://public/default/consumer-tests-{testRunId}"; | ||
await using var consumer = client.NewConsumer(Schema.ByteArray) | ||
.ConsumerName($"consumer-{testRunId}") | ||
.InitialPosition(SubscriptionInitialPosition.Earliest) | ||
.SubscriptionName($"subscription-{testRunId}") | ||
.StateChangedHandler(changed => Console.WriteLine($"Consumer state changed to {changed.ConsumerState}")) | ||
.Topic(topic) | ||
.Create(); | ||
|
||
var timeout = Task.Delay(TimeSpan.FromSeconds(30)); | ||
var result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Active).AsTask()); | ||
|
||
if (result == timeout) | ||
{ | ||
Console.WriteLine("Timed out waiting for active status"); | ||
return -1; | ||
} | ||
|
||
Console.WriteLine(ConnectedAwaitingDisconnection); | ||
timeout = Task.Delay(TimeSpan.FromSeconds(30)); | ||
result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Disconnected).AsTask()); | ||
|
||
if (result == timeout) | ||
{ | ||
Console.WriteLine("Timed out waiting for disconnected status"); | ||
return -2; | ||
} | ||
|
||
Console.WriteLine(DisconnectedAwaitingReconnection); | ||
timeout = Task.Delay(TimeSpan.FromSeconds(30)); | ||
result = await Task.WhenAny(timeout, consumer.StateChangedTo(ConsumerState.Active).AsTask()); | ||
|
||
if (result == timeout) | ||
{ | ||
Console.WriteLine("Timed out waiting for reconnected status"); | ||
return -3; | ||
} | ||
|
||
Console.WriteLine(Reconnected); | ||
return 0; | ||
} | ||
|
||
private static IPulsarClient CreateClient(string url, string token) | ||
=> PulsarClient | ||
.Builder() | ||
.Authentication(AuthenticationFactory.Token(_ => ValueTask.FromResult(token))) | ||
.KeepAliveInterval(TimeSpan.FromSeconds(5)) | ||
.ServerResponseTimeout(TimeSpan.FromSeconds(10)) | ||
.ExceptionHandler(new TestExceptionHandler()) | ||
.ServiceUrl(new Uri(url)) | ||
.Build(); | ||
} | ||
|
||
internal class TestExceptionHandler : IHandleException | ||
{ | ||
private readonly IHandleException _inner = new DefaultExceptionHandler(TimeSpan.FromSeconds(5)); | ||
|
||
public ValueTask OnException(ExceptionContext exceptionContext) | ||
{ | ||
Console.WriteLine("Exception occurred: {0}", exceptionContext.Exception.Message); | ||
|
||
// This occurs when reconnecting the docker network, it takes some time for the alias to be reestablished | ||
if (exceptionContext.Exception is SocketException se && se.Message.Contains("Name or service not known")) | ||
{ | ||
exceptionContext.ExceptionHandled = true; | ||
exceptionContext.Result = FaultAction.Retry; | ||
} | ||
else | ||
{ | ||
_inner.OnException(exceptionContext); | ||
} | ||
|
||
return ValueTask.CompletedTask; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,3 +18,6 @@ namespace DotPulsar.Tests; | |
|
||
[CollectionDefinition("Integration")] | ||
public class IntegrationCollection : ICollectionFixture<IntegrationFixture> { } | ||
|
||
[CollectionDefinition("KeepAlive")] | ||
public class KeepAliveCollection : ICollectionFixture<KeepAliveFixture> { } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is a new (standalone) cluster needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried to use the existing cluster but once you start poking around with the network subsequent tests failed so I opted for a separate cluster |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's get the new tests into DotPulsar.Tests instead of creating new test projects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need an executable that I can run in docker to connect to the server