Skip to content

Commit

Permalink
accessing worterubch over channels instead of RwLock
Browse files Browse the repository at this point in the history
  • Loading branch information
babymotte committed Sep 14, 2023
1 parent 65ea30a commit a31f360
Show file tree
Hide file tree
Showing 10 changed files with 599 additions and 392 deletions.
21 changes: 1 addition & 20 deletions worterbuch-common/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{
GraveGoods, Key, LastWill, Path, ProtocolVersions, RequestPattern, TransactionId, UniqueFlag,
Value,
GraveGoods, Key, LastWill, ProtocolVersions, RequestPattern, TransactionId, UniqueFlag, Value,
};
use serde::{Deserialize, Serialize};

Expand All @@ -14,8 +13,6 @@ pub enum ClientMessage {
Publish(Publish),
Subscribe(Subscribe),
PSubscribe(PSubscribe),
Export(Export),
Import(Import),
Unsubscribe(Unsubscribe),
Delete(Delete),
PDelete(PDelete),
Expand All @@ -36,8 +33,6 @@ impl ClientMessage {
ClientMessage::Publish(m) => Some(m.transaction_id),
ClientMessage::Subscribe(m) => Some(m.transaction_id),
ClientMessage::PSubscribe(m) => Some(m.transaction_id),
ClientMessage::Export(m) => Some(m.transaction_id),
ClientMessage::Import(m) => Some(m.transaction_id),
ClientMessage::Unsubscribe(m) => Some(m.transaction_id),
ClientMessage::Delete(m) => Some(m.transaction_id),
ClientMessage::PDelete(m) => Some(m.transaction_id),
Expand Down Expand Up @@ -102,20 +97,6 @@ pub struct PSubscribe {
pub unique: UniqueFlag,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Export {
pub transaction_id: TransactionId,
pub path: Path,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Import {
pub transaction_id: TransactionId,
pub path: Path,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Unsubscribe {
Expand Down
18 changes: 17 additions & 1 deletion worterbuch-common/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::{server::Err, ErrorCode, Key, MetaData, RequestPattern};
use std::{fmt, io, net::AddrParseError, num::ParseIntError};
use tokio::sync::{broadcast, mpsc::error::SendError, oneshot};
use tokio::sync::{
broadcast,
mpsc::{self, error::SendError},
oneshot,
};

#[derive(Debug)]
pub enum ConfigError {
Expand Down Expand Up @@ -138,6 +142,18 @@ impl<T, V: fmt::Debug + 'static + Send + Sync> Context<T, SendError<V>>
}
}

impl<T: Send + Sync + 'static> From<mpsc::error::SendError<T>> for WorterbuchError {
fn from(value: mpsc::error::SendError<T>) -> Self {
WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
}
}

impl From<oneshot::error::RecvError> for WorterbuchError {
fn from(value: oneshot::error::RecvError) -> Self {
WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
}
}

pub type WorterbuchResult<T> = std::result::Result<T, WorterbuchError>;

#[derive(Debug)]
Expand Down
150 changes: 111 additions & 39 deletions worterbuch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,78 +8,150 @@ mod worterbuch;

pub use crate::worterbuch::*;
pub use config::*;
use server::common::{CloneableWbApi, WbFunction};
use stats::{SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS};
use worterbuch_common::topic;

use crate::stats::track_stats;
use anyhow::Result;
use std::sync::Arc;
#[cfg(not(target_os = "windows"))]
use tokio::signal::unix::{signal, SignalKind};
use tokio::{
spawn,
sync::{oneshot, RwLock},
};
use tokio::{select, spawn, sync::mpsc};

pub async fn run_worterbuch(config: Config) -> Result<()> {
let config_pers = config.clone();

let use_persistence = config.use_persistence;

let worterbuch = start_worterbuch(config.clone()).await?;
let mut worterbuch = if use_persistence {
persistence::load(config.clone()).await?
} else {
Worterbuch::with_config(config.clone())
};

let (_terminate_tx, terminate_rx) = oneshot::channel::<()>();
worterbuch.set(
topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS),
serde_json::to_value(worterbuch.supported_protocol_versions()).expect("cannot fail"),
)?;

let (api_tx, mut api_rx) = mpsc::channel(1);
let api = CloneableWbApi::new(api_tx);

let worterbuch_pers = api.clone();
let worterbuch_uptime = api.clone();

if use_persistence {
spawn(persistence::periodic(worterbuch_pers, config_pers));
}

spawn(track_stats(worterbuch_uptime));

spawn(server::poem::start(api.clone(), config.clone()));

let (_terminate_tx, mut terminate_rx) = mpsc::channel(1);

loop {
select! {
recv = api_rx.recv() => match recv {
Some(function) => process_api_call(&mut worterbuch, function).await,
None => break,
},
_ = terminate_rx.recv() => break,
}
}

#[cfg(not(target_os = "windows"))]
spawn(async move {
match signal(SignalKind::terminate()) {
Ok(mut signal) => {
signal.recv().await;
log::info!("SIGTERM received.");
if let Err(_) = _terminate_tx.send(()) {
if let Err(_) = _terminate_tx.send(()).await {
log::error!("Error sending terminate signal");
}
}
Err(e) => log::error!("Error registring SIGTERM handler: {e}"),
}
});

terminate_rx.await?;

log::info!("Shutting down.");

if use_persistence {
persistence::once(worterbuch.clone(), config).await?;
persistence::once(&api, config).await?;
}

Ok(())
}

pub async fn start_worterbuch(config: Config) -> Result<Arc<RwLock<Worterbuch>>> {
let config_pers = config.clone();

let use_persistence = config.use_persistence;

let mut worterbuch = if use_persistence {
persistence::load(config.clone()).await?
} else {
Worterbuch::with_config(config.clone())
};

worterbuch.set(
topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS),
serde_json::to_value(worterbuch.supported_protocol_versions()).expect("cannot fail"),
)?;

let worterbuch = Arc::new(RwLock::new(worterbuch));
let worterbuch_pers = worterbuch.clone();
let worterbuch_uptime = worterbuch.clone();

if use_persistence {
spawn(persistence::periodic(worterbuch_pers, config_pers));
async fn process_api_call(worterbuch: &mut Worterbuch, function: WbFunction) {
match function {
WbFunction::Handshake(client_protocol_versions, last_will, grave_goods, client_id, tx) => {
tx.send(worterbuch.handshake(
&client_protocol_versions,
last_will,
grave_goods,
client_id,
))
.ok();
}
WbFunction::Get(key, tx) => {
tx.send(worterbuch.get(&key)).ok();
}
WbFunction::Set(key, value, tx) => {
tx.send(worterbuch.set(key, value)).ok();
}
WbFunction::Publish(key, value, tx) => {
tx.send(worterbuch.publish(key, value)).ok();
}
WbFunction::Ls(parent, tx) => {
tx.send(worterbuch.ls(&parent)).ok();
}
WbFunction::PGet(pattern, tx) => {
tx.send(worterbuch.pget(&pattern)).ok();
}
WbFunction::Subscribe(client_id, transaction_id, key, unique, tx) => {
tx.send(worterbuch.subscribe(client_id, transaction_id, key, unique))
.ok();
}
WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, tx) => {
tx.send(worterbuch.psubscribe(client_id, transaction_id, pattern, unique))
.ok();
}
WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
tx.send(worterbuch.subscribe_ls(client_id, transaction_id, parent))
.ok();
}
WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
tx.send(worterbuch.unsubscribe(client_id, transaction_id))
.ok();
}
WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
.ok();
}
WbFunction::Delete(key, tx) => {
tx.send(worterbuch.delete(key)).ok();
}
WbFunction::PDelete(pattern, tx) => {
tx.send(worterbuch.pdelete(pattern)).ok();
}
WbFunction::Connected(client_id, remote_addr) => {
worterbuch.connected(client_id, remote_addr);
}
WbFunction::Disconnected(client_id, remote_addr) => {
worterbuch.disconnected(client_id, remote_addr);
}
WbFunction::Config(tx) => {
tx.send(worterbuch.config().clone()).ok();
}
WbFunction::Export(tx) => {
tx.send(worterbuch.export()).ok();
}
WbFunction::Len(tx) => {
tx.send(worterbuch.len()).ok();
}
WbFunction::SupportedProtocolVersions(tx) => {
tx.send(worterbuch.supported_protocol_versions()).ok();
}
}

spawn(track_stats(worterbuch_uptime));

spawn(server::poem::start(worterbuch.clone(), config.clone()));

Ok(worterbuch)
}
26 changes: 5 additions & 21 deletions worterbuch/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,20 @@
use anyhow::Result;
use clap::Parser;
use tokio::runtime::{self, Runtime};
use worterbuch::run_worterbuch;
use worterbuch::Config;

#[derive(Parser)]
#[command(author, version, about = "An in-memory data base / message broker hybrid", long_about = None)]
struct Args {
/// Start Wörterbuch in single threaded mode. Default is multi threaded.
#[arg(short, long)]
single_threaded: bool,
}
struct Args {}

fn main() -> Result<()> {
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
dotenv::dotenv().ok();
env_logger::init();
let config = Config::new()?;
let args: Args = Args::parse();

let single_threaded = args.single_threaded || config.single_threaded;

let rt = if single_threaded {
log::info!("Starting Wörterbuch in single-threaded mode …");
runtime::Builder::new_current_thread()
.enable_all()
.build()?
} else {
log::info!("Starting Wörterbuch in multi-threaded mode …");
Runtime::new()?
};
let _args: Args = Args::parse();

rt.block_on(run_worterbuch(config))?;
run_worterbuch(config).await?;

Ok(())
}
15 changes: 6 additions & 9 deletions worterbuch/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
use crate::{config::Config, worterbuch::Worterbuch};
use crate::{config::Config, server::common::CloneableWbApi, worterbuch::Worterbuch};
use anyhow::Result;
use sha2::{Digest, Sha256};
use std::{path::PathBuf, sync::Arc};
use std::path::PathBuf;
use tokio::{
fs::{self, File},
io::AsyncWriteExt,
sync::RwLock,
time::sleep,
};

pub(crate) async fn periodic(worterbuch: Arc<RwLock<Worterbuch>>, config: Config) -> Result<()> {
pub(crate) async fn periodic(worterbuch: CloneableWbApi, config: Config) -> Result<()> {
let interval = config.persistence_interval;

loop {
sleep(interval).await;
once(worterbuch.clone(), config.clone()).await?;
once(&worterbuch, config.clone()).await?;
}
}

pub(crate) async fn once(worterbuch: Arc<RwLock<Worterbuch>>, config: Config) -> Result<()> {
let wb = worterbuch.read().await;

pub(crate) async fn once(worterbuch: &CloneableWbApi, config: Config) -> Result<()> {
let (json_temp_path, json_path, sha_temp_path, sha_path) = file_paths(&config);

let json = wb.export()?.to_string();
let json = worterbuch.export().await?.to_string();

let mut hasher = Sha256::new();
hasher.update(&json);
Expand Down
Loading

0 comments on commit a31f360

Please sign in to comment.