Skip to content

Commit

Permalink
fixed unhandled store errors
Browse files Browse the repository at this point in the history
  • Loading branch information
babymotte committed Aug 24, 2023
1 parent ec25306 commit cba80e1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
10 changes: 8 additions & 2 deletions worterbuch/src/server/common/v1_0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,10 @@ async fn unsubscribe(
) -> WorterbuchResult<()> {
let mut wb = worterbuch.write().await;

wb.unsubscribe(client_id, msg.transaction_id)?;
if let Err(e) = wb.unsubscribe(client_id, msg.transaction_id) {
handle_store_error(e, client, msg.transaction_id).await?;
return Ok(());
};
let response = Ack {
transaction_id: msg.transaction_id,
};
Expand Down Expand Up @@ -672,7 +675,10 @@ async fn unsubscribe_ls(
) -> WorterbuchResult<()> {
let mut wb = worterbuch.write().await;

wb.unsubscribe_ls(client_id, msg.transaction_id)?;
if let Err(e) = wb.unsubscribe_ls(client_id, msg.transaction_id) {
handle_store_error(e, client, msg.transaction_id).await?;
return Ok(());
}
let response = Ack {
transaction_id: msg.transaction_id,
};
Expand Down
8 changes: 4 additions & 4 deletions worterbuch/src/server/poem.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{server::common::process_incoming_message, Config, Worterbuch};
use anyhow::{anyhow, Context};
use anyhow::anyhow;
use futures::{sink::SinkExt, stream::StreamExt};
use poem::{
get, handler,
Expand Down Expand Up @@ -358,7 +358,7 @@ impl ClientHandler {
tx.clone(),
&self.proto_version,
)
.await.context("Error processing incoming message")?;
.await?;
self.handshake_complete |= msg_processed && handshake;
if !msg_processed {
break;
Expand All @@ -376,15 +376,15 @@ impl ClientHandler {
},
recv = rx.recv() => if let Some(text) = recv {
let msg = Message::text(text);
self.send_with_timeout(msg).await.context("Error sending message to client")?;
self.send_with_timeout(msg).await?;
} else {
break;
},
_ = keepalive_rx.recv() => {
// check how long ago the last websocket message was received
self.check_client_keepalive()?;
// send out websocket message if the last has been more than a second ago
self.send_keepalive().await.context("Error sending keepalive signal")?;
self.send_keepalive().await?;
}
}
}
Expand Down

0 comments on commit cba80e1

Please sign in to comment.