Skip to content

Commit

Permalink
publishing keepalive lag to $SYS/clients
Browse files Browse the repository at this point in the history
  • Loading branch information
babymotte committed Jul 15, 2024
1 parent 92e1a2c commit d08f106
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 5 deletions.
1 change: 1 addition & 0 deletions worterbuch-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub const SYSTEM_TOPIC_SOURCES: &str = "source-code";
pub const SYSTEM_TOPIC_SUBSCRIPTIONS: &str = "subscriptions";
pub const SYSTEM_TOPIC_CLIENTS_PROTOCOL: &str = "protocol";
pub const SYSTEM_TOPIC_CLIENTS_ADDRESS: &str = "address";
pub const SYSTEM_TOPIC_CLIENTS_KEEPALIVE_LAG: &str = "keepaliveLag";
pub const SYSTEM_TOPIC_LAST_WILL: &str = "lastWill";
pub const SYSTEM_TOPIC_GRAVE_GOODS: &str = "graveGoods";
pub const SYSTEM_TOPIC_CLIENT_NAME: &str = "clientName";
Expand Down
6 changes: 6 additions & 0 deletions worterbuch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,11 @@ async fn process_api_call(worterbuch: &mut Worterbuch, function: WbFunction) {
WbFunction::SupportedProtocolVersion(tx) => {
tx.send(worterbuch.supported_protocol_version()).ok();
}
WbFunction::KeepaliveLag(client_id, lag) => {
worterbuch.keepalive_lag(client_id, lag).await;
}
WbFunction::ClearKeepaliveLag(client_id) => {
worterbuch.clear_keepalive_lag(client_id).await;
}
}
}
18 changes: 18 additions & 0 deletions worterbuch/src/server/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ pub enum WbFunction {
Export(oneshot::Sender<WorterbuchResult<Value>>),
Len(oneshot::Sender<usize>),
SupportedProtocolVersion(oneshot::Sender<ProtocolVersion>),
KeepaliveLag(Uuid, u32),
ClearKeepaliveLag(Uuid),
}

#[derive(Clone)]
Expand Down Expand Up @@ -560,6 +562,18 @@ impl CloneableWbApi {
.await?;
Ok(rx.await?)
}

pub fn keepalive_lag(&self, client_id: Uuid, lag: u32) {
self.tx
.try_send(WbFunction::KeepaliveLag(client_id, lag))
.ok();
}

pub fn clear_keepalive_lag(&self, client_id: Uuid) {
self.tx
.try_send(WbFunction::ClearKeepaliveLag(client_id))
.ok();
}
}

async fn authorize(
Expand Down Expand Up @@ -1283,6 +1297,7 @@ pub fn check_client_keepalive(
last_keepalive_tx: Instant,
client_id: Uuid,
keepalive_timeout: Duration,
wb: &CloneableWbApi,
) -> anyhow::Result<()> {
let lag = last_keepalive_tx - last_keepalive_rx;

Expand All @@ -1292,6 +1307,9 @@ pub fn check_client_keepalive(
client_id,
lag.as_secs()
);
wb.keepalive_lag(client_id, lag.as_secs() as u32);
} else {
wb.clear_keepalive_lag(client_id);
}

if lag >= keepalive_timeout {
Expand Down
2 changes: 1 addition & 1 deletion worterbuch/src/server/poem/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn serve_loop(
},
_ = keepalive_timer.tick() => {
// check how long ago the last message was received
check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout)?;
check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout, &worterbuch)?;
// send out message if the last has been more than a second ago
send_keepalive(last_keepalive_tx, &ws_send_tx, ).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion worterbuch/src/server/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async fn serve_loop(
},
_ = keepalive_timer.tick() => {
// check how long ago the last message was received
check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout)?;
check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout, &worterbuch)?;
// send out message if the last has been more than a second ago
send_keepalive(last_keepalive_tx, &tcp_send_tx, ).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion worterbuch/src/server/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async fn serve_loop(
},
_ = keepalive_timer.tick() => {
// check how long ago the last message was received
check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout)?;
check_client_keepalive(last_keepalive_rx, last_keepalive_tx, client_id, keepalive_timeout, &worterbuch)?;
// send out message if the last has been more than a second ago
send_keepalive(last_keepalive_tx, &tcp_send_tx, ).await?;
}
Expand Down
37 changes: 35 additions & 2 deletions worterbuch/src/worterbuch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use worterbuch_common::{
parse_segments, topic, GraveGoods, Key, KeySegment, KeyValuePairs, LastWill, PState,
PStateEvent, Path, Protocol, ProtocolVersion, RegularKeySegment, RequestPattern, ServerMessage,
TransactionId, SYSTEM_TOPIC_CLIENTS, SYSTEM_TOPIC_CLIENTS_ADDRESS,
SYSTEM_TOPIC_CLIENTS_PROTOCOL, SYSTEM_TOPIC_CLIENT_NAME, SYSTEM_TOPIC_GRAVE_GOODS,
SYSTEM_TOPIC_LAST_WILL, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_ROOT_PREFIX,
SYSTEM_TOPIC_CLIENTS_KEEPALIVE_LAG, SYSTEM_TOPIC_CLIENTS_PROTOCOL, SYSTEM_TOPIC_CLIENT_NAME,
SYSTEM_TOPIC_GRAVE_GOODS, SYSTEM_TOPIC_LAST_WILL, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_ROOT_PREFIX,
SYSTEM_TOPIC_SUBSCRIPTIONS,
};

Expand Down Expand Up @@ -1039,6 +1039,39 @@ impl Worterbuch {

Ok(())
}

pub async fn keepalive_lag(&mut self, client_id: Uuid, lag: u32) {
if self.config.extended_monitoring {
self.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_CLIENTS,
client_id,
SYSTEM_TOPIC_CLIENTS_KEEPALIVE_LAG
),
json!(lag),
INTERNAL_CLIENT_ID,
)
.await
.ok();
}
}

pub async fn clear_keepalive_lag(&mut self, client_id: Uuid) {
if self.config.extended_monitoring {
self.delete(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_CLIENTS,
client_id,
SYSTEM_TOPIC_CLIENTS_KEEPALIVE_LAG
),
INTERNAL_CLIENT_ID,
)
.await
.ok();
}
}
}

fn check_for_read_only_key(key: &str, client_id: &str) -> WorterbuchResult<()> {
Expand Down

0 comments on commit d08f106

Please sign in to comment.