From a31f36079f6b494ae0524dd530bcfe243854a31e Mon Sep 17 00:00:00 2001 From: Michael Bachmann Date: Thu, 14 Sep 2023 12:43:50 +0200 Subject: [PATCH] accessing worterubch over channels instead of RwLock --- worterbuch-common/src/client.rs | 21 +- worterbuch-common/src/error.rs | 18 +- worterbuch/src/lib.rs | 150 ++++++++--- worterbuch/src/main.rs | 26 +- worterbuch/src/persistence.rs | 15 +- worterbuch/src/server/common.rs | 275 +++++++++++++++++++- worterbuch/src/server/common/v1_0.rs | 369 ++++++++++----------------- worterbuch/src/server/poem.rs | 80 +++--- worterbuch/src/stats.rs | 33 ++- worterbuch/src/worterbuch.rs | 4 +- 10 files changed, 599 insertions(+), 392 deletions(-) diff --git a/worterbuch-common/src/client.rs b/worterbuch-common/src/client.rs index 34fe17b..469ebbe 100644 --- a/worterbuch-common/src/client.rs +++ b/worterbuch-common/src/client.rs @@ -1,6 +1,5 @@ use crate::{ - GraveGoods, Key, LastWill, Path, ProtocolVersions, RequestPattern, TransactionId, UniqueFlag, - Value, + GraveGoods, Key, LastWill, ProtocolVersions, RequestPattern, TransactionId, UniqueFlag, Value, }; use serde::{Deserialize, Serialize}; @@ -14,8 +13,6 @@ pub enum ClientMessage { Publish(Publish), Subscribe(Subscribe), PSubscribe(PSubscribe), - Export(Export), - Import(Import), Unsubscribe(Unsubscribe), Delete(Delete), PDelete(PDelete), @@ -36,8 +33,6 @@ impl ClientMessage { ClientMessage::Publish(m) => Some(m.transaction_id), ClientMessage::Subscribe(m) => Some(m.transaction_id), ClientMessage::PSubscribe(m) => Some(m.transaction_id), - ClientMessage::Export(m) => Some(m.transaction_id), - ClientMessage::Import(m) => Some(m.transaction_id), ClientMessage::Unsubscribe(m) => Some(m.transaction_id), ClientMessage::Delete(m) => Some(m.transaction_id), ClientMessage::PDelete(m) => Some(m.transaction_id), @@ -102,20 +97,6 @@ pub struct PSubscribe { pub unique: UniqueFlag, } -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Export { - pub transaction_id: TransactionId, - pub path: Path, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Import { - pub transaction_id: TransactionId, - pub path: Path, -} - #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Unsubscribe { diff --git a/worterbuch-common/src/error.rs b/worterbuch-common/src/error.rs index 6db292e..0e8fdf1 100644 --- a/worterbuch-common/src/error.rs +++ b/worterbuch-common/src/error.rs @@ -1,6 +1,10 @@ use crate::{server::Err, ErrorCode, Key, MetaData, RequestPattern}; use std::{fmt, io, net::AddrParseError, num::ParseIntError}; -use tokio::sync::{broadcast, mpsc::error::SendError, oneshot}; +use tokio::sync::{ + broadcast, + mpsc::{self, error::SendError}, + oneshot, +}; #[derive(Debug)] pub enum ConfigError { @@ -138,6 +142,18 @@ impl Context> } } +impl From> for WorterbuchError { + fn from(value: mpsc::error::SendError) -> Self { + WorterbuchError::Other(Box::new(value), "Internal server error".to_owned()) + } +} + +impl From for WorterbuchError { + fn from(value: oneshot::error::RecvError) -> Self { + WorterbuchError::Other(Box::new(value), "Internal server error".to_owned()) + } +} + pub type WorterbuchResult = std::result::Result; #[derive(Debug)] diff --git a/worterbuch/src/lib.rs b/worterbuch/src/lib.rs index 632ba04..38faaa7 100644 --- a/worterbuch/src/lib.rs +++ b/worterbuch/src/lib.rs @@ -8,25 +8,57 @@ mod worterbuch; pub use crate::worterbuch::*; pub use config::*; +use server::common::{CloneableWbApi, WbFunction}; use stats::{SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS}; use worterbuch_common::topic; use crate::stats::track_stats; use anyhow::Result; -use std::sync::Arc; #[cfg(not(target_os = "windows"))] use tokio::signal::unix::{signal, SignalKind}; -use tokio::{ - spawn, - sync::{oneshot, RwLock}, -}; +use tokio::{select, spawn, sync::mpsc}; pub async fn run_worterbuch(config: Config) -> Result<()> { + let config_pers = config.clone(); + let use_persistence = config.use_persistence; - let worterbuch = start_worterbuch(config.clone()).await?; + let mut worterbuch = if use_persistence { + persistence::load(config.clone()).await? + } else { + Worterbuch::with_config(config.clone()) + }; - let (_terminate_tx, terminate_rx) = oneshot::channel::<()>(); + worterbuch.set( + topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS), + serde_json::to_value(worterbuch.supported_protocol_versions()).expect("cannot fail"), + )?; + + let (api_tx, mut api_rx) = mpsc::channel(1); + let api = CloneableWbApi::new(api_tx); + + let worterbuch_pers = api.clone(); + let worterbuch_uptime = api.clone(); + + if use_persistence { + spawn(persistence::periodic(worterbuch_pers, config_pers)); + } + + spawn(track_stats(worterbuch_uptime)); + + spawn(server::poem::start(api.clone(), config.clone())); + + let (_terminate_tx, mut terminate_rx) = mpsc::channel(1); + + loop { + select! { + recv = api_rx.recv() => match recv { + Some(function) => process_api_call(&mut worterbuch, function).await, + None => break, + }, + _ = terminate_rx.recv() => break, + } + } #[cfg(not(target_os = "windows"))] spawn(async move { @@ -34,7 +66,7 @@ pub async fn run_worterbuch(config: Config) -> Result<()> { Ok(mut signal) => { signal.recv().await; log::info!("SIGTERM received."); - if let Err(_) = _terminate_tx.send(()) { + if let Err(_) = _terminate_tx.send(()).await { log::error!("Error sending terminate signal"); } } @@ -42,44 +74,84 @@ pub async fn run_worterbuch(config: Config) -> Result<()> { } }); - terminate_rx.await?; - log::info!("Shutting down."); if use_persistence { - persistence::once(worterbuch.clone(), config).await?; + persistence::once(&api, config).await?; } Ok(()) } -pub async fn start_worterbuch(config: Config) -> Result>> { - let config_pers = config.clone(); - - let use_persistence = config.use_persistence; - - let mut worterbuch = if use_persistence { - persistence::load(config.clone()).await? - } else { - Worterbuch::with_config(config.clone()) - }; - - worterbuch.set( - topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS), - serde_json::to_value(worterbuch.supported_protocol_versions()).expect("cannot fail"), - )?; - - let worterbuch = Arc::new(RwLock::new(worterbuch)); - let worterbuch_pers = worterbuch.clone(); - let worterbuch_uptime = worterbuch.clone(); - - if use_persistence { - spawn(persistence::periodic(worterbuch_pers, config_pers)); +async fn process_api_call(worterbuch: &mut Worterbuch, function: WbFunction) { + match function { + WbFunction::Handshake(client_protocol_versions, last_will, grave_goods, client_id, tx) => { + tx.send(worterbuch.handshake( + &client_protocol_versions, + last_will, + grave_goods, + client_id, + )) + .ok(); + } + WbFunction::Get(key, tx) => { + tx.send(worterbuch.get(&key)).ok(); + } + WbFunction::Set(key, value, tx) => { + tx.send(worterbuch.set(key, value)).ok(); + } + WbFunction::Publish(key, value, tx) => { + tx.send(worterbuch.publish(key, value)).ok(); + } + WbFunction::Ls(parent, tx) => { + tx.send(worterbuch.ls(&parent)).ok(); + } + WbFunction::PGet(pattern, tx) => { + tx.send(worterbuch.pget(&pattern)).ok(); + } + WbFunction::Subscribe(client_id, transaction_id, key, unique, tx) => { + tx.send(worterbuch.subscribe(client_id, transaction_id, key, unique)) + .ok(); + } + WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, tx) => { + tx.send(worterbuch.psubscribe(client_id, transaction_id, pattern, unique)) + .ok(); + } + WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => { + tx.send(worterbuch.subscribe_ls(client_id, transaction_id, parent)) + .ok(); + } + WbFunction::Unsubscribe(client_id, transaction_id, tx) => { + tx.send(worterbuch.unsubscribe(client_id, transaction_id)) + .ok(); + } + WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => { + tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id)) + .ok(); + } + WbFunction::Delete(key, tx) => { + tx.send(worterbuch.delete(key)).ok(); + } + WbFunction::PDelete(pattern, tx) => { + tx.send(worterbuch.pdelete(pattern)).ok(); + } + WbFunction::Connected(client_id, remote_addr) => { + worterbuch.connected(client_id, remote_addr); + } + WbFunction::Disconnected(client_id, remote_addr) => { + worterbuch.disconnected(client_id, remote_addr); + } + WbFunction::Config(tx) => { + tx.send(worterbuch.config().clone()).ok(); + } + WbFunction::Export(tx) => { + tx.send(worterbuch.export()).ok(); + } + WbFunction::Len(tx) => { + tx.send(worterbuch.len()).ok(); + } + WbFunction::SupportedProtocolVersions(tx) => { + tx.send(worterbuch.supported_protocol_versions()).ok(); + } } - - spawn(track_stats(worterbuch_uptime)); - - spawn(server::poem::start(worterbuch.clone(), config.clone())); - - Ok(worterbuch) } diff --git a/worterbuch/src/main.rs b/worterbuch/src/main.rs index 8cf67dd..0dab7f5 100644 --- a/worterbuch/src/main.rs +++ b/worterbuch/src/main.rs @@ -1,36 +1,20 @@ use anyhow::Result; use clap::Parser; -use tokio::runtime::{self, Runtime}; use worterbuch::run_worterbuch; use worterbuch::Config; #[derive(Parser)] #[command(author, version, about = "An in-memory data base / message broker hybrid", long_about = None)] -struct Args { - /// Start Wörterbuch in single threaded mode. Default is multi threaded. - #[arg(short, long)] - single_threaded: bool, -} +struct Args {} -fn main() -> Result<()> { +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { dotenv::dotenv().ok(); env_logger::init(); let config = Config::new()?; - let args: Args = Args::parse(); - - let single_threaded = args.single_threaded || config.single_threaded; - - let rt = if single_threaded { - log::info!("Starting Wörterbuch in single-threaded mode …"); - runtime::Builder::new_current_thread() - .enable_all() - .build()? - } else { - log::info!("Starting Wörterbuch in multi-threaded mode …"); - Runtime::new()? - }; + let _args: Args = Args::parse(); - rt.block_on(run_worterbuch(config))?; + run_worterbuch(config).await?; Ok(()) } diff --git a/worterbuch/src/persistence.rs b/worterbuch/src/persistence.rs index 79d8525..768927a 100644 --- a/worterbuch/src/persistence.rs +++ b/worterbuch/src/persistence.rs @@ -1,29 +1,26 @@ -use crate::{config::Config, worterbuch::Worterbuch}; +use crate::{config::Config, server::common::CloneableWbApi, worterbuch::Worterbuch}; use anyhow::Result; use sha2::{Digest, Sha256}; -use std::{path::PathBuf, sync::Arc}; +use std::path::PathBuf; use tokio::{ fs::{self, File}, io::AsyncWriteExt, - sync::RwLock, time::sleep, }; -pub(crate) async fn periodic(worterbuch: Arc>, config: Config) -> Result<()> { +pub(crate) async fn periodic(worterbuch: CloneableWbApi, config: Config) -> Result<()> { let interval = config.persistence_interval; loop { sleep(interval).await; - once(worterbuch.clone(), config.clone()).await?; + once(&worterbuch, config.clone()).await?; } } -pub(crate) async fn once(worterbuch: Arc>, config: Config) -> Result<()> { - let wb = worterbuch.read().await; - +pub(crate) async fn once(worterbuch: &CloneableWbApi, config: Config) -> Result<()> { let (json_temp_path, json_path, sha_temp_path, sha_path) = file_paths(&config); - let json = wb.export()?.to_string(); + let json = worterbuch.export().await?.to_string(); let mut hasher = Sha256::new(); hasher.update(&json); diff --git a/worterbuch/src/server/common.rs b/worterbuch/src/server/common.rs index dd205a7..eb2b657 100644 --- a/worterbuch/src/server/common.rs +++ b/worterbuch/src/server/common.rs @@ -1,15 +1,21 @@ mod v1_0; -use crate::worterbuch::Worterbuch; -use std::sync::Arc; -use tokio::sync::{mpsc::UnboundedSender, RwLock}; +use crate::{subscribers::SubscriptionId, Config}; +use std::net::SocketAddr; +use tokio::sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + oneshot, +}; use uuid::Uuid; -use worterbuch_common::{error::WorterbuchResult, ProtocolVersion}; +use worterbuch_common::{ + error::WorterbuchResult, Handshake, Key, KeyValuePairs, PStateEvent, ProtocolVersion, + ProtocolVersions, RegularKeySegment, RequestPattern, TransactionId, Value, +}; pub async fn process_incoming_message( client_id: Uuid, msg: &str, - worterbuch: Arc>, + worterbuch: &CloneableWbApi, tx: UnboundedSender, protocol_version: &ProtocolVersion, ) -> WorterbuchResult<(bool, bool)> { @@ -20,3 +26,262 @@ pub async fn process_incoming_message( _ => panic!("looks like the server accidentally accepted a connection to a client that speaks an unsupported protocol version"), } } + +pub enum WbFunction { + Handshake( + ProtocolVersions, + KeyValuePairs, + Vec, + Uuid, + oneshot::Sender>, + ), + Get(Key, oneshot::Sender>), + Set(Key, Value, oneshot::Sender>), + Publish(Key, Value, oneshot::Sender>), + Ls( + Option, + oneshot::Sender>>, + ), + PGet( + RequestPattern, + oneshot::Sender>, + ), + Subscribe( + Uuid, + TransactionId, + Key, + bool, + oneshot::Sender, SubscriptionId)>>, + ), + PSubscribe( + Uuid, + TransactionId, + RequestPattern, + bool, + oneshot::Sender, SubscriptionId)>>, + ), + SubscribeLs( + Uuid, + TransactionId, + Option, + oneshot::Sender< + WorterbuchResult<(UnboundedReceiver>, SubscriptionId)>, + >, + ), + Unsubscribe(Uuid, TransactionId, oneshot::Sender>), + UnsubscribeLs(Uuid, TransactionId, oneshot::Sender>), + Delete(Key, oneshot::Sender>), + PDelete( + RequestPattern, + oneshot::Sender>, + ), + Connected(Uuid, SocketAddr), + Disconnected(Uuid, SocketAddr), + Config(oneshot::Sender), + Export(oneshot::Sender>), + Len(oneshot::Sender), + SupportedProtocolVersions(oneshot::Sender), +} + +#[derive(Clone)] +pub struct CloneableWbApi { + tx: mpsc::Sender, +} + +impl CloneableWbApi { + pub fn new(tx: mpsc::Sender) -> Self { + CloneableWbApi { tx } + } + + pub async fn handshake( + &self, + supported_protocol_versions: ProtocolVersions, + last_will: KeyValuePairs, + grave_goods: Vec, + client_id: Uuid, + ) -> WorterbuchResult { + let (tx, rx) = oneshot::channel(); + self.tx + .send(WbFunction::Handshake( + supported_protocol_versions, + last_will, + grave_goods, + client_id, + tx, + )) + .await?; + rx.await? + } + + pub async fn get(&self, key: Key) -> WorterbuchResult<(String, Value)> { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::Get(key, tx)).await?; + rx.await? + } + + pub async fn pget<'a>(&self, pattern: RequestPattern) -> WorterbuchResult { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::PGet(pattern, tx)).await?; + rx.await? + } + + pub async fn set(&self, key: Key, value: Value) -> WorterbuchResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::Set(key, value, tx)).await?; + rx.await? + } + + pub async fn publish(&self, key: Key, value: Value) -> WorterbuchResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::Publish(key, value, tx)).await?; + rx.await? + } + + pub async fn ls(&self, parent: Option) -> WorterbuchResult> { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::Ls(parent, tx)).await?; + rx.await? + } + + pub async fn subscribe( + &self, + client_id: Uuid, + transaction_id: TransactionId, + key: Key, + unique: bool, + ) -> WorterbuchResult<(UnboundedReceiver, SubscriptionId)> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(WbFunction::Subscribe( + client_id, + transaction_id, + key, + unique, + tx, + )) + .await?; + rx.await? + } + + pub async fn psubscribe( + &self, + client_id: Uuid, + transaction_id: TransactionId, + pattern: RequestPattern, + unique: bool, + ) -> WorterbuchResult<(UnboundedReceiver, SubscriptionId)> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(WbFunction::PSubscribe( + client_id, + transaction_id, + pattern, + unique, + tx, + )) + .await?; + rx.await? + } + + pub async fn subscribe_ls( + &self, + client_id: Uuid, + transaction_id: TransactionId, + parent: Option, + ) -> WorterbuchResult<(UnboundedReceiver>, SubscriptionId)> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(WbFunction::SubscribeLs( + client_id, + transaction_id, + parent, + tx, + )) + .await?; + rx.await? + } + + pub async fn unsubscribe( + &self, + client_id: Uuid, + transaction_id: TransactionId, + ) -> WorterbuchResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(WbFunction::Unsubscribe(client_id, transaction_id, tx)) + .await?; + rx.await? + } + + pub async fn unsubscribe_ls( + &self, + client_id: Uuid, + transaction_id: TransactionId, + ) -> WorterbuchResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(WbFunction::UnsubscribeLs(client_id, transaction_id, tx)) + .await?; + rx.await? + } + + pub async fn delete(&self, key: Key) -> WorterbuchResult<(Key, Value)> { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::Delete(key, tx)).await?; + rx.await? + } + + pub async fn pdelete(&self, pattern: RequestPattern) -> WorterbuchResult { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::PDelete(pattern, tx)).await?; + rx.await? + } + + pub async fn connected( + &self, + client_id: Uuid, + remote_addr: SocketAddr, + ) -> WorterbuchResult<()> { + self.tx + .send(WbFunction::Connected(client_id, remote_addr)) + .await?; + Ok(()) + } + + pub async fn disconnected( + &self, + client_id: Uuid, + remote_addr: SocketAddr, + ) -> WorterbuchResult<()> { + self.tx + .send(WbFunction::Disconnected(client_id, remote_addr)) + .await?; + Ok(()) + } + + pub async fn config(&self) -> WorterbuchResult { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::Config(tx)).await?; + Ok(rx.await?) + } + + pub async fn export(&self) -> WorterbuchResult { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::Export(tx)).await?; + rx.await? + } + + pub async fn len(&self) -> WorterbuchResult { + let (tx, rx) = oneshot::channel(); + self.tx.send(WbFunction::Len(tx)).await?; + Ok(rx.await?) + } + + pub async fn supported_protocol_versions(&self) -> WorterbuchResult { + let (tx, rx) = oneshot::channel(); + self.tx + .send(WbFunction::SupportedProtocolVersions(tx)) + .await?; + Ok(rx.await?) + } +} diff --git a/worterbuch/src/server/common/v1_0.rs b/worterbuch/src/server/common/v1_0.rs index 47f45a1..cdb3dd3 100644 --- a/worterbuch/src/server/common/v1_0.rs +++ b/worterbuch/src/server/common/v1_0.rs @@ -1,24 +1,20 @@ -use crate::worterbuch::Worterbuch; use serde::Serialize; -use std::sync::Arc; -use tokio::{ - fs::File, - spawn, - sync::{mpsc::UnboundedSender, RwLock}, -}; +use tokio::{spawn, sync::mpsc::UnboundedSender}; use uuid::Uuid; use worterbuch_common::{ error::WorterbuchResult, error::{Context, WorterbuchError}, - Ack, ClientMessage as CM, Delete, Err, ErrorCode, Export, Get, HandshakeRequest, Import, - KeyValuePair, Ls, LsState, MetaData, PDelete, PGet, PState, PStateEvent, PSubscribe, Publish, - ServerMessage, Set, State, StateEvent, Subscribe, SubscribeLs, Unsubscribe, UnsubscribeLs, + Ack, ClientMessage as CM, Delete, Err, ErrorCode, Get, HandshakeRequest, KeyValuePair, Ls, + LsState, MetaData, PDelete, PGet, PState, PStateEvent, PSubscribe, Publish, ServerMessage, Set, + State, StateEvent, Subscribe, SubscribeLs, Unsubscribe, UnsubscribeLs, }; +use super::CloneableWbApi; + pub async fn process_incoming_message( client_id: Uuid, msg: &str, - worterbuch: Arc>, + worterbuch: &CloneableWbApi, tx: UnboundedSender, ) -> WorterbuchResult<(bool, bool)> { let mut hs = false; @@ -26,47 +22,43 @@ pub async fn process_incoming_message( Ok(Some(msg)) => match msg { CM::HandshakeRequest(msg) => { hs = true; - handshake(msg, worterbuch.clone(), tx.clone(), client_id.clone()).await?; + handshake(msg, worterbuch, &tx, client_id.clone()).await?; } CM::Get(msg) => { - get(msg, worterbuch.clone(), tx.clone()).await?; + get(msg, worterbuch, &tx).await?; } CM::PGet(msg) => { - pget(msg, worterbuch.clone(), tx.clone()).await?; + pget(msg, worterbuch, &tx).await?; } CM::Set(msg) => { - set(msg, worterbuch.clone(), tx.clone()).await?; + set(msg, worterbuch, &tx).await?; } CM::Publish(msg) => { - publish(msg, worterbuch.clone(), tx.clone()).await?; + publish(msg, worterbuch, &tx).await?; } CM::Subscribe(msg) => { let unique = msg.unique; - subscribe(msg, client_id, worterbuch.clone(), tx.clone(), unique).await?; + subscribe(msg, client_id, worterbuch, &tx, unique).await?; } CM::PSubscribe(msg) => { let unique = msg.unique; - psubscribe(msg, client_id, worterbuch.clone(), tx.clone(), unique).await?; - } - CM::Export(msg) => export(msg, worterbuch.clone(), tx.clone()).await?, - CM::Import(msg) => import(msg, worterbuch.clone(), tx.clone()).await?, - CM::Unsubscribe(msg) => { - unsubscribe(msg, worterbuch.clone(), tx.clone(), client_id).await? + psubscribe(msg, client_id, worterbuch, &tx, unique).await?; } + CM::Unsubscribe(msg) => unsubscribe(msg, worterbuch, &tx, client_id).await?, CM::Delete(msg) => { - delete(msg, worterbuch.clone(), tx.clone()).await?; + delete(msg, worterbuch, &tx).await?; } CM::PDelete(msg) => { - pdelete(msg, worterbuch.clone(), tx.clone()).await?; + pdelete(msg, worterbuch, &tx).await?; } CM::Ls(msg) => { - ls(msg, worterbuch.clone(), tx.clone()).await?; + ls(msg, worterbuch, &tx).await?; } CM::SubscribeLs(msg) => { - subscribe_ls(msg, client_id.clone(), worterbuch.clone(), tx.clone()).await?; + subscribe_ls(msg, client_id, worterbuch, &tx).await?; } CM::UnsubscribeLs(msg) => { - unsubscribe_ls(msg, client_id.clone(), worterbuch.clone(), tx.clone()).await?; + unsubscribe_ls(msg, client_id, worterbuch, &tx).await?; } CM::Keepalive => (), }, @@ -85,21 +77,22 @@ pub async fn process_incoming_message( async fn handshake( msg: HandshakeRequest, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, client_id: Uuid, ) -> WorterbuchResult<()> { - let mut wb = worterbuch.write().await; - - let response = match wb.handshake( - &msg.supported_protocol_versions, - msg.last_will, - msg.grave_goods, - client_id, - ) { + let response = match worterbuch + .handshake( + msg.supported_protocol_versions, + msg.last_will, + msg.grave_goods, + client_id, + ) + .await + { Ok(handshake) => handshake, Err(e) => { - handle_store_error(e, client.clone(), 0).await?; + handle_store_error(e, client, 0).await?; return Ok(()); } }; @@ -108,7 +101,7 @@ async fn handshake( Ok(data) => client .send(data) .context(|| format!("Error sending HANDSHAKE message",))?, - Err(e) => handle_encode_error(e, client).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -116,15 +109,13 @@ async fn handshake( async fn get( msg: Get, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult<()> { - let wb = worterbuch.read().await; - - let key_value = match wb.get(&msg.key) { + let key_value = match worterbuch.get(msg.key).await { Ok(key_value) => key_value.into(), Err(e) => { - handle_store_error(e, client.clone(), msg.transaction_id).await?; + handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); } }; @@ -141,7 +132,7 @@ async fn get( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -149,15 +140,13 @@ async fn get( async fn pget( msg: PGet, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult<()> { - let wb = worterbuch.read().await; - - let values = match wb.pget(&msg.request_pattern) { + let values = match worterbuch.pget(msg.request_pattern.clone()).await { Ok(values) => values.into_iter().map(KeyValuePair::from).collect(), Err(e) => { - handle_store_error(e, client.clone(), msg.transaction_id).await?; + handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); } }; @@ -175,7 +164,7 @@ async fn pget( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -183,12 +172,10 @@ async fn pget( async fn set( msg: Set, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult<()> { - let mut wb = worterbuch.write().await; - - if let Err(e) = wb.set(msg.key, msg.value) { + if let Err(e) = worterbuch.set(msg.key, msg.value).await { handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); } @@ -204,7 +191,7 @@ async fn set( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -212,12 +199,10 @@ async fn set( async fn publish( msg: Publish, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult<()> { - let mut wb = worterbuch.write().await; - - if let Err(e) = wb.publish(msg.key, msg.value) { + if let Err(e) = worterbuch.publish(msg.key, msg.value).await { handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); } @@ -233,7 +218,7 @@ async fn publish( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -242,21 +227,20 @@ async fn publish( async fn subscribe( msg: Subscribe, client_id: Uuid, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, unique: bool, ) -> WorterbuchResult { - let wb_unsub = worterbuch.clone(); - let mut wb = worterbuch.write().await; - - let (mut rx, subscription) = - match wb.subscribe(client_id, msg.transaction_id, msg.key.clone(), unique) { - Ok(it) => it, - Err(e) => { - handle_store_error(e, client, msg.transaction_id).await?; - return Ok(false); - } - }; + let (mut rx, subscription) = match worterbuch + .subscribe(client_id, msg.transaction_id, msg.key.clone(), unique) + .await + { + Ok(it) => it, + Err(e) => { + handle_store_error(e, client, msg.transaction_id).await?; + return Ok(false); + } + }; let response = Ack { transaction_id: msg.transaction_id, @@ -269,11 +253,14 @@ async fn subscribe( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client.clone()).await?, + Err(e) => handle_encode_error(e).await?, } let transaction_id = msg.transaction_id; + let wb_unsub = worterbuch.clone(); + let client_sub = client.clone(); + spawn(async move { log::debug!("Receiving events for subscription {subscription:?} …"); while let Some(event) = rx.recv().await { @@ -286,13 +273,13 @@ async fn subscribe( }; match serde_json::to_string(&ServerMessage::State(state)) { Ok(data) => { - if let Err(e) = client.clone().send(data) { + if let Err(e) = client_sub.send(data) { log::error!("Error sending STATE message to client: {e}"); break; } } Err(e) => { - if let Err(e) = handle_encode_error(e, client.clone()).await { + if let Err(e) = handle_encode_error(e).await { log::error!("Error sending ERROR message to client: {e}"); break; } @@ -301,11 +288,9 @@ async fn subscribe( } } - let mut wb = wb_unsub.write().await; - log::debug!("No more events, ending subscription {subscription:?}."); - match wb.unsubscribe(client_id, transaction_id) { + match wb_unsub.unsubscribe(client_id, transaction_id).await { Ok(()) => { - log::warn!("Subscription {subscription:?} was not cleaned up properly!"); + log::warn!("Subscription was not cleaned up properly!"); } Err(WorterbuchError::NotSubscribed) => { /* this is expected */ } Err(e) => { @@ -320,19 +305,19 @@ async fn subscribe( async fn psubscribe( msg: PSubscribe, client_id: Uuid, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, unique: bool, ) -> WorterbuchResult { - let wb_unsub = worterbuch.clone(); - let mut wb = worterbuch.write().await; - - let (mut rx, subscription) = match wb.psubscribe( - client_id, - msg.transaction_id, - msg.request_pattern.clone(), - unique, - ) { + let (mut rx, subscription) = match worterbuch + .psubscribe( + client_id, + msg.transaction_id, + msg.request_pattern.clone(), + unique, + ) + .await + { Ok(rx) => rx, Err(e) => { handle_store_error(e, client, msg.transaction_id).await?; @@ -351,12 +336,15 @@ async fn psubscribe( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client.clone()).await?, + Err(e) => handle_encode_error(e).await?, } let transaction_id = msg.transaction_id; let request_pattern = msg.request_pattern; + let wb_unsub = worterbuch.clone(); + let client_sub = client.clone(); + spawn(async move { log::debug!("Receiving events for subscription {subscription:?} …"); while let Some(event) = rx.recv().await { @@ -367,13 +355,13 @@ async fn psubscribe( }; match serde_json::to_string(&ServerMessage::PState(event)) { Ok(data) => { - if let Err(e) = client.clone().send(data) { + if let Err(e) = client_sub.send(data) { log::error!("Error sending STATE message to client: {e}"); break; } } Err(e) => { - if let Err(e) = handle_encode_error(e, client.clone()).await { + if let Err(e) = handle_encode_error(e).await { log::error!("Error sending ERROR message to client: {e}"); break; } @@ -381,11 +369,9 @@ async fn psubscribe( } } - let mut wb = wb_unsub.write().await; - log::debug!("No more events, ending subscription {subscription:?}."); - match wb.unsubscribe(client_id, transaction_id) { + match wb_unsub.unsubscribe(client_id, transaction_id).await { Ok(()) => { - log::warn!("Subscription {subscription:?} was not cleaned up properly!"); + log::warn!("Subscription was not cleaned up properly!"); } Err(WorterbuchError::NotSubscribed) => { /* this is expected */ } Err(e) => { @@ -397,83 +383,13 @@ async fn psubscribe( Ok(true) } -async fn export( - msg: Export, - worterbuch: Arc>, - client: UnboundedSender, -) -> WorterbuchResult<()> { - log::info!("export"); - let wb = worterbuch.read().await; - let mut file = File::create(&msg.path) - .await - .context(|| format!("Error creating file {}", &msg.path))?; - match wb.export_to_file(&mut file).await { - Ok(_) => { - let response = Ack { - transaction_id: msg.transaction_id, - }; - - match serde_json::to_string(&ServerMessage::Ack(response)) { - Ok(data) => client.send(data).context(|| { - format!( - "Error sending ACK message for transaction ID {}", - msg.transaction_id - ) - })?, - Err(e) => handle_encode_error(e, client.clone()).await?, - } - } - Err(e) => { - handle_store_error(e, client, msg.transaction_id).await?; - return Ok(()); - } - } - - Ok(()) -} - -async fn import( - msg: Import, - worterbuch: Arc>, - client: UnboundedSender, -) -> WorterbuchResult<()> { - log::info!("import"); - let mut wb = worterbuch.write().await; - - match wb.import_from_file(&msg.path).await { - Ok(()) => { - let response = Ack { - transaction_id: msg.transaction_id, - }; - - match serde_json::to_string(&ServerMessage::Ack(response)) { - Ok(data) => client.send(data).context(|| { - format!( - "Error sending ACK message for transaction ID {}", - msg.transaction_id - ) - })?, - Err(e) => handle_encode_error(e, client.clone()).await?, - } - } - Err(e) => { - handle_store_error(e, client, msg.transaction_id).await?; - return Ok(()); - } - } - - Ok(()) -} - async fn unsubscribe( msg: Unsubscribe, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, client_id: Uuid, ) -> WorterbuchResult<()> { - let mut wb = worterbuch.write().await; - - if let Err(e) = wb.unsubscribe(client_id, msg.transaction_id) { + if let Err(e) = worterbuch.unsubscribe(client_id, msg.transaction_id).await { handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); }; @@ -488,7 +404,7 @@ async fn unsubscribe( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client.clone()).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -496,15 +412,13 @@ async fn unsubscribe( async fn delete( msg: Delete, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult<()> { - let mut wb = worterbuch.write().await; - - let key_value = match wb.delete(msg.key) { + let key_value = match worterbuch.delete(msg.key).await { Ok(key_value) => key_value.into(), Err(e) => { - handle_store_error(e, client.clone(), msg.transaction_id).await?; + handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); } }; @@ -521,7 +435,7 @@ async fn delete( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -529,15 +443,13 @@ async fn delete( async fn pdelete( msg: PDelete, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult<()> { - let mut wb = worterbuch.write().await; - - let deleted = match wb.pdelete(msg.request_pattern.clone()) { + let deleted = match worterbuch.pdelete(msg.request_pattern.clone()).await { Ok(it) => it, Result::Err(e) => { - handle_store_error(e, client.clone(), msg.transaction_id).await?; + handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); } }; @@ -555,7 +467,7 @@ async fn pdelete( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -563,15 +475,13 @@ async fn pdelete( async fn ls( msg: Ls, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult<()> { - let wb = worterbuch.read().await; - - let children = match wb.ls(&msg.parent) { + let children = match worterbuch.ls(msg.parent).await { Ok(it) => it, Result::Err(e) => { - handle_store_error(e, client.clone(), msg.transaction_id).await?; + handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); } }; @@ -588,7 +498,7 @@ async fn ls( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) @@ -597,20 +507,19 @@ async fn ls( async fn subscribe_ls( msg: SubscribeLs, client_id: Uuid, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult { - let wb_unsub = worterbuch.clone(); - let mut wb = worterbuch.write().await; - - let (mut rx, subscription) = - match wb.subscribe_ls(client_id, msg.transaction_id, msg.parent.clone()) { - Ok(it) => it, - Err(e) => { - handle_store_error(e, client, msg.transaction_id).await?; - return Ok(false); - } - }; + let (mut rx, subscription) = match worterbuch + .subscribe_ls(client_id, msg.transaction_id, msg.parent.clone()) + .await + { + Ok(it) => it, + Err(e) => { + handle_store_error(e, client, msg.transaction_id).await?; + return Ok(false); + } + }; let response = Ack { transaction_id: msg.transaction_id, @@ -623,11 +532,14 @@ async fn subscribe_ls( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client.clone()).await?, + Err(e) => handle_encode_error(e).await?, } let transaction_id = msg.transaction_id; + let wb_unsub = worterbuch.clone(); + let client_sub = client.clone(); + spawn(async move { log::debug!("Receiving events for ls subscription {subscription:?} …"); while let Some(children) = rx.recv().await { @@ -637,13 +549,13 @@ async fn subscribe_ls( }; match serde_json::to_string(&ServerMessage::LsState(state)) { Ok(data) => { - if let Err(e) = client.clone().send(data) { + if let Err(e) = client_sub.send(data) { log::error!("Error sending STATE message to client: {e}"); break; } } Err(e) => { - if let Err(e) = handle_encode_error(e, client.clone()).await { + if let Err(e) = handle_encode_error(e).await { log::error!("Error sending ERROR message to client: {e}"); break; } @@ -651,11 +563,9 @@ async fn subscribe_ls( } } - let mut wb = wb_unsub.write().await; - log::debug!("No more events, ending ls subscription {subscription:?}."); - match wb.unsubscribe_ls(client_id, transaction_id) { + match wb_unsub.unsubscribe_ls(client_id, transaction_id).await { Ok(()) => { - log::warn!("Ls Subscription {subscription:?} was not cleaned up properly!"); + log::warn!("Ls Subscription was not cleaned up properly!"); } Err(WorterbuchError::NotSubscribed) => { /* this is expected */ } Err(e) => { @@ -670,12 +580,13 @@ async fn subscribe_ls( async fn unsubscribe_ls( msg: UnsubscribeLs, client_id: Uuid, - worterbuch: Arc>, - client: UnboundedSender, + worterbuch: &CloneableWbApi, + client: &UnboundedSender, ) -> WorterbuchResult<()> { - let mut wb = worterbuch.write().await; - - if let Err(e) = wb.unsubscribe_ls(client_id, msg.transaction_id) { + if let Err(e) = worterbuch + .unsubscribe_ls(client_id, msg.transaction_id) + .await + { handle_store_error(e, client, msg.transaction_id).await?; return Ok(()); } @@ -690,23 +601,19 @@ async fn unsubscribe_ls( msg.transaction_id ) })?, - Err(e) => handle_encode_error(e, client.clone()).await?, + Err(e) => handle_encode_error(e).await?, } Ok(()) } -async fn handle_encode_error( - e: serde_json::Error, - client: UnboundedSender, -) -> WorterbuchResult<()> { - drop(client); +async fn handle_encode_error(e: serde_json::Error) -> WorterbuchResult<()> { panic!("Failed to encode a value to JSON: {e}"); } async fn handle_store_error( e: WorterbuchError, - client: UnboundedSender, + client: &UnboundedSender, transaction_id: u64, ) -> WorterbuchResult<()> { let error_code = ErrorCode::from(&e); diff --git a/worterbuch/src/server/poem.rs b/worterbuch/src/server/poem.rs index 3597d13..643e7d2 100644 --- a/worterbuch/src/server/poem.rs +++ b/worterbuch/src/server/poem.rs @@ -1,4 +1,7 @@ -use crate::{server::common::process_incoming_message, Config, Worterbuch}; +use crate::{ + server::common::{process_incoming_message, CloneableWbApi}, + Config, +}; use anyhow::anyhow; use futures::{sink::SinkExt, stream::StreamExt}; use poem::{ @@ -17,12 +20,11 @@ use serde_json::Value; use std::{ env, net::SocketAddr, - sync::Arc, time::{Duration, Instant}, }; use tokio::{ select, - sync::{mpsc, RwLock}, + sync::mpsc, time::{sleep, MissedTickBehavior}, }; use uuid::Uuid; @@ -35,15 +37,14 @@ const ASYNC_API_YAML: &'static str = include_str!("../../asyncapi.yaml"); const VERSION: &str = env!("CARGO_PKG_VERSION"); struct Api { - worterbuch: Arc>, + worterbuch: CloneableWbApi, } #[OpenApi] impl Api { #[oai(path = "/get/:key", method = "get")] async fn get(&self, Path(key): Path) -> Result> { - let wb = self.worterbuch.read().await; - match wb.get(&key) { + match self.worterbuch.get(key).await { Ok(kvp) => { let kvp: KeyValuePair = kvp.into(); Ok(Json(kvp)) @@ -54,8 +55,7 @@ impl Api { #[oai(path = "/pget/:pattern", method = "get")] async fn pget(&self, Path(pattern): Path) -> Result> { - let wb = self.worterbuch.read().await; - match wb.pget(&pattern) { + match self.worterbuch.pget(pattern).await { Ok(kvps) => Ok(Json(kvps)), Err(e) => to_error_response(e), } @@ -67,8 +67,7 @@ impl Api { Path(key): Path, Json(value): Json, ) -> Result> { - let mut wb = self.worterbuch.write().await; - match wb.set(key, value) { + match self.worterbuch.set(key, value).await { Ok(()) => {} Err(e) => return to_error_response(e), } @@ -81,8 +80,7 @@ impl Api { Path(key): Path, Json(value): Json, ) -> Result> { - let mut wb = self.worterbuch.write().await; - match wb.publish(key, value) { + match self.worterbuch.publish(key, value).await { Ok(()) => {} Err(e) => return to_error_response(e), } @@ -91,8 +89,7 @@ impl Api { #[oai(path = "/delete/:key", method = "delete")] async fn delete(&self, Path(key): Path) -> Result> { - let mut wb = self.worterbuch.write().await; - match wb.delete(key) { + match self.worterbuch.delete(key).await { Ok(kvp) => { let kvp: KeyValuePair = kvp.into(); Ok(Json(kvp)) @@ -103,8 +100,7 @@ impl Api { #[oai(path = "/pdelete/:pattern", method = "delete")] async fn pdelete(&self, Path(pattern): Path) -> Result> { - let mut wb = self.worterbuch.write().await; - match wb.pdelete(pattern) { + match self.worterbuch.pdelete(pattern).await { Ok(kvps) => Ok(Json(kvps)), Err(e) => to_error_response(e), } @@ -112,8 +108,7 @@ impl Api { #[oai(path = "/ls/:key", method = "get")] async fn ls(&self, Path(key): Path) -> Result>> { - let wb = self.worterbuch.read().await; - match wb.ls(&Some(key)) { + match self.worterbuch.ls(Some(key)).await { Ok(kvps) => Ok(Json(kvps)), Err(e) => to_error_response(e), } @@ -121,8 +116,7 @@ impl Api { #[oai(path = "/ls", method = "get")] async fn ls_root(&self) -> Result>> { - let wb = self.worterbuch.read().await; - match wb.ls(&None) { + match self.worterbuch.ls(None).await { Ok(kvps) => Ok(Json(kvps)), Err(e) => to_error_response(e), } @@ -143,19 +137,20 @@ fn to_error_response(e: WorterbuchError) -> Result { #[handler] async fn ws( ws: WebSocket, - Data(data): Data<&(Arc>, ProtocolVersion)>, + Data(data): Data<&(CloneableWbApi, ProtocolVersion)>, req: &Request, ) -> impl IntoResponse { - let worterbuch = &data.0; + log::info!("Client connected"); + let worterbuch = data.0.clone(); let proto_version = data.1.to_owned(); - let wb: Arc> = worterbuch.clone(); let remote = *req .remote_addr() .as_socket_addr() .expect("Client has no remote address."); ws.protocols(vec!["worterbuch"]) .on_upgrade(move |socket| async move { - let mut client_handler = ClientHandler::new(socket, wb, remote, proto_version).await; + let mut client_handler = + ClientHandler::new(socket, worterbuch, remote, proto_version).await; if let Err(e) = client_handler.serve().await { log::error!("Error in WS connection: {e}"); } @@ -192,18 +187,15 @@ fn admin_data() -> (String, String, String) { (admin_name, admin_url, admin_email) } -pub async fn start( - worterbuch: Arc>, - config: Config, -) -> Result<(), std::io::Error> { +pub async fn start(worterbuch: CloneableWbApi, config: Config) -> Result<(), std::io::Error> { let port = config.port; let bind_addr = config.bind_addr; let public_addr = config.public_address; let proto = config.proto; - let proto_versions = { - let wb = worterbuch.read().await; - wb.supported_protocol_versions() - }; + let proto_versions = worterbuch + .supported_protocol_versions() + .await + .unwrap_or(Vec::new()); let addr = format!("{bind_addr}:{port}"); @@ -278,7 +270,7 @@ struct ClientHandler { keepalive_timeout: Duration, send_timeout: Duration, websocket: WebSocketStream, - worterbuch: Arc>, + worterbuch: CloneableWbApi, remote_addr: SocketAddr, proto_version: ProtocolVersion, } @@ -286,14 +278,14 @@ struct ClientHandler { impl ClientHandler { async fn new( websocket: WebSocketStream, - worterbuch: Arc>, + worterbuch: CloneableWbApi, remote_addr: SocketAddr, proto_version: ProtocolVersion, ) -> Self { - let wb = worterbuch.clone(); - let wb = wb.read().await; - let keepalive_timeout = wb.config().keepalive_timeout.clone(); - let send_timeout = wb.config().send_timeout.clone(); + let config = worterbuch.config().await.ok(); + let (keepalive_timeout, send_timeout) = config + .map(|c| (c.keepalive_timeout, c.send_timeout)) + .unwrap_or((Duration::from_secs(10), Duration::from_secs(10))); Self { client_id: Uuid::new_v4(), handshake_complete: false, @@ -309,15 +301,12 @@ impl ClientHandler { } async fn serve(&mut self) -> anyhow::Result<()> { - let client_id = self.client_id.clone(); + let client_id = self.client_id; let remote_addr = self.remote_addr; log::info!("New client connected: {client_id} ({remote_addr})"); - { - let mut wb = self.worterbuch.write().await; - wb.connected(self.client_id, remote_addr); - } + self.worterbuch.connected(client_id, remote_addr).await?; log::debug!("Receiving messages from client {client_id} ({remote_addr}) …",); @@ -325,8 +314,7 @@ impl ClientHandler { log::error!("Error in serve loop: {e}"); } - let mut wb = self.worterbuch.write().await; - wb.disconnected(client_id, remote_addr); + self.worterbuch.disconnected(client_id, remote_addr).await?; Ok(()) } @@ -347,7 +335,7 @@ impl ClientHandler { let (msg_processed, handshake) = process_incoming_message( self.client_id, &text, - self.worterbuch.clone(), + &mut self.worterbuch, tx.clone(), &self.proto_version, ) diff --git a/worterbuch/src/stats.rs b/worterbuch/src/stats.rs index 25477cf..45adcfd 100644 --- a/worterbuch/src/stats.rs +++ b/worterbuch/src/stats.rs @@ -1,10 +1,7 @@ -use crate::worterbuch::Worterbuch; +use crate::server::common::CloneableWbApi; use serde_json::json; -use std::{sync::Arc, time::Duration}; -use tokio::{ - sync::RwLock, - time::{sleep, Instant}, -}; +use std::time::Duration; +use tokio::time::{sleep, Instant}; use worterbuch_common::error::WorterbuchResult; pub const SYSTEM_TOPIC_ROOT: &str = "$SYS"; @@ -12,35 +9,35 @@ pub const SYSTEM_TOPIC_CLIENTS: &str = "clients"; pub const SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS: &str = "supportedProtocolVersions"; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub async fn track_stats(wb: Arc>) -> WorterbuchResult<()> { +pub async fn track_stats(wb: CloneableWbApi) -> WorterbuchResult<()> { let start = Instant::now(); - wb.write() - .await - .set(format!("{SYSTEM_TOPIC_ROOT}/version"), json!(VERSION))?; + wb.set(format!("{SYSTEM_TOPIC_ROOT}/version"), json!(VERSION)) + .await?; loop { update_stats(&wb, start).await?; - sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(1)).await; } } -async fn update_stats(wb: &Arc>, start: Instant) -> WorterbuchResult<()> { - let mut wb_write = wb.write().await; - update_uptime(&mut wb_write, start.elapsed())?; - update_message_count(&mut wb_write)?; +async fn update_stats(wb: &CloneableWbApi, start: Instant) -> WorterbuchResult<()> { + update_uptime(wb, start.elapsed()).await?; + update_message_count(wb).await?; Ok(()) } -fn update_uptime(wb: &mut Worterbuch, uptime: Duration) -> WorterbuchResult<()> { +async fn update_uptime(wb: &CloneableWbApi, uptime: Duration) -> WorterbuchResult<()> { wb.set( format!("{SYSTEM_TOPIC_ROOT}/uptime"), json!(uptime.as_secs()), ) + .await } -fn update_message_count(wb: &mut Worterbuch) -> WorterbuchResult<()> { - let len = wb.len(); +async fn update_message_count(wb: &CloneableWbApi) -> WorterbuchResult<()> { + let len = wb.len().await?; wb.set( format!("{SYSTEM_TOPIC_ROOT}/store/values/count"), json!(len), ) + .await } diff --git a/worterbuch/src/worterbuch.rs b/worterbuch/src/worterbuch.rs index 975e648..389100d 100644 --- a/worterbuch/src/worterbuch.rs +++ b/worterbuch/src/worterbuch.rs @@ -204,7 +204,7 @@ impl Worterbuch { &mut self, client_id: Uuid, transaction_id: TransactionId, - parent: Option, + parent: Option, ) -> WorterbuchResult<(UnboundedReceiver>, SubscriptionId)> { let children = self.ls(&parent).unwrap_or_else(|_| Vec::new()); let path: Vec = parent @@ -368,7 +368,7 @@ impl Worterbuch { } } - pub fn delete(&mut self, key: Key) -> WorterbuchResult<(String, Value)> { + pub fn delete(&mut self, key: Key) -> WorterbuchResult<(Key, Value)> { let path: Vec = parse_segments(&key)?; if path.is_empty() || path[0] == SYSTEM_TOPIC_ROOT {