Skip to content

Commit

Permalink
Fix issue reconnecting to a cluster client or member. Bug was introduced in 1.7.1.alpha-0.4 build. (#2142)
Browse files Browse the repository at this point in the history
  • Loading branch information
benbenwilde authored Nov 14, 2024
1 parent 7193337 commit b01daea
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 31 deletions.
74 changes: 45 additions & 29 deletions src/Proto.Remote/Endpoints/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,46 +97,62 @@ private async Task OnEndpointTerminated(EndpointTerminatedEvent evt)
}

Action? unblock = null;
IEndpoint? endpoint = null;
lock (_synLock)
try
{
if (_cancellationTokenSource.IsCancellationRequested)
IEndpoint? endpoint = null;
lock (_synLock)
{
return;
if (_cancellationTokenSource.IsCancellationRequested)
{
return;
}

if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint))
{
_blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow);
unblock = () => _blockedAddresses.TryRemove(evt.Address, out _);
}
else if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint))
{
_blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow);
unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
}
}
if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint))

if (endpoint != null)
{
_blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow);
unblock = () => _blockedAddresses.TryRemove(evt.Address, out _);
}
// leave the lock to dispose the endpoint, so that requests can't build up behind the lock
// the address will always be blocked while we dispose, at a minimum
await endpoint.DisposeAsync().ConfigureAwait(false);

Logger.LogInformation("[{SystemAddress}] Endpoint {Address} terminated", _system.Address,
evt.Address ?? evt.ActorSystemId);

if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint))
if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue)
{
await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false);
}

}
else
{
_blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow);
unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _);
Logger.LogDebug("[{SystemAddress}] Endpoint {Address} already removed.", _system.Address,
evt.Address ?? evt.ActorSystemId);
}
}

if (endpoint != null)
catch (Exception ex)
{
// leave the lock to dispose the endpoint, so that requests can't build up behind the lock
// the address will always be blocked while we dispose, at a minimum
await endpoint.DisposeAsync().ConfigureAwait(false);
if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue)
{
await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false);
if (_cancellationTokenSource.IsCancellationRequested)
{
return;
}
}

// since these async EventStream subscription handlers are fire and forget, we need to
// log if something goes wrong, or we'll never know
Logger.LogError(ex, "[{SystemAddress}] Error during endpoint {Address} termination", _system.Address,
evt.Address ?? evt.ActorSystemId);
}
finally
{
// make sure that the unblock action runs if it was set, or we can end up with a forever blocked address
// which is bad if a new endpoint is started with the same address, or the same one restarts and reconnects
unblock?.Invoke();
}

Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminated", _system.Address,
evt.Address ?? evt.ActorSystemId);
}

internal IEndpoint GetOrAddServerEndpoint(string? address)
Expand Down
7 changes: 7 additions & 0 deletions src/Proto.Remote/Endpoints/EndpointReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ await responseStream.WriteAsync(new RemoteMessage
address = serverConnection.Address;
systemId = serverConnection.MemberId;
endpoint = _endpointManager.GetOrAddServerEndpoint(address);
if (!endpoint.IsActive)
{
Logger.LogWarning(
"[EndpointReader][{SystemAddress}] Failed to connect back to remote member {MemberId} address {Address} for writes",
_system.Address, connectRequest.ServerConnection.MemberId,
connectRequest.ServerConnection.Address);
}
}

break;
Expand Down
12 changes: 10 additions & 2 deletions src/Proto.Remote/Endpoints/ServerConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ private async Task RunAsync()

while (!_cts.IsCancellationRequested)
{
var cancellationTokenSource = new CancellationTokenSource();
try
{
_logger.LogInformation("[ServerConnector][{SystemAddress}] Connecting to {Address}", _system.Address,
Expand Down Expand Up @@ -142,7 +143,7 @@ await call.RequestStream.WriteAsync(new RemoteMessage
_logger.LogError(
"[ServerConnector][{SystemAddress}] Connection Refused to remote member {MemberId} address {Address}, we are blocked",
_system.Address, connectResponse.MemberId, _address);

//block self
_system.Remote().BlockList.Block(new[] { _system.Id }, "Blocked by remote member");
var terminated = new EndpointTerminatedEvent(false, _address, _system.Id);
Expand All @@ -166,7 +167,6 @@ await call.RequestStream.WriteAsync(new RemoteMessage
}

rs.Reset();
var cancellationTokenSource = new CancellationTokenSource();

var combinedToken = CancellationTokenSource
.CreateLinkedTokenSource(_cts.Token, cancellationTokenSource.Token)
Expand Down Expand Up @@ -238,6 +238,14 @@ await call.RequestStream.WriteAsync(new RemoteMessage
"[ServerConnector][{SystemAddress}] Restarting endpoint connection to {Address} after {Duration} because of {Reason} ({Retries} / {MaxRetries})",
_system.Address, _address, duration, e.GetType().Name, rs.FailureCount, _maxNrOfRetries);
}
finally
{
// always cancel the token for this writer/reader, otherwise their loops can continue
// running indefinitely, depending on how we got here. the call does get disposed via
// a using above, but we want to be certain these loops don't continue running, especially
// since these can build up when there are multiple reconnect attempts
cancellationTokenSource.Cancel();
}
}
}

Expand Down

0 comments on commit b01daea

Please sign in to comment.