diff --git a/src/Proto.Remote/Endpoints/EndpointManager.cs b/src/Proto.Remote/Endpoints/EndpointManager.cs index 9fd4904321..ae7f8749fe 100644 --- a/src/Proto.Remote/Endpoints/EndpointManager.cs +++ b/src/Proto.Remote/Endpoints/EndpointManager.cs @@ -97,46 +97,62 @@ private async Task OnEndpointTerminated(EndpointTerminatedEvent evt) } Action? unblock = null; - IEndpoint? endpoint = null; - lock (_synLock) + try { - if (_cancellationTokenSource.IsCancellationRequested) + IEndpoint? endpoint = null; + lock (_synLock) { - return; + if (_cancellationTokenSource.IsCancellationRequested) + { + return; + } + + if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint)) + { + _blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow); + unblock = () => _blockedAddresses.TryRemove(evt.Address, out _); + } + else if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint)) + { + _blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow); + unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _); + } } - - if (evt.Address is not null && _serverEndpoints.TryRemove(evt.Address, out endpoint)) + + if (endpoint != null) { - _blockedAddresses.TryAdd(evt.Address, DateTime.UtcNow); - unblock = () => _blockedAddresses.TryRemove(evt.Address, out _); - } + // leave the lock to dispose the endpoint, so that requests can't build up behind the lock + // the address will always be blocked while we dispose, at a minimum + await endpoint.DisposeAsync().ConfigureAwait(false); + + Logger.LogInformation("[{SystemAddress}] Endpoint {Address} terminated", _system.Address, + evt.Address ?? evt.ActorSystemId); - if (evt.ActorSystemId is not null && _clientEndpoints.TryRemove(evt.ActorSystemId, out endpoint)) + if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue) + { + await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false); + } + + } + else { - _blockedClientSystemIds.TryAdd(evt.ActorSystemId, DateTime.UtcNow); - unblock = () => _blockedClientSystemIds.TryRemove(evt.ActorSystemId, out _); + Logger.LogDebug("[{SystemAddress}] Endpoint {Address} already removed.", _system.Address, + evt.Address ?? evt.ActorSystemId); } } - - if (endpoint != null) + catch (Exception ex) { - // leave the lock to dispose the endpoint, so that requests can't build up behind the lock - // the address will always be blocked while we dispose, at a minimum - await endpoint.DisposeAsync().ConfigureAwait(false); - if (evt.ShouldBlock && _remoteConfig.WaitAfterEndpointTerminationTimeSpan.HasValue) - { - await Task.Delay(_remoteConfig.WaitAfterEndpointTerminationTimeSpan.Value, CancellationToken).ConfigureAwait(false); - if (_cancellationTokenSource.IsCancellationRequested) - { - return; - } - } - + // since these async EventStream subscription handlers are fire and forget, we need to + // log if something goes wrong, or we'll never know + Logger.LogError(ex, "[{SystemAddress}] Error during endpoint {Address} termination", _system.Address, + evt.Address ?? evt.ActorSystemId); + } + finally + { + // make sure that the unblock action runs if it was set, or we can end up with a forever blocked address + // which is bad if a new endpoint is started with the same address, or the same one restarts and reconnects unblock?.Invoke(); } - - Logger.LogDebug("[{SystemAddress}] Endpoint {Address} terminated", _system.Address, - evt.Address ?? evt.ActorSystemId); } internal IEndpoint GetOrAddServerEndpoint(string? address) diff --git a/src/Proto.Remote/Endpoints/EndpointReader.cs b/src/Proto.Remote/Endpoints/EndpointReader.cs index 602d3484b5..ca7916fbf8 100644 --- a/src/Proto.Remote/Endpoints/EndpointReader.cs +++ b/src/Proto.Remote/Endpoints/EndpointReader.cs @@ -196,6 +196,13 @@ await responseStream.WriteAsync(new RemoteMessage address = serverConnection.Address; systemId = serverConnection.MemberId; endpoint = _endpointManager.GetOrAddServerEndpoint(address); + if (!endpoint.IsActive) + { + Logger.LogWarning( + "[EndpointReader][{SystemAddress}] Failed to connect back to remote member {MemberId} address {Address} for writes", + _system.Address, connectRequest.ServerConnection.MemberId, + connectRequest.ServerConnection.Address); + } } break; diff --git a/src/Proto.Remote/Endpoints/ServerConnector.cs b/src/Proto.Remote/Endpoints/ServerConnector.cs index b061295adb..f3f23dd4ad 100644 --- a/src/Proto.Remote/Endpoints/ServerConnector.cs +++ b/src/Proto.Remote/Endpoints/ServerConnector.cs @@ -74,6 +74,7 @@ private async Task RunAsync() while (!_cts.IsCancellationRequested) { + var cancellationTokenSource = new CancellationTokenSource(); try { _logger.LogInformation("[ServerConnector][{SystemAddress}] Connecting to {Address}", _system.Address, @@ -142,7 +143,7 @@ await call.RequestStream.WriteAsync(new RemoteMessage _logger.LogError( "[ServerConnector][{SystemAddress}] Connection Refused to remote member {MemberId} address {Address}, we are blocked", _system.Address, connectResponse.MemberId, _address); - + //block self _system.Remote().BlockList.Block(new[] { _system.Id }, "Blocked by remote member"); var terminated = new EndpointTerminatedEvent(false, _address, _system.Id); @@ -166,7 +167,6 @@ await call.RequestStream.WriteAsync(new RemoteMessage } rs.Reset(); - var cancellationTokenSource = new CancellationTokenSource(); var combinedToken = CancellationTokenSource .CreateLinkedTokenSource(_cts.Token, cancellationTokenSource.Token) @@ -238,6 +238,14 @@ await call.RequestStream.WriteAsync(new RemoteMessage "[ServerConnector][{SystemAddress}] Restarting endpoint connection to {Address} after {Duration} because of {Reason} ({Retries} / {MaxRetries})", _system.Address, _address, duration, e.GetType().Name, rs.FailureCount, _maxNrOfRetries); } + finally + { + // always cancel the token for this writer/reader, otherwise their loops can continue + // running indefinitely, depending on how we got here. the call does get disposed via + // a using above, but we want to be certain these loops don't continue running, especially + // since these can build up when there are multiple reconnect attempts + cancellationTokenSource.Cancel(); + } } }