Skip to content

Commit

Permalink
Merge pull request #9 from mbachmann-sts/main
Browse files Browse the repository at this point in the history
added send timeouts
  • Loading branch information
babymotte authored Aug 21, 2023
2 parents 0c1cd07 + db87709 commit 61eb823
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 20 deletions.
11 changes: 9 additions & 2 deletions worterbuch-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,13 @@ async fn run(
}
}

async fn send_with_timeout(ws: &mut WebSocket, msg: Message) -> ConnectionResult<()> {
select! {
r = ws.send(msg) => Ok(r?),
_ = sleep(Duration::from_secs(1)) => Err(ConnectionError::Timeout),
}
}

async fn process_incoming_command(
cmd: Option<Command>,
callbacks: &mut Callbacks,
Expand Down Expand Up @@ -911,7 +918,7 @@ async fn send_client_message(cm: CM, websocket: &mut WebSocket) -> ConnectionRes
let json = serde_json::to_string(&cm)?;
log::debug!("Sending message: {json}");
let msg = Message::Text(json);
websocket.send(msg).await?;
send_with_timeout(websocket, msg).await?;
Ok(())
}

Expand Down Expand Up @@ -1026,7 +1033,7 @@ async fn send_keepalive(websocket: &mut WebSocket) -> ConnectionResult<()> {
log::trace!("Sending keepalive");
let json = serde_json::to_string(&ClientMessage::Keepalive)?;
let msg = Message::Text(json);
websocket.send(msg).await?;
send_with_timeout(websocket, msg).await?;
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions worterbuch-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub enum ConnectionError {
ConfigError(ConfigError),
SerdeError(serde_json::Error),
AckError(broadcast::error::SendError<u64>),
Timeout,
}

impl std::error::Error for ConnectionError {}
Expand All @@ -169,6 +170,7 @@ impl fmt::Display for ConnectionError {
Self::ConfigError(e) => fmt::Display::fmt(&e, f),
Self::SerdeError(e) => fmt::Display::fmt(&e, f),
Self::AckError(e) => fmt::Display::fmt(&e, f),
Self::Timeout => fmt::Display::fmt("timeout", f),
}
}
}
Expand Down
50 changes: 32 additions & 18 deletions worterbuch/src/server/poem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,22 +337,29 @@ impl ClientHandler {
// - send a websocket message if none has been sent for over a second
select! {
recv = self.websocket.next() => if let Some(msg) = recv {
// received websocket message, no need to check how long it has been since the last one
self.last_keepalive_rx = Instant::now();
let incoming_msg = msg.context("Error in WebSocket connection")?;
// send a message if none has been sent for over a second
self.send_keepalive().await.context("Error sending keepalive signal")?;
if let Message::Text(text) = incoming_msg {
let (msg_processed, handshake) = process_incoming_message(
self.client_id,
&text,
self.worterbuch.clone(),
tx.clone(),
&self.proto_version,
)
.await.context("Error processing incoming message")?;
self.handshake_complete |= msg_processed && handshake;
if !msg_processed {
match msg {
Ok(incoming_msg) => {
// received websocket message, no need to check how long it has been since the last one
self.last_keepalive_rx = Instant::now();
// send a message if none has been sent for over a second
self.send_keepalive().await.context("Error sending keepalive signal")?;
if let Message::Text(text) = incoming_msg {
let (msg_processed, handshake) = process_incoming_message(
self.client_id,
&text,
self.worterbuch.clone(),
tx.clone(),
&self.proto_version,
)
.await.context("Error processing incoming message")?;
self.handshake_complete |= msg_processed && handshake;
if !msg_processed {
break;
}
}
},
Err(e) => {
log::error!("Error in WebSocket connection: {e}");
break;
}
}
Expand All @@ -366,7 +373,7 @@ impl ClientHandler {
// send websocket message, no need to check when the last one was sent
self.last_keepalive_tx = Instant::now();
let msg = Message::text(text);
self.websocket.send(msg).await.context("Error sending keepalive signal")?;
self.send_with_timeout(msg).await.context("Error sending keepalive signal")?;
} else {
break;
},
Expand All @@ -388,7 +395,7 @@ impl ClientHandler {
log::trace!("Sending keepalive");
let json = serde_json::to_string(&ServerMessage::Keepalive)?;
let msg = Message::Text(json);
self.websocket.send(msg).await?;
self.send_with_timeout(msg).await?;
}

Ok(())
Expand Down Expand Up @@ -416,4 +423,11 @@ impl ClientHandler {
Ok(())
}
}

async fn send_with_timeout(&mut self, msg: Message) -> anyhow::Result<()> {
select! {
r = self.websocket.send(msg) => Ok(r?),
_ = sleep(Duration::from_secs(1)) => Err(anyhow!("Send timeout")),
}
}
}

0 comments on commit 61eb823

Please sign in to comment.