Skip to content

Commit

Permalink
implemented periodic persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
babymotte committed Jun 30, 2022
1 parent f3d7e48 commit 1a7a203
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 23 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions libworterbuch/src/client/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::ConfigResult;
use crate::error::{ConfigIntContext, ConfigResult};
use std::env;

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -19,7 +19,7 @@ impl Config {
}

if let Ok(val) = env::var("WORTERBUCH_PORT") {
self.port = val.parse()?;
self.port = val.parse().as_port()?;
}

Ok(())
Expand Down
16 changes: 13 additions & 3 deletions libworterbuch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub enum ConfigError {
InvalidMultiWildcard(String),
InvalidPort(ParseIntError),
InvalidAddr(AddrParseError),
InvalidInterval(ParseIntError),
}

impl std::error::Error for ConfigError {}
Expand All @@ -149,13 +150,22 @@ impl fmt::Display for ConfigError {
),
ConfigError::InvalidPort(e) => write!(f, "invalid port: {e}"),
ConfigError::InvalidAddr(e) => write!(f, "invalid address: {e}"),
ConfigError::InvalidInterval(e) => write!(f, "invalid port: {e}"),
}
}
}

impl From<ParseIntError> for ConfigError {
fn from(e: ParseIntError) -> Self {
ConfigError::InvalidPort(e)
pub trait ConfigIntContext<I> {
fn as_port(self) -> Result<I, ConfigError>;
fn as_interval(self) -> Result<I, ConfigError>;
}

impl<I> ConfigIntContext<I> for Result<I, ParseIntError> {
fn as_port(self) -> Result<I, ConfigError> {
self.map_err(ConfigError::InvalidPort)
}
fn as_interval(self) -> Result<I, ConfigError> {
self.map_err(ConfigError::InvalidInterval)
}
}

Expand Down
2 changes: 2 additions & 0 deletions worterbuch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
uuid = { version = "1.1.1", features = ["v4"] }
clap = "3.2.6"
sha2 = "0.10.2"
hex = "0.4.3"

juniper = { version = "0.15.9", optional = true }
juniper_graphql_ws = { version = "0.3.0", optional = true }
Expand Down
34 changes: 28 additions & 6 deletions worterbuch/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#[cfg(feature = "server")]
use std::net::IpAddr;
use std::{env, net::IpAddr};
use std::{env, net::IpAddr, time::Duration};

use libworterbuch::error::{ConfigError, ConfigResult};
use libworterbuch::{
codec::Path,
error::{ConfigError, ConfigIntContext, ConfigResult},
};

#[derive(Debug, Clone, PartialEq)]
pub struct Config {
Expand All @@ -22,6 +25,9 @@ pub struct Config {
pub cert_path: Option<String>,
#[cfg(feature = "web")]
pub key_path: Option<String>,
pub persistent_data: bool,
pub persistence_interval: Duration,
pub data_dir: Path,
}

impl Config {
Expand All @@ -45,12 +51,12 @@ impl Config {

#[cfg(feature = "web")]
if let Ok(val) = env::var("WORTERBUCH_WEB_PORT") {
self.web_port = val.parse()?;
self.web_port = val.parse().as_port()?;
}

#[cfg(feature = "tcp")]
if let Ok(val) = env::var("WORTERBUCH_TCP_PORT") {
self.tcp_port = val.parse()?;
self.tcp_port = val.parse().as_port()?;
}

#[cfg(feature = "graphql")]
Expand All @@ -59,8 +65,20 @@ impl Config {
}

if let Ok(val) = env::var("WORTERBUCH_BIND_ADDRESS") {
let ip: IpAddr = val.parse()?;
self.bind_addr = ip;
self.bind_addr = val.parse()?;
}

if let Ok(val) = env::var("WORTERBUCH_PERSISTENT_DATA") {
self.persistent_data = val.to_lowercase() == "true";
}

if let Ok(val) = env::var("WORTERBUCH_PERSISTENCE_INTERVAL") {
let secs = val.parse().as_interval()?;
self.persistence_interval = Duration::from_secs(secs);
}

if let Ok(val) = env::var("WORTERBUCH_DATA_DIR") {
self.data_dir = val;
}

Ok(())
Expand Down Expand Up @@ -92,6 +110,10 @@ impl Default for Config {
cert_path: None,
#[cfg(feature = "web")]
key_path: None,
persistent_data: false,
// TODO increase default persistence period
persistence_interval: Duration::from_secs(5),
data_dir: "./data".into(),
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions worterbuch/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod config;
mod persistence;
#[cfg(not(feature = "docker"))]
mod repl;
mod server;
Expand Down Expand Up @@ -34,6 +35,8 @@ async fn main() -> Result<()> {

async fn run(msg: &str) -> Result<()> {
let config = Config::new()?;
let config_pers = config.clone();
let config_ppers = config.clone();

App::new("worterbuch")
.version(env!("CARGO_PKG_VERSION"))
Expand All @@ -46,7 +49,13 @@ async fn run(msg: &str) -> Result<()> {
log::debug!("Wildcard: {}", config.wildcard);
log::debug!("Multi-Wildcard: {}", config.multi_wildcard);

// let restore_from_persistence = config.persistent_data;

let worterbuch = Arc::new(RwLock::new(Worterbuch::with_config(config.clone())));
let worterbuch_pers = worterbuch.clone();
let worterbuch_ppers = worterbuch.clone();

spawn(persistence::periodic(worterbuch_pers, config_pers));

#[cfg(feature = "graphql")]
spawn(server::gql_warp::start(worterbuch.clone(), config.clone()));
Expand All @@ -68,5 +77,7 @@ async fn run(msg: &str) -> Result<()> {
repl(worterbuch).await;
}

persistence::once(worterbuch_ppers, config_ppers).await?;

Ok(())
}
44 changes: 44 additions & 0 deletions worterbuch/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use crate::{config::Config, worterbuch::Worterbuch};
use anyhow::Result;
use std::{path::PathBuf, sync::Arc};
use tokio::{
fs::{self, File},
io::AsyncWriteExt,
sync::RwLock,
time::sleep,
};

pub(crate) async fn periodic(worterbuch: Arc<RwLock<Worterbuch>>, config: Config) -> Result<()> {
let interval = config.persistence_interval;

loop {
sleep(interval).await;
once(worterbuch.clone(), config.clone()).await?;
}
}

pub(crate) async fn once(worterbuch: Arc<RwLock<Worterbuch>>, config: Config) -> Result<()> {
let wb = worterbuch.read().await;

let dir = PathBuf::from(&config.data_dir);

let mut json_temp_path = dir.clone();
json_temp_path.push(".store.json~");
let mut json_path = dir.clone();
json_path.push(".store.json");
let mut sha_temp_path = dir.clone();
sha_temp_path.push(".store.sha~");
let mut sha_path = dir.clone();
sha_path.push(".store.sha");

let mut file = File::create(&json_temp_path).await?;
let sha = wb.export_to_file(&mut file).await?;
let sha = hex::encode(&sha);
let mut file = File::create(&sha_temp_path).await?;
file.write_all(sha.as_bytes()).await?;

fs::copy(&json_temp_path, &json_path).await?;
fs::copy(&sha_temp_path, &sha_path).await?;

Ok(())
}
8 changes: 6 additions & 2 deletions worterbuch/src/server/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use libworterbuch::{
use serde::Serialize;
use std::sync::Arc;
use tokio::{
fs::File,
io::AsyncReadExt,
spawn,
sync::{mpsc::UnboundedSender, RwLock},
Expand Down Expand Up @@ -297,8 +298,11 @@ async fn export(
) -> WorterbuchResult<()> {
log::info!("export");
let wb = worterbuch.read().await;
match wb.export_to_file(&msg.path).await {
Ok(()) => {
let mut file = File::create(&msg.path)
.await
.context(|| format!("Error creating file {}", &msg.path))?;
match wb.export_to_file(&mut file).await {
Ok(_) => {
let response = Ack {
transaction_id: msg.transaction_id,
};
Expand Down
24 changes: 14 additions & 10 deletions worterbuch/src/worterbuch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use libworterbuch::{
};
use serde::{Deserialize, Serialize};
use serde_json::{from_str, to_value, Value};
use sha2::{Digest, Sha256};
use std::fmt::Display;
use tokio::{
fs::File,
Expand Down Expand Up @@ -192,17 +193,20 @@ impl Worterbuch {
Ok(imported_values)
}

pub async fn export_to_file(&self, path: &Path) -> WorterbuchResult<()> {
log::info!("Exporting to {path} …");
let json = self.export()?;
let mut file = File::create(path)
.await
.context(|| format!("Error creating file {path}"))?;
file.write_all(json.to_string().as_bytes())
pub async fn export_to_file(&self, file: &mut File) -> WorterbuchResult<Vec<u8>> {
log::debug!("Exporting to {file:?} …");
let json = self.export()?.to_string();
let json_bytes = json.as_bytes();

let mut hasher = Sha256::new();
hasher.update(b"hello world");
let result = hasher.finalize();

file.write_all(json_bytes)
.await
.context(|| format!("Error writing to file {path}"))?;
log::info!("Done.");
Ok(())
.context(|| format!("Error writing to file {file:?}"))?;
log::debug!("Done.");
Ok(result.as_slice().to_owned())
}

pub async fn import_from_file(&mut self, path: &Path) -> WorterbuchResult<()> {
Expand Down

0 comments on commit 1a7a203

Please sign in to comment.