Skip to content

Commit

Permalink
refactoring, improved logging, less strict keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
babymotte committed Aug 18, 2023
1 parent fbfc65e commit b6285c4
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 79 deletions.
2 changes: 1 addition & 1 deletion worterbuch/src/server/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ pub async fn process_incoming_message(
ProtocolVersion { major, minor } if *major < 1 || (*major == 1 && *minor == 0) => {
v1_0::process_incoming_message(client_id, msg, worterbuch, tx).await
}
_ => todo!(),
_ => panic!("looks like the server accidentally accepted a connection to a client that speaks an unsupported protocol version"),
}
}
209 changes: 131 additions & 78 deletions worterbuch/src/server/poem.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{server::common::process_incoming_message, Config, Worterbuch};
use anyhow::{anyhow, Context};
use futures::{sink::SinkExt, stream::StreamExt};
use poem::{
get, handler,
Expand Down Expand Up @@ -154,7 +155,8 @@ async fn ws(
.expect("Client has no remote address.");
ws.protocols(vec!["worterbuch"])
.on_upgrade(move |socket| async move {
if let Err(e) = serve(socket, wb, remote, proto_version).await {
let mut client_handler = ClientHandler::new(socket, wb, remote, proto_version);
if let Err(e) = client_handler.serve().await {
log::error!("Error in WS connection: {e}");
}
})
Expand Down Expand Up @@ -268,99 +270,150 @@ pub async fn start(
poem::Server::new(TcpListener::bind(addr)).run(app).await
}

async fn serve(
mut websocket: WebSocketStream,
struct ClientHandler {
client_id: Uuid,
handshake_complete: bool,
last_keepalive_rx: Instant,
last_keepalive_tx: Instant,
max_inactive_seconds: u64,
last_log: Instant,
websocket: WebSocketStream,
worterbuch: Arc<RwLock<Worterbuch>>,
remote_addr: SocketAddr,
proto_version: ProtocolVersion,
) -> anyhow::Result<()> {
let client_id = Uuid::new_v4();
}

impl ClientHandler {
fn new(
websocket: WebSocketStream,
worterbuch: Arc<RwLock<Worterbuch>>,
remote_addr: SocketAddr,
proto_version: ProtocolVersion,
) -> Self {
Self {
client_id: Uuid::new_v4(),
handshake_complete: false,
last_keepalive_rx: Instant::now(),
last_keepalive_tx: Instant::now(),
max_inactive_seconds: 5,
last_log: Instant::now(),
proto_version,
remote_addr,
websocket,
worterbuch,
}
}

log::info!("New client connected: {client_id} ({remote_addr})");
async fn serve(&mut self) -> anyhow::Result<()> {
let client_id = self.client_id.clone();
let remote_addr = self.remote_addr;

let (tx, mut rx) = mpsc::unbounded_channel::<String>();
log::info!("New client connected: {client_id} ({remote_addr})");

{
let mut wb = worterbuch.write().await;
wb.connected(client_id, remote_addr);
{
let mut wb = self.worterbuch.write().await;
wb.connected(self.client_id, remote_addr);
}

log::debug!("Receiving messages from client {client_id} ({remote_addr}) …",);

if let Err(e) = self.serve_loop().await {
log::error!("Error in serve loop: {e}");
}

let mut wb = self.worterbuch.write().await;
wb.disconnected(client_id, remote_addr);

Ok(())
}

log::debug!("Receiving messages from client {client_id} ({remote_addr}) …");

let mut handshake_complete = false;
let mut last_keepalive_rx = Instant::now();
let mut last_keepalive_tx = Instant::now();

loop {
select! {
recv = websocket.next() => if let Some(Ok(incoming_msg)) = recv {
last_keepalive_rx = Instant::now();
if last_keepalive_tx.elapsed().as_secs() >= 1 {
last_keepalive_tx = Instant::now();
if let Err(e) = send_keepalive(&mut websocket).await {
log::error!("Error sending keepalive signal: {e}");
break;
async fn serve_loop(&mut self) -> anyhow::Result<()> {
let client_id = self.client_id.clone();
let remote_addr = self.remote_addr;
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
loop {
// every branch of this select must
// - receive a websocket message or raise an error if none has been received for too long and
// - send a websocket message if none has been sent for over a second
select! {
recv = self.websocket.next() => if let Some(msg) = recv {
// received websocket message, no need to check how long it has been since the last one
self.last_keepalive_rx = Instant::now();
let incoming_msg = msg.context("Error in WebSocket connection")?;
// send a message if none has been sent for over a second
self.send_keepalive().await.context("Error sending keepalive signal")?;
if let Message::Text(text) = incoming_msg {
let (msg_processed, handshake) = process_incoming_message(
self.client_id,
&text,
self.worterbuch.clone(),
tx.clone(),
&self.proto_version,
)
.await.context("Error processing incoming message")?;
self.handshake_complete |= msg_processed && handshake;
if !msg_processed {
break;
}
}
}
if let Message::Text(text) = incoming_msg {
let (msg_processed, handshake) = process_incoming_message(
client_id,
&text,
worterbuch.clone(),
tx.clone(),
&proto_version,
)
.await?;
handshake_complete |= msg_processed && handshake;
if !msg_processed
{
break;
}
}
} else {
log::warn!("WS stream of client {client_id} ({remote_addr}) closed.");
break;
},
recv = rx.recv() => if let Some(text) = recv {
if handshake_complete && last_keepalive_rx.elapsed().as_secs() >= 2 {
log::warn!("Client has been inactive for too long. Disconnecting.");
} else {
log::warn!("WS stream of client {client_id} ({remote_addr}) closed.");
break;
}
last_keepalive_tx = Instant::now();
let msg = Message::text(text);
if let Err(e) = websocket.send(msg).await {
log::error!("Error sending message to client {client_id} ({remote_addr}): {e}");
},
recv = rx.recv() => if let Some(text) = recv {
// check how long ago the last websocket message was received
self.check_client_keepalive()?;
// send websocket message, no need to check when the last one was sent
self.last_keepalive_tx = Instant::now();
let msg = Message::text(text);
self.websocket.send(msg).await.context("Error sending keepalive signal")?;
} else {
break;
}
} else {
break;
},
_ = sleep(Duration::from_secs(1)) => {
if handshake_complete && last_keepalive_rx.elapsed().as_secs() >= 2 {
log::warn!("Client has been inactive for too long. Disconnecting.");
break;
}
if last_keepalive_tx.elapsed().as_secs() >= 1 {
last_keepalive_tx = Instant::now();
if let Err(e) = send_keepalive(&mut websocket).await {
log::error!("Error sending keepalive signal: {e}");
break;
}
},
_ = sleep(Duration::from_secs(1)) => {
// check how long ago the last websocket message was received
self.check_client_keepalive()?;
// send out websocket message if the last has been more than a second ago
self.send_keepalive().await.context("Error sending keepalive signal")?;
}
}
}

Ok(())
}

let mut wb = worterbuch.write().await;
wb.disconnected(client_id, remote_addr);
async fn send_keepalive(&mut self) -> anyhow::Result<()> {
if self.last_keepalive_tx.elapsed().as_secs() >= 1 {
self.last_keepalive_tx = Instant::now();
log::trace!("Sending keepalive");
let json = serde_json::to_string(&ServerMessage::Keepalive)?;
let msg = Message::Text(json);
self.websocket.send(msg).await?;
}

Ok(())
}
Ok(())
}

fn check_client_keepalive(&mut self) -> anyhow::Result<()> {
let last_log_elapsed = self.last_log.elapsed().as_secs();
let last_keepalive_elapsed = self.last_keepalive_rx.elapsed().as_secs();

async fn send_keepalive(websocket: &mut WebSocketStream) -> anyhow::Result<()> {
log::trace!("Sending keepalive");
let json = serde_json::to_string(&ServerMessage::Keepalive)?;
let msg = Message::Text(json);
websocket.send(msg).await?;
Ok(())
if self.handshake_complete && last_keepalive_elapsed >= 2 && last_log_elapsed >= 1 {
self.last_log = Instant::now();
log::warn!(
"Client {} has been inactive for {} seconds …",
self.client_id,
last_keepalive_elapsed
);
}
if self.handshake_complete && last_keepalive_elapsed >= self.max_inactive_seconds {
log::warn!(
"Client {} has been inactive for too long. Disconnecting.",
self.client_id
);
Err(anyhow!("Client has been inactive for too long"))
} else {
Ok(())
}
}
}

0 comments on commit b6285c4

Please sign in to comment.