Skip to content

Commit

Permalink
graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
babymotte committed Sep 14, 2023
1 parent a31f360 commit 14b6bab
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions worterbuch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ categories = ["database-implementations"]
[dependencies]
worterbuch-common = { version = "0.37.0", features = ["web"] }
tokio = { version = "1.26.0", features = ["signal", "rt-multi-thread", "fs"] }
tokio-graceful-shutdown = "0.13.0"
log = "0.4.17"
env_logger = "0.10.0"
dotenv = "0.15.0"
Expand Down
38 changes: 14 additions & 24 deletions worterbuch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ pub use crate::worterbuch::*;
pub use config::*;
use server::common::{CloneableWbApi, WbFunction};
use stats::{SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS};
use tokio_graceful_shutdown::SubsystemHandle;
use worterbuch_common::topic;

use crate::stats::track_stats;
use anyhow::Result;
#[cfg(not(target_os = "windows"))]
use tokio::signal::unix::{signal, SignalKind};
use tokio::{select, spawn, sync::mpsc};
use tokio::{select, sync::mpsc};

pub async fn run_worterbuch(config: Config) -> Result<()> {
pub async fn run_worterbuch(subsys: SubsystemHandle) -> Result<()> {
let config = Config::new()?;
let config_pers = config.clone();

let use_persistence = config.use_persistence;
Expand All @@ -41,39 +42,28 @@ pub async fn run_worterbuch(config: Config) -> Result<()> {
let worterbuch_uptime = api.clone();

if use_persistence {
spawn(persistence::periodic(worterbuch_pers, config_pers));
subsys.start("persistence", |subsys| {
persistence::periodic(worterbuch_pers, config_pers, subsys)
});
}

spawn(track_stats(worterbuch_uptime));

spawn(server::poem::start(api.clone(), config.clone()));

let (_terminate_tx, mut terminate_rx) = mpsc::channel(1);
let sapi = api.clone();
let sconf = config.clone();
subsys.start("stats", |subsys| track_stats(worterbuch_uptime, subsys));
subsys.start("webserver", |subsys| {
server::poem::start(sapi, sconf, subsys)
});

loop {
select! {
recv = api_rx.recv() => match recv {
Some(function) => process_api_call(&mut worterbuch, function).await,
None => break,
},
_ = terminate_rx.recv() => break,
_ = subsys.on_shutdown_requested() => break,
}
}

#[cfg(not(target_os = "windows"))]
spawn(async move {
match signal(SignalKind::terminate()) {
Ok(mut signal) => {
signal.recv().await;
log::info!("SIGTERM received.");
if let Err(_) = _terminate_tx.send(()).await {
log::error!("Error sending terminate signal");
}
}
Err(e) => log::error!("Error registring SIGTERM handler: {e}"),
}
});

log::info!("Shutting down.");

if use_persistence {
Expand Down
11 changes: 8 additions & 3 deletions worterbuch/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::time::Duration;

use anyhow::Result;
use clap::Parser;
use tokio_graceful_shutdown::Toplevel;
use worterbuch::run_worterbuch;
use worterbuch::Config;

#[derive(Parser)]
#[command(author, version, about = "An in-memory data base / message broker hybrid", long_about = None)]
Expand All @@ -11,10 +13,13 @@ struct Args {}
async fn main() -> Result<()> {
dotenv::dotenv().ok();
env_logger::init();
let config = Config::new()?;
let _args: Args = Args::parse();

run_worterbuch(config).await?;
Toplevel::new()
.start("worterbuch", run_worterbuch)
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await?;

Ok(())
}
20 changes: 15 additions & 5 deletions worterbuch/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,26 @@ use std::path::PathBuf;
use tokio::{
fs::{self, File},
io::AsyncWriteExt,
time::sleep,
select,
time::interval,
};
use tokio_graceful_shutdown::SubsystemHandle;

pub(crate) async fn periodic(worterbuch: CloneableWbApi, config: Config) -> Result<()> {
let interval = config.persistence_interval;
pub(crate) async fn periodic(
worterbuch: CloneableWbApi,
config: Config,
subsys: SubsystemHandle,
) -> Result<()> {
let mut interval = interval(config.persistence_interval);

loop {
sleep(interval).await;
once(&worterbuch, config.clone()).await?;
select! {
_ = interval.tick() => once(&worterbuch, config.clone()).await?,
_ = subsys.on_shutdown_requested() => break,
}
}

Ok(())
}

pub(crate) async fn once(worterbuch: &CloneableWbApi, config: Config) -> Result<()> {
Expand Down
20 changes: 17 additions & 3 deletions worterbuch/src/server/poem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tokio::{
sync::mpsc,
time::{sleep, MissedTickBehavior},
};
use tokio_graceful_shutdown::SubsystemHandle;
use uuid::Uuid;
use worterbuch_common::{
error::WorterbuchError, quote, KeyValuePair, KeyValuePairs, ProtocolVersion, RegularKeySegment,
Expand All @@ -38,6 +39,7 @@ const VERSION: &str = env!("CARGO_PKG_VERSION");

struct Api {
worterbuch: CloneableWbApi,
_subsys: SubsystemHandle,
}

#[OpenApi]
Expand Down Expand Up @@ -137,7 +139,7 @@ fn to_error_response<T>(e: WorterbuchError) -> Result<T> {
#[handler]
async fn ws(
ws: WebSocket,
Data(data): Data<&(CloneableWbApi, ProtocolVersion)>,
Data(data): Data<&(CloneableWbApi, ProtocolVersion, SubsystemHandle)>,
req: &Request,
) -> impl IntoResponse {
log::info!("Client connected");
Expand Down Expand Up @@ -187,7 +189,11 @@ fn admin_data() -> (String, String, String) {
(admin_name, admin_url, admin_email)
}

pub async fn start(worterbuch: CloneableWbApi, config: Config) -> Result<(), std::io::Error> {
pub async fn start(
worterbuch: CloneableWbApi,
config: Config,
subsys: SubsystemHandle,
) -> Result<(), std::io::Error> {
let port = config.port;
let bind_addr = config.bind_addr;
let public_addr = config.public_address;
Expand All @@ -201,6 +207,7 @@ pub async fn start(worterbuch: CloneableWbApi, config: Config) -> Result<(), std

let api = Api {
worterbuch: worterbuch.clone(),
_subsys: subsys.clone(),
};

let api_path = "/api";
Expand Down Expand Up @@ -229,6 +236,7 @@ pub async fn start(worterbuch: CloneableWbApi, config: Config) -> Result<(), std
.last()
.expect("cannot be none")
.to_owned(),
subsys.clone(),
))),
);

Expand Down Expand Up @@ -259,7 +267,13 @@ pub async fn start(worterbuch: CloneableWbApi, config: Config) -> Result<(), std
log::info!("Serving ws endpoint at {proto}://{public_addr}:{port}/ws/{proto_ver}");
}

poem::Server::new(TcpListener::bind(addr)).run(app).await
poem::Server::new(TcpListener::bind(addr))
.run_with_graceful_shutdown(
app,
subsys.on_shutdown_requested(),
Some(Duration::from_secs(1)),
)
.await
}

struct ClientHandler {
Expand Down
19 changes: 15 additions & 4 deletions worterbuch/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
use crate::server::common::CloneableWbApi;
use serde_json::json;
use std::time::Duration;
use tokio::time::{sleep, Instant};
use tokio::{
select,
time::{interval, Instant},
};
use tokio_graceful_shutdown::SubsystemHandle;
use worterbuch_common::error::WorterbuchResult;

pub const SYSTEM_TOPIC_ROOT: &str = "$SYS";
pub const SYSTEM_TOPIC_CLIENTS: &str = "clients";
pub const SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSIONS: &str = "supportedProtocolVersions";
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub async fn track_stats(wb: CloneableWbApi) -> WorterbuchResult<()> {
pub async fn track_stats(wb: CloneableWbApi, subsys: SubsystemHandle) -> WorterbuchResult<()> {
let start = Instant::now();
wb.set(format!("{SYSTEM_TOPIC_ROOT}/version"), json!(VERSION))
.await?;

let mut interval = interval(Duration::from_secs(1));

loop {
update_stats(&wb, start).await?;
sleep(Duration::from_secs(1)).await;
select! {
_ = interval.tick() => update_stats(&wb, start).await?,
_ = subsys.on_shutdown_requested() => break,
}
}

Ok(())
}

async fn update_stats(wb: &CloneableWbApi, start: Instant) -> WorterbuchResult<()> {
Expand Down

0 comments on commit 14b6bab

Please sign in to comment.