Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an issue reconnecting to a cluster client or member. Fixes a bug introduced in the 1.7.1.alpha-0.4 build. (#2142)

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 45 additions & 29 deletions src/Proto.Remote/Endpoints/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,46 +97,62 @@ private async Task OnEndpointTerminated(EndpointTerminatedEvent evt)
}

Action? unblock = null;
IEndpoint? endpoint = null;
lock (_synLock)
try
{
if (_cancellationTokenSource.IsCancellationRequested)
IEndpoint? endpoint = null;
lock (_synLock)
{
return;
if (_cancellationTokenSource.IsCancellationRequested)
{
return;
}

if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint))
{
_blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow);
unblock = () => _blockedAddresses.TryRemove(evt.Address, out _);
}
else if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint))
{
_blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow);
unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
}
}
if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint))

if (endpoint != null)
{
_blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow);
unblock = () => _blockedAddresses.TryRemove(evt.Address, out _);
}
// leave the lock to dispose the endpoint, so that requests can't build up behind the lock
// the address will always be blocked while we dispose, at a minimum
await endpoint.DisposeAsync().ConfigureAwait(false);

Logger.LogInformation("[{SystemAddress}] Endpoint {Address} terminated", _system.Address,
evt.Address ?? evt.ActorSystemId);

if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint))
if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue)
{
await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false);
}

}
else
{
_blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow);
unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
Logger.LogDebug("[{SystemAddress}] Endpoint {Address} already removed.", _system.Address,
evt.Address ?? evt.ActorSystemId);
}
}

if (endpoint != null)
catch (Exception ex)
{
// leave the lock to dispose the endpoint, so that requests can't build up behind the lock
// the address will always be blocked while we dispose, at a minimum
await endpoint.DisposeAsync().ConfigureAwait(false);
if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue)
{
await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false);
if (_cancellationTokenSource.IsCancellationRequested)
{
return;
}
}

// Since these async EventStream subscription handlers are fire-and-forget, we need to
// log if something goes wrong, or we'll never know.
Logger.LogError(ex, "[{SystemAddress}] Error during endpoint {Address} termination", _system.Address,
evt.Address ?? evt.ActorSystemId);
}
finally
{
// Make sure that the unblock action runs if it was set; otherwise we can end up with a permanently blocked address,
// which is bad if a new endpoint is started with the same address, or if the same one restarts and reconnects.
unblock?.Invoke();
}

Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminated", _system.Address,
evt.Address ?? evt.ActorSystemId);
}

internal IEndpoint GetOrAddServerEndpoint(string? address)
Expand Down
7 changes: 7 additions & 0 deletions src/Proto.Remote/Endpoints/EndpointReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ await responseStream.WriteAsync(new RemoteMessage
address = serverConnection.Address;
systemId = serverConnection.MemberId;
endpoint = _endpointManager.GetOrAddServerEndpoint(address);
if (!endpoint.IsActive)
{
Logger.LogWarning(
"[EndpointReader][{SystemAddress}] Failed to connect back to remote member {MemberId} address {Address} for writes",
_system.Address, connectRequest.ServerConnection.MemberId,
connectRequest.ServerConnection.Address);
}
}

break;
Expand Down
12 changes: 10 additions & 2 deletions src/Proto.Remote/Endpoints/ServerConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ private async Task RunAsync()

while (!_cts.IsCancellationRequested)
{
var cancellationTokenSource = new CancellationTokenSource();
try
{
_logger.LogInformation("[ServerConnector][{SystemAddress}] Connecting to {Address}", _system.Address,
Expand Down Expand Up @@ -142,7 +143,7 @@ await call.RequestStream.WriteAsync(new RemoteMessage
_logger.LogError(
"[ServerConnector][{SystemAddress}] Connection Refused to remote member {MemberId} address {Address}, we are blocked",
_system.Address, connectResponse.MemberId, _address);

//block self
_system.Remote().BlockList.Block(new[] { _system.Id }, "Blocked by remote member");
var terminated = new EndpointTerminatedEvent(false, _address, _system.Id);
Expand All @@ -166,7 +167,6 @@ await call.RequestStream.WriteAsync(new RemoteMessage
}

rs.Reset();
var cancellationTokenSource = new CancellationTokenSource();

var combinedToken = CancellationTokenSource
.CreateLinkedTokenSource(_cts.Token, cancellationTokenSource.Token)
Expand Down Expand Up @@ -238,6 +238,14 @@ await call.RequestStream.WriteAsync(new RemoteMessage
"[ServerConnector][{SystemAddress}] Restarting endpoint connection to {Address} after {Duration} because of {Reason} ({Retries} / {MaxRetries})",
_system.Address, _address, duration, e.GetType().Name, rs.FailureCount, _maxNrOfRetries);
}
finally
{
// Always cancel the token for this writer/reader; otherwise their loops can continue
// running indefinitely, depending on how we got here. The call does get disposed via
// a using above, but we want to be certain these loops don't continue running, especially
// since they can build up when there are multiple reconnect attempts.
cancellationTokenSource.Cancel();
}
}
}

Expand Down
Loading