From 954d67512c7587c72fe98938c9cff44bde2b4aac Mon Sep 17 00:00:00 2001 From: aleksandr-mokrov <76171391+aleksandr-mokrov@users.noreply.github.com> Date: Wed, 9 Feb 2022 19:18:24 +0300 Subject: [PATCH] Additional error handling --- openfl/transport/grpc/aggregator_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/openfl/transport/grpc/aggregator_client.py b/openfl/transport/grpc/aggregator_client.py index 7d11d6170f9..6e1604a3df4 100644 --- a/openfl/transport/grpc/aggregator_client.py +++ b/openfl/transport/grpc/aggregator_client.py @@ -137,16 +137,19 @@ class AsyncRetryOnRpcErrorUnaryStreamClientInterceptor( async def _intercept_call(self, continuation, client_call_details, request_or_iterator): """Intercept the async call to the gRPC server.""" started_time = time.time() + is_connected = False while True: try: async_iter = await continuation(client_call_details, request_or_iterator) async for response in async_iter: yield response + is_connected = True except grpc.aio.AioRpcError as error: if ( error.code() == grpc.StatusCode.UNAVAILABLE and error.details() == 'Socket closed' - ): + ) or is_connected: + logger.info(f'Rpc error: {error.code()}, details: {error.details()}.') return await self.handle_error_for_retry(error, started_time)