From d08f1067f83241f651c18a0510ad609bc91189ee Mon Sep 17 00:00:00 2001 From: Michael Bachmann Date: Mon, 15 Jul 2024 10:05:47 +0200 Subject: [PATCH] publishing keepalive lag to $SYS/clients --- worterbuch-common/src/lib.rs | 1 + worterbuch/src/lib.rs | 6 ++++ worterbuch/src/server/common.rs | 18 ++++++++++++ worterbuch/src/server/poem/websocket.rs | 2 +- worterbuch/src/server/tcp.rs | 2 +- worterbuch/src/server/unix.rs | 2 +- worterbuch/src/worterbuch.rs | 37 +++++++++++++++++++++++-- 7 files changed, 63 insertions(+), 5 deletions(-) diff --git a/worterbuch-common/src/lib.rs b/worterbuch-common/src/lib.rs index 6f77fc8..acb68d5 100644 --- a/worterbuch-common/src/lib.rs +++ b/worterbuch-common/src/lib.rs @@ -42,6 +42,7 @@ pub const SYSTEM_TOPIC_SOURCES: &str = "source-code"; pub const SYSTEM_TOPIC_SUBSCRIPTIONS: &str = "subscriptions"; pub const SYSTEM_TOPIC_CLIENTS_PROTOCOL: &str = "protocol"; pub const SYSTEM_TOPIC_CLIENTS_ADDRESS: &str = "address"; +pub const SYSTEM_TOPIC_CLIENTS_KEEPALIVE_LAG: &str = "keepaliveLag"; pub const SYSTEM_TOPIC_LAST_WILL: &str = "lastWill"; pub const SYSTEM_TOPIC_GRAVE_GOODS: &str = "graveGoods"; pub const SYSTEM_TOPIC_CLIENT_NAME: &str = "clientName"; diff --git a/worterbuch/src/lib.rs b/worterbuch/src/lib.rs index 986e66d..116c971 100644 --- a/worterbuch/src/lib.rs +++ b/worterbuch/src/lib.rs @@ -253,5 +253,11 @@ async fn process_api_call(worterbuch: &mut Worterbuch, function: WbFunction) { WbFunction::SupportedProtocolVersion(tx) => { tx.send(worterbuch.supported_protocol_version()).ok(); } + WbFunction::KeepaliveLag(client_id, lag) => { + worterbuch.keepalive_lag(client_id, lag).await; + } + WbFunction::ClearKeepaliveLag(client_id) => { + worterbuch.clear_keepalive_lag(client_id).await; + } } } diff --git a/worterbuch/src/server/common.rs b/worterbuch/src/server/common.rs index 1b46594..1f85080 100644 --- a/worterbuch/src/server/common.rs +++ b/worterbuch/src/server/common.rs @@ -348,6 +348,8 @@ pub enum WbFunction { Export(oneshot::Sender>), Len(oneshot::Sender), SupportedProtocolVersion(oneshot::Sender), + KeepaliveLag(Uuid, u32), + ClearKeepaliveLag(Uuid), } #[derive(Clone)] @@ -560,6 +562,18 @@ impl CloneableWbApi { .await?; Ok(rx.await?) } + + pub fn keepalive_lag(&self, client_id: Uuid, lag: u32) { + self.tx + .try_send(WbFunction::KeepaliveLag(client_id, lag)) + .ok(); + } + + pub fn clear_keepalive_lag(&self, client_id: Uuid) { + self.tx + .try_send(WbFunction::ClearKeepaliveLag(client_id)) + .ok(); + } } async fn authorize( @@ -1283,6 +1297,7 @@ pub fn check_client_keepalive( last_keepalive_tx: Instant, client_id: Uuid, keepalive_timeout: Duration, + wb: &CloneableWbApi, ) -> anyhow::Result<()> { let lag = last_keepalive_tx - last_keepalive_rx; @@ -1292,6 +1307,9 @@ pub fn check_client_keepalive( client_id, lag.as_secs() ); + wb.keepalive_lag(client_id, lag.as_secs() as u32); + } else { + wb.clear_keepalive_lag(client_id); } if lag >= keepalive_timeout { diff --git a/worterbuch/src/server/poem/websocket.rs b/worterbuch/src/server/poem/websocket.rs index c88e8dd..6729708 100644 --- a/worterbuch/src/server/poem/websocket.rs +++ b/worterbuch/src/server/poem/websocket.rs @@ -166,7 +166,7 @@ async fn serve_loop( }, _ = keepalive_timer.tick() => { // check how long ago the last message was received - check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout)?; + check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout, &worterbuch)?; // send out message if the last has been more than a second ago send_keepalive(last_keepalive_tx, &ws_send_tx, ).await?; } diff --git a/worterbuch/src/server/tcp.rs b/worterbuch/src/server/tcp.rs index dc71be6..85747cb 100644 --- a/worterbuch/src/server/tcp.rs +++ b/worterbuch/src/server/tcp.rs @@ -220,7 +220,7 @@ async fn serve_loop( }, _ = keepalive_timer.tick() => { // check how long ago the last message was received - check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout)?; + check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout, &worterbuch)?; // send out message if the last has been more than a second ago send_keepalive(last_keepalive_tx, &tcp_send_tx, ).await?; } diff --git a/worterbuch/src/server/unix.rs b/worterbuch/src/server/unix.rs index 80ea416..c31eb0c 100644 --- a/worterbuch/src/server/unix.rs +++ b/worterbuch/src/server/unix.rs @@ -221,7 +221,7 @@ async fn serve_loop( }, _ = keepalive_timer.tick() => { // check how long ago the last message was received - check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout)?; + check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout, &worterbuch)?; // send out message if the last has been more than a second ago send_keepalive(last_keepalive_tx, &tcp_send_tx, ).await?; } diff --git a/worterbuch/src/worterbuch.rs b/worterbuch/src/worterbuch.rs index 7cb85db..404f5e6 100644 --- a/worterbuch/src/worterbuch.rs +++ b/worterbuch/src/worterbuch.rs @@ -40,8 +40,8 @@ use worterbuch_common::{ parse_segments, topic, GraveGoods, Key, KeySegment, KeyValuePairs, LastWill, PState, PStateEvent, Path, Protocol, ProtocolVersion, RegularKeySegment, RequestPattern, ServerMessage, TransactionId, SYSTEM_TOPIC_CLIENTS, SYSTEM_TOPIC_CLIENTS_ADDRESS, - SYSTEM_TOPIC_CLIENTS_PROTOCOL, SYSTEM_TOPIC_CLIENT_NAME, SYSTEM_TOPIC_GRAVE_GOODS, - SYSTEM_TOPIC_LAST_WILL, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_ROOT_PREFIX, + SYSTEM_TOPIC_CLIENTS_KEEPALIVE_LAG, SYSTEM_TOPIC_CLIENTS_PROTOCOL, SYSTEM_TOPIC_CLIENT_NAME, + SYSTEM_TOPIC_GRAVE_GOODS, SYSTEM_TOPIC_LAST_WILL, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_ROOT_PREFIX, SYSTEM_TOPIC_SUBSCRIPTIONS, }; @@ -1039,6 +1039,39 @@ impl Worterbuch { Ok(()) } + + pub async fn keepalive_lag(&mut self, client_id: Uuid, lag: u32) { + if self.config.extended_monitoring { + self.set( + topic!( + SYSTEM_TOPIC_ROOT, + SYSTEM_TOPIC_CLIENTS, + client_id, + SYSTEM_TOPIC_CLIENTS_KEEPALIVE_LAG + ), + json!(lag), + INTERNAL_CLIENT_ID, + ) + .await + .ok(); + } + } + + pub async fn clear_keepalive_lag(&mut self, client_id: Uuid) { + if self.config.extended_monitoring { + self.delete( + topic!( + SYSTEM_TOPIC_ROOT, + SYSTEM_TOPIC_CLIENTS, + client_id, + SYSTEM_TOPIC_CLIENTS_KEEPALIVE_LAG + ), + INTERNAL_CLIENT_ID, + ) + .await + .ok(); + } + } } fn check_for_read_only_key(key: &str, client_id: &str) -> WorterbuchResult<()> {