diff --git a/worterbuch-client/src/lib.rs b/worterbuch-client/src/lib.rs index d416ae6..2a27bbb 100644 --- a/worterbuch-client/src/lib.rs +++ b/worterbuch-client/src/lib.rs @@ -736,6 +736,13 @@ async fn run( } } +async fn send_with_timeout(ws: &mut WebSocket, msg: Message) -> ConnectionResult<()> { + select! { + r = ws.send(msg) => Ok(r?), + _ = sleep(Duration::from_secs(1)) => Err(ConnectionError::Timeout), + } +} + async fn process_incoming_command( cmd: Option, callbacks: &mut Callbacks, @@ -911,7 +918,7 @@ async fn send_client_message(cm: CM, websocket: &mut WebSocket) -> ConnectionRes let json = serde_json::to_string(&cm)?; log::debug!("Sending message: {json}"); let msg = Message::Text(json); - websocket.send(msg).await?; + send_with_timeout(websocket, msg).await?; Ok(()) } @@ -1026,7 +1033,7 @@ async fn send_keepalive(websocket: &mut WebSocket) -> ConnectionResult<()> { log::trace!("Sending keepalive"); let json = serde_json::to_string(&ClientMessage::Keepalive)?; let msg = Message::Text(json); - websocket.send(msg).await?; + send_with_timeout(websocket, msg).await?; Ok(()) } diff --git a/worterbuch-common/src/error.rs b/worterbuch-common/src/error.rs index 12ce104..6db292e 100644 --- a/worterbuch-common/src/error.rs +++ b/worterbuch-common/src/error.rs @@ -152,6 +152,7 @@ pub enum ConnectionError { ConfigError(ConfigError), SerdeError(serde_json::Error), AckError(broadcast::error::SendError), + Timeout, } impl std::error::Error for ConnectionError {} @@ -169,6 +170,7 @@ impl fmt::Display for ConnectionError { Self::ConfigError(e) => fmt::Display::fmt(&e, f), Self::SerdeError(e) => fmt::Display::fmt(&e, f), Self::AckError(e) => fmt::Display::fmt(&e, f), + Self::Timeout => fmt::Display::fmt("timeout", f), } } } diff --git a/worterbuch/src/server/poem.rs b/worterbuch/src/server/poem.rs index 5e2d293..415bc4b 100644 --- a/worterbuch/src/server/poem.rs +++ b/worterbuch/src/server/poem.rs @@ -337,22 +337,29 @@ impl ClientHandler { // - send a websocket message if none has been sent for over a second select! { recv = self.websocket.next() => if let Some(msg) = recv { - // received websocket message, no need to check how long it has been since the last one - self.last_keepalive_rx = Instant::now(); - let incoming_msg = msg.context("Error in WebSocket connection")?; - // send a message if none has been sent for over a second - self.send_keepalive().await.context("Error sending keepalive signal")?; - if let Message::Text(text) = incoming_msg { - let (msg_processed, handshake) = process_incoming_message( - self.client_id, - &text, - self.worterbuch.clone(), - tx.clone(), - &self.proto_version, - ) - .await.context("Error processing incoming message")?; - self.handshake_complete |= msg_processed && handshake; - if !msg_processed { + match msg { + Ok(incoming_msg) => { + // received websocket message, no need to check how long it has been since the last one + self.last_keepalive_rx = Instant::now(); + // send a message if none has been sent for over a second + self.send_keepalive().await.context("Error sending keepalive signal")?; + if let Message::Text(text) = incoming_msg { + let (msg_processed, handshake) = process_incoming_message( + self.client_id, + &text, + self.worterbuch.clone(), + tx.clone(), + &self.proto_version, + ) + .await.context("Error processing incoming message")?; + self.handshake_complete |= msg_processed && handshake; + if !msg_processed { + break; + } + } + }, + Err(e) => { + log::error!("Error in WebSocket connection: {e}"); break; } } @@ -366,7 +373,7 @@ impl ClientHandler { // send websocket message, no need to check when the last one was sent self.last_keepalive_tx = Instant::now(); let msg = Message::text(text); - self.websocket.send(msg).await.context("Error sending keepalive signal")?; + self.send_with_timeout(msg).await.context("Error sending keepalive signal")?; } else { break; }, @@ -388,7 +395,7 @@ impl ClientHandler { log::trace!("Sending keepalive"); let json = serde_json::to_string(&ServerMessage::Keepalive)?; let msg = Message::Text(json); - self.websocket.send(msg).await?; + self.send_with_timeout(msg).await?; } Ok(()) @@ -416,4 +423,11 @@ impl ClientHandler { Ok(()) } } + + async fn send_with_timeout(&mut self, msg: Message) -> anyhow::Result<()> { + select! { + r = self.websocket.send(msg) => Ok(r?), + _ = sleep(Duration::from_secs(1)) => Err(anyhow!("Send timeout")), + } + } }