Skip to content

Commit

Permalink
Removed handle_exception and moved it to connect() to avoid reraise
Browse files Browse the repository at this point in the history
  • Loading branch information
trana authored and trana committed Dec 21, 2023
1 parent 7d89946 commit 54e12ef
Showing 1 changed file with 45 additions and 55 deletions.
100 changes: 45 additions & 55 deletions fastapi_websocket_rpc/websocket_rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,8 @@ def __init__(self):
https://websocket-client.readthedocs.io/en/latest/core.html#websocket._core.WebSocket.connect
"""
async def connect(self, uri: str, **connect_kwargs):
self._websocket = await asyncio.get_event_loop().run_in_executor(None, websocket.create_connection, uri, **connect_kwargs)

async def send(self, msg):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
await asyncio.get_event_loop().run_in_executor(None, self._websocket.send, msg)

async def recv(self):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
try:
msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv)
except websocket.WebSocketConnectionClosedException as err:
logger.debug("Connection closed.", exc_info=True)
# websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened
# Returning None to ensure we get out of the loop, with no Exception.
return None
return msg

async def close(self, code: int = 1000):
if self._websocket is not None:
# Case opened, we have something to close.
self._websocket.close(code)

async def handle_exception(self, exception: Exception):
try:
raise exception
self._websocket = await asyncio.get_event_loop().run_in_executor(None, websocket.create_connection, uri, **connect_kwargs)
# See https://websocket-client.readthedocs.io/en/latest/exceptions.html
except websocket._exceptions.WebSocketAddressException:
logger.info("websocket address info cannot be found")
Expand Down Expand Up @@ -98,36 +71,22 @@ async def handle_exception(self, exception: Exception):
logger.exception("RPC Error")
raise

class WebSocketsClientHandler(SimpleWebSocket):
"""
Handler that use https://websockets.readthedocs.io/en/stable module.
This implementation does not support HTTP proxy (see https://github.com/python-websockets/websockets/issues/364).
"""
def __init__(self):
self._websocket = None

"""
Args:
**kwargs: Additional args passed to connect
https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#opening-a-connection
"""
async def connect(self, uri: str, **connect_kwargs):
self._websocket = await websockets.connect(uri, **connect_kwargs)

async def send(self, msg):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
await self._websocket.send(msg)
await asyncio.get_event_loop().run_in_executor(None, self._websocket.send, msg)

async def recv(self):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
try:
msg = await self._websocket.recv()
except websockets.exceptions.ConnectionClosed:
msg = await asyncio.get_event_loop().run_in_executor(None, self._websocket.recv)
except websocket.WebSocketConnectionClosedException as err:
logger.debug("Connection closed.", exc_info=True)
# websocket.WebSocketConnectionClosedException means remote host closed the connection or some network error happened
# Returning None to ensure we get out of the loop, with no Exception.
return None
return msg

Expand All @@ -136,9 +95,22 @@ async def close(self, code: int = 1000):
# Case opened, we have something to close.
self._websocket.close(code)

async def handle_exception(self, exception: Exception):
class WebSocketsClientHandler(SimpleWebSocket):
"""
Handler that use https://websockets.readthedocs.io/en/stable module.
This implementation does not support HTTP proxy (see https://github.com/python-websockets/websockets/issues/364).
"""
def __init__(self):
self._websocket = None

"""
Args:
**kwargs: Additional args passed to connect
https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#opening-a-connection
"""
async def connect(self, uri: str, **connect_kwargs):
try:
raise exception
self._websocket = await websockets.connect(uri, **connect_kwargs)
except ConnectionRefusedError:
logger.info("RPC connection was refused by server")
raise
Expand All @@ -162,6 +134,28 @@ async def handle_exception(self, exception: Exception):
logger.exception("RPC Error")
raise

async def send(self, msg):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
await self._websocket.send(msg)

async def recv(self):
if self._websocket is None:
# connect must be called before.
logging.error("Websocket connect() must be called before.")
try:
msg = await self._websocket.recv()
except websockets.exceptions.ConnectionClosed:
logger.debug("Connection closed.", exc_info=True)
return None
return msg

async def close(self, code: int = 1000):
if self._websocket is not None:
# Case opened, we have something to close.
self._websocket.close(code)

def isNotInvalidStatusCode(value):
return not isinstance(value, InvalidStatusCode)

Expand Down Expand Up @@ -287,12 +281,8 @@ async def __connect__(self):
self.cancel_tasks()
raise
except Exception as err:
if self.ws is not None:
# Exception could be websocket client specific.
await self.ws.handle_exception(err)
else:
logger.exception("RPC Error")
raise
logger.exception("RPC Error")
raise

async def __aenter__(self):
if self.retry_config is False:
Expand Down

0 comments on commit 54e12ef

Please sign in to comment.