add server timeout functionality plus docker test #104

Open
wants to merge 5 commits into base: master

goldenccargill
Contributor

As mentioned in the keep-alive section of the Pulsar binary configuration here, if the server stops responding to our pings then we have to assume the connection is no longer valid. We can't just rely on the transport to tell us whether the connection is alive. I've implemented it based on any response from the server rather than specifically a pong. It's open to debate whether this is correct; I could go either way.

I've also added a test to demonstrate the bug which is fixed by my code in the PingPongHandler
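
For readers following the thread, the idea described above can be sketched roughly as follows. This is an illustration only, not the `PingPongHandler` code in this PR: the class name `ResponseWatchdog` and its members are hypothetical, and it assumes that any incoming command (not only a pong) counts as proof that the server is alive.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch; names and structure are assumptions, not DotPulsar's API.
public sealed class ResponseWatchdog : IDisposable
{
    private readonly TimeSpan _timeout;
    private readonly Timer _timer;
    private readonly TaskCompletionSource<bool> _timedOut =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private long _lastResponseTicks;

    public ResponseWatchdog(TimeSpan checkInterval, TimeSpan timeout)
    {
        _timeout = timeout;
        _lastResponseTicks = DateTime.UtcNow.Ticks;
        // Re-check periodically whether the server has gone quiet for too long.
        _timer = new Timer(_ => Check(), null, checkInterval, checkInterval);
    }

    // Completes once no response has been seen for longer than the timeout.
    public Task ServerNotResponding => _timedOut.Task;

    // Call this for every incoming command, not only for a pong.
    public void ResponseReceived()
        => Interlocked.Exchange(ref _lastResponseTicks, DateTime.UtcNow.Ticks);

    private void Check()
    {
        var last = new DateTime(Interlocked.Read(ref _lastResponseTicks), DateTimeKind.Utc);
        if (DateTime.UtcNow - last > _timeout)
            _timedOut.TrySetResult(true); // Safe to call more than once.
    }

    public void Dispose() => _timer.Dispose();
}
```

A caller could then race its frame-processing loop against `ServerNotResponding` and treat whichever finishes first as the end of the connection, which is what lets the client move to a disconnected state even when the transport itself reports nothing.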

Contributor

@blankensteiner blankensteiner left a comment

Thanks for the PR!
Feel free to reach out here or on Slack if you have any questions :-)

/// the disconnected state and attempt to reconnect
/// The default is 60 seconds.
/// </summary>
IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval);
Contributor

Is this also a configurable setting for other clients? If not, we could just hardcode it.

Contributor Author

I don't know about other clients, but it seems to be configurable on the broker, and the client setting should agree with that, so I think it needs to be configurable here. It's a shame the value isn't returned from the server as part of the connection response.

private readonly Timer _timer;
private readonly CommandPing _ping;
private readonly CommandPong _pong;
private long _lastCommand;
private readonly TaskCompletionSource<object> _serverNotRespondingTcs;
Contributor

The non-generic TaskCompletionSource is a better fit since the object is never used.

Contributor Author

seems it has been removed
StephenCleary/AsyncEx#176

Task.Factory.StartNew(() => SendPing());
_timer.Change(_keepAliveInterval, TimeSpan.Zero);
DotPulsarMeter.ServerTimedout();
_serverNotRespondingTcs.SetResult(new object());
Contributor

You might as well just return here instead of wrapping the following code in an else.

Contributor Author

done

@@ -152,6 +152,7 @@ private IPulsarClient CreateClient()
         => PulsarClient
         .Builder()
         .Authentication(AuthenticationFactory.Token(ct => ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+        .KeepAliveInterval(TimeSpan.FromSeconds(5))
Contributor

Why is this added?

Contributor Author

To make the test run faster: the check to see whether the server responded happens every 5 seconds instead of every 30.

@@ -18,3 +18,6 @@ namespace DotPulsar.Tests;

 [CollectionDefinition("Integration")]
 public class IntegrationCollection : ICollectionFixture<IntegrationFixture> { }
+
+[CollectionDefinition("KeepAlive")]
+public class KeepAliveCollection : ICollectionFixture<KeepAliveFixture> { }
Contributor

Is a new (standalone) cluster needed?
Seems we could solve this with just one cluster/integration fixture.

Contributor Author

I tried to use the existing cluster, but once you start poking around with the network, subsequent tests failed, so I opted for a separate cluster.

@@ -1,4 +1,5 @@
 {
   "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
-  "diagnosticMessages": true
+  "diagnosticMessages": true,
+  "parallelizeTestCollections": false
Contributor

This can be removed again if you go ahead with just one cluster for all integration tests.

Contributor Author

For the same reason as above, I can't.

@@ -294,6 +298,11 @@ private async Task Send(BaseCommand command, CancellationToken cancellationToken
}

public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
{
await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding);
Contributor

This needs a ConfigureAwait(false)

Contributor Author

done

@@ -26,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Processing", "samples\Processing\Processing.csproj", "{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Consumer", "tests\DotPulsar.Consumer\DotPulsar.Consumer.csproj", "{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}"
Contributor

Let's get the new tests into DotPulsar.Tests instead of creating new test projects.

Contributor Author

I need an executable that I can run in docker to connect to the server

2 participants