WSS receive_json/string/binary API error handling #8581

Open
1 task done
arcivanov opened this issue Aug 1, 2024 · 5 comments

@arcivanov
Contributor

arcivanov commented Aug 1, 2024

Describe the solution you'd like

The current API provides for a very bizarre/absent error handling mechanism for web socket receive operations.

Specifically these are the current implementations for receiving string, bytes and json:

    async def receive_str(self, *, timeout: Optional[float] = None) -> str:
        msg = await self.receive(timeout)
        if msg.type is not WSMsgType.TEXT:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
        return cast(str, msg.data)

    async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
        msg = await self.receive(timeout)
        if msg.type is not WSMsgType.BINARY:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
        return cast(bytes, msg.data)

    async def receive_json(
        self,
        *,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        timeout: Optional[float] = None,
    ) -> Any:
        data = await self.receive_str(timeout=timeout)
        return loads(data)

    def __aiter__(self) -> "ClientWebSocketResponse":
        return self

As you can see, any message whose type is not TEXT (for receive_str and receive_json) or BINARY (for receive_bytes) results in a TypeError. However, if we inspect the underlying receive, we'll see that a whole bunch of error and state conditions produce messages that are neither TEXT nor BINARY, including WS_CLOSED_MESSAGE, WSMsgType.CLOSED and WSMsgType.ERROR (this one is internal API implementation and should never be exposed to a user anyway) [emphasis added by ****]:

    async def receive(self, timeout: Optional[float] = None) -> WSMessage:
        while True:
            if self._waiting:
                raise RuntimeError("Concurrent call to receive() is not allowed")


            if self._closed:
****                return WS_CLOSED_MESSAGE
            elif self._closing:
                await self.close()
****                return WS_CLOSED_MESSAGE


            try:
                self._waiting = True
                try:
                    async with async_timeout.timeout(timeout or self._receive_timeout):
                        msg = await self._reader.read()
                    self._reset_heartbeat()
                finally:
                    self._waiting = False
                    if self._close_wait:
                        set_result(self._close_wait, None)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
****                return WSMessage(WSMsgType.CLOSED, None, None)
            except ClientError:
                self._closed = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
****                return WS_CLOSED_MESSAGE
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
****                return WSMessage(WSMsgType.ERROR, exc, None)
            except Exception as exc:
                self._exception = exc
                self._closing = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                await self.close()
****                return WSMessage(WSMsgType.ERROR, exc, None)


            if msg.type is WSMsgType.CLOSE:
                self._closing = True
                self._close_code = msg.data
                if not self._closed and self._autoclose:
                    await self.close()
            elif msg.type is WSMsgType.CLOSING:
                self._closing = True
            elif msg.type is WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type is WSMsgType.PONG and self._autoping:
                continue


            return msg

So, what happens when you receive_str on a closed web socket? - TypeError
Server-initiated socket closing? TypeError
TCP connection reset and disconnected? TypeError

TypeError is a builtin whose only payload is a message string, so information about the connection state or the underlying exception is lost except as encoded within that string. This is clearly not desirable API behavior.
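To make the information loss concrete, here is a minimal sketch that runs without aiohttp. MsgType and Message are hypothetical stand-ins for WSMsgType and WSMessage, and as_str repeats only the type check from receive_str quoted above, so this illustrates the pattern and is not the library code itself:

```python
import enum
from typing import Any, NamedTuple, Optional


class MsgType(enum.Enum):
    """Hypothetical stand-in for aiohttp.WSMsgType (values are arbitrary)."""
    TEXT = "text"
    CLOSE = "close"
    CLOSED = "closed"
    ERROR = "error"


class Message(NamedTuple):
    """Hypothetical stand-in for aiohttp.WSMessage."""
    type: MsgType
    data: Any
    extra: Optional[str]


def as_str(msg: Message) -> str:
    # Same type check as the receive_str quoted above, minus the network read.
    if msg.type is not MsgType.TEXT:
        raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
    return msg.data


def raised_by(msg: Message) -> Optional[BaseException]:
    """Return the exception as_str raises for msg, or None if it succeeds."""
    try:
        as_str(msg)
    except Exception as exc:
        return exc
    return None


reset = ConnectionResetError("peer reset")
outcomes = {
    "already closed": raised_by(Message(MsgType.CLOSED, None, None)),
    "server sent close": raised_by(Message(MsgType.CLOSE, 1000, "")),
    "read failed": raised_by(Message(MsgType.ERROR, reset, None)),
}

# Three different conditions, one exception type for the caller to catch.
for label, exc in outcomes.items():
    print(f"{label}: {type(exc).__name__}: {exc}")
```

In this sketch the caller can only tell the three cases apart by parsing the message text; the original ConnectionResetError object is not attached to the TypeError in any way.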

In my application I had to implement my own wrapper around receive to ensure that:

  1. If WSS is closed normally a None is returned to indicate EOF.
  2. If we receive an unsolicited server closure, a ServerDisconnectedError is raised.
  3. If an exception occurs during a receive, the exception is unwrapped from the WSMessage(WSMsgType.ERROR, exc, None) and re-raised.

The code is as follows, and I believe the API should change along similar lines:

    async def ws_receive(self, ws_conn: ClientWebSocketResponse):
        async def receive_str(timeout: float | None = None) -> str | None:
            msg = await ws_conn.receive(timeout)
            if msg.type == WSMsgType.CLOSE:
                raise ServerDisconnectedError()
            if msg.type in (WSMsgType.CLOSING, WSMsgType.CLOSED):
                return None
            if msg.type == WSMsgType.ERROR:
                raise msg.data
            if msg.type != WSMsgType.TEXT:
                raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
            return cast(str, msg.data)

        async def receive_json(
            loads: JSONDecoder = DEFAULT_JSON_DECODER,
            timeout: float | None = None,
        ) -> Any:
            data = await receive_str(timeout=timeout)
            if data is None:
                return None
            return loads(data)

        return await receive_json()
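From the caller's side, that contract could be consumed roughly as sketched below. To keep it runnable without a server, ScriptedSocket is a hypothetical fake connection, MsgType, Message and ServerDisconnected are stand-ins for the aiohttp names, and drain is an illustrative helper that is not part of any API:

```python
import asyncio
import enum
import json
from typing import Any, List, NamedTuple, Optional


class MsgType(enum.Enum):
    """Hypothetical stand-in for aiohttp.WSMsgType."""
    TEXT = "text"
    CLOSE = "close"
    CLOSING = "closing"
    CLOSED = "closed"
    ERROR = "error"


class Message(NamedTuple):
    """Hypothetical stand-in for aiohttp.WSMessage."""
    type: MsgType
    data: Any
    extra: Optional[str] = None


class ServerDisconnected(ConnectionError):
    """Hypothetical stand-in for aiohttp.ServerDisconnectedError."""


class ScriptedSocket:
    """Fake connection whose receive() replays a fixed list of messages."""

    def __init__(self, messages: List[Message]) -> None:
        self._messages = list(messages)

    async def receive(self) -> Message:
        return self._messages.pop(0)


async def receive_json(ws: ScriptedSocket) -> Any:
    """Apply the three rules above; None means a normal end of stream."""
    msg = await ws.receive()
    if msg.type is MsgType.CLOSE:
        raise ServerDisconnected()
    if msg.type in (MsgType.CLOSING, MsgType.CLOSED):
        return None
    if msg.type is MsgType.ERROR:
        raise msg.data  # re-raise the exception carried by the message
    if msg.type is not MsgType.TEXT:
        raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
    return json.loads(msg.data)


async def drain(ws: ScriptedSocket) -> List[Any]:
    """Collect payloads until the wrapper reports end of stream."""
    received = []
    while (payload := await receive_json(ws)) is not None:
        received.append(payload)
    return received


clean = ScriptedSocket([
    Message(MsgType.TEXT, '{"n": 1}'),
    Message(MsgType.TEXT, '{"n": 2}'),
    Message(MsgType.CLOSED, None),
])
print(asyncio.run(drain(clean)))
```

One caveat of using None as the EOF marker: a JSON payload of null also decodes to None, so a loop like drain would stop early on it. A dedicated sentinel or exception would avoid that ambiguity.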

Code of Conduct

  • I agree to follow the aio-libs Code of Conduct
@arcivanov arcivanov changed the title WSS receive_json API error handling WSS receive_json/string/binary API error handling Aug 2, 2024
@Dreamsorcerer
Member

Such a change sounds sensible to me at least. We probably want errors etc. to behave similarly to .receive().

@arcivanov
Contributor Author

One of the issues with receive, though, is that it returns a WSMessage with WSMsgType.ERROR, which is an implementation detail and should not be exposed in the public API IMO. I.e. the current receive should become _receive, and the public receive should wrap _receive and, at the very least, turn WSMessage(WSMsgType.ERROR, exc, None) into a raised exception.
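A minimal sketch of that wrapping, independent of aiohttp: receive_or_raise is a hypothetical name, raw_receive stands for whatever the private _receive would be, and MsgType and Message are stand-ins for WSMsgType and WSMessage:

```python
import asyncio
import enum
from typing import Any, Awaitable, Callable, NamedTuple, Optional


class MsgType(enum.Enum):
    """Hypothetical stand-in for aiohttp.WSMsgType."""
    TEXT = "text"
    ERROR = "error"


class Message(NamedTuple):
    """Hypothetical stand-in for aiohttp.WSMessage."""
    type: MsgType
    data: Any
    extra: Optional[str] = None


async def receive_or_raise(
    raw_receive: Callable[[], Awaitable[Message]],
) -> Message:
    """Await raw_receive() and re-raise the exception an ERROR message carries."""
    msg = await raw_receive()
    if msg.type is MsgType.ERROR and isinstance(msg.data, BaseException):
        raise msg.data
    return msg  # every other message type passes through unchanged


async def text_receive() -> Message:
    # Simulates a successful read.
    return Message(MsgType.TEXT, "hello")


async def failing_receive() -> Message:
    # Simulates a read that failed and was wrapped into an ERROR message.
    return Message(MsgType.ERROR, ValueError("bad frame"))


print(asyncio.run(receive_or_raise(text_receive)).data)
try:
    asyncio.run(receive_or_raise(failing_receive))
except ValueError as exc:
    print(f"raised: {exc!r}")
```

With this shape the caller gets the original exception object, including its type and attributes, and does not have to inspect msg.type for error handling.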

@Dreamsorcerer
Member

It is part of the public API though, and explicitly mentioned in the docs (e.g. https://docs.aiohttp.org/en/stable/client_quickstart.html#websockets).

@Dreamsorcerer
Member

Maybe it makes sense to change the behaviour, but such a change would have to go into v4...

@arcivanov
Contributor Author

It is part of the public API though, and explicitly mentioned in the docs (e.g. https://docs.aiohttp.org/en/stable/client_quickstart.html#websockets).

This is in regard to receive only, right? There is no way to do this type of error handling with receive_str/receive_bytes/receive_json.
