Skip to content

Commit

Permalink
Correct retry logic (#357)
Browse files Browse the repository at this point in the history
* Use exponential backoff with jitter where applicable

* dedup and finish
  • Loading branch information
aumetra authored Oct 1, 2023
1 parent d8da712 commit 7ac431e
Show file tree
Hide file tree
Showing 27 changed files with 278 additions and 187 deletions.
200 changes: 116 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"crates/kitsune-http-signatures",
"crates/kitsune-language",
"crates/kitsune-messaging",
"crates/kitsune-retry-policies",
"crates/kitsune-search",
"crates/kitsune-storage",
"crates/kitsune-test",
Expand All @@ -32,7 +33,6 @@ members = [
"kitsune-search-server",
"kitsune-search-server/proto",
"lib/athena",
"lib/just-retry",
"lib/post-process",
"lib/speedy-uuid",
]
Expand Down
1 change: 0 additions & 1 deletion crates/kitsune-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ hex-simd = "0.8.0"
http = "0.2.9"
img-parts = "0.3.0"
iso8601-timestamp = "0.2.12"
just-retry = { path = "../../lib/just-retry" }
kitsune-cache = { path = "../kitsune-cache" }
kitsune-captcha = { path = "../kitsune-captcha" }
kitsune-db = { path = "../kitsune-db" }
Expand Down
2 changes: 1 addition & 1 deletion crates/kitsune-email/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ edition.workspace = true
version.workspace = true

[dependencies]
askama = "0.12.0"
askama = "0.12.1"
askama_axum = "0.3.0" # Damn it, cargo. Because "kitsune" uses "askama" with the axum feature, we have to have the crate available here as well..
lettre = { version = "0.10.4", default-features = false, features = [
"builder",
Expand Down
1 change: 1 addition & 0 deletions crates/kitsune-messaging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition.workspace = true
ahash = "0.8.3"
async-trait = "0.1.73"
futures-util = "0.3.28"
kitsune-retry-policies = { path = "../kitsune-retry-policies" }
pin-project-lite = "0.2.13"
redis = { version = "0.23.3", features = [
"aio",
Expand Down
2 changes: 2 additions & 0 deletions crates/kitsune-messaging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub type BoxError = Box<dyn Error + Send + Sync>;
/// Type alias for Result, defaulting to [`BoxError`] on the error branch
pub type Result<T, E = BoxError> = std::result::Result<T, E>;

mod util;

pub mod redis;
pub mod tokio_broadcast;

Expand Down
26 changes: 15 additions & 11 deletions crates/kitsune-messaging/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
//! Redis implementation
//!
use crate::{MessagingBackend, Result};
use crate::{util::TransparentDebug, MessagingBackend, Result};
use ahash::AHashMap;
use async_trait::async_trait;
use futures_util::{future, stream::BoxStream, StreamExt, TryStreamExt};
use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt};
use redis::{
aio::{ConnectionManager, PubSub},
AsyncCommands, RedisError,
};
use std::time::Duration;
use std::fmt::Debug;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::BroadcastStream;

const BROADCAST_CAPACITY: usize = 10;
const CONNECTION_RETRY_DELAY: Duration = Duration::from_secs(5);
const REGISTRATION_QUEUE_SIZE: usize = 50;

macro_rules! handle_err {
Expand Down Expand Up @@ -79,15 +79,19 @@ impl MultiplexActor {
debug!(%pattern, "Failed to find correct receiver");
}
} else {
self.conn = loop {
match self.client.get_async_connection().await {
Ok(conn) => break conn.into_pubsub(),
Err(err) => {
error!(error = %err, "Failed to connect to Redis instance");
tokio::time::sleep(CONNECTION_RETRY_DELAY).await;
}
self.conn = (|| {
let client = self.client.clone();
async move {
client
.get_async_connection()
.await
.map(|conn| TransparentDebug(conn.into_pubsub()))
}
};
})
.retry(futures_backoff_policy())
.await
.map(|conn| conn.0)
.unwrap();

for key in self.mapping.keys() {
handle_err!(
Expand Down
9 changes: 9 additions & 0 deletions crates/kitsune-messaging/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use std::fmt;

pub struct TransparentDebug<T>(pub T);

impl<T> fmt::Debug for TransparentDebug<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} {{ ... }}", std::any::type_name::<T>())
}
}
12 changes: 12 additions & 0 deletions crates/kitsune-retry-policies/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "kitsune-retry-policies"
edition.workspace = true
version.workspace = true

[dependencies]
futures-retry-policies = { version = "0.3.1", features = [
"retry-policies",
"tokio",
"tracing",
] }
retry-policies = "0.2.0"
52 changes: 52 additions & 0 deletions crates/kitsune-retry-policies/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use futures_retry_policies::{retry_policies::RetryPolicies, tracing::Traced};
use retry_policies::{policies::ExponentialBackoff, Jitter};
use std::{
fmt::{self, Debug},
ops::ControlFlow,
time::{Duration, SystemTime},
};

pub use futures_retry_policies::{tokio::RetryFutureExt, RetryPolicy};

pub struct NeverRetry<T>(T);

impl<T> Debug for NeverRetry<T>
where
T: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

impl<T> futures_retry_policies::ShouldRetry for NeverRetry<T> {
fn should_retry(&self, _attempts: u32) -> bool {
false
}
}

impl<Res, T> futures_retry_policies::RetryPolicy<Res> for NeverRetry<T>
where
T: futures_retry_policies::RetryPolicy<NeverRetry<Res>>,
{
fn should_retry(&mut self, result: Res) -> ControlFlow<Res, Duration> {
match self.0.should_retry(NeverRetry(result)) {
ControlFlow::Break(NeverRetry(val)) => ControlFlow::Break(val),
ControlFlow::Continue(dur) => ControlFlow::Continue(dur),
}
}
}

pub fn futures_backoff_policy<Res>() -> impl futures_retry_policies::RetryPolicy<Res>
where
Res: Debug,
{
Traced(NeverRetry(RetryPolicies::new(backoff_policy())))
}

pub fn backoff_policy() -> impl retry_policies::RetryPolicy {
ExponentialBackoff::builder()
.jitter(Jitter::Bounded)
.build_with_total_retry_duration(Duration::from_secs(24 * 3600)) // Kill the retrying after 24 hours
.for_task_started_at(SystemTime::now().into())
}
2 changes: 1 addition & 1 deletion crates/kitsune-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tracing = "0.1.37"
# "kitsune-search" feature
bytes = { version = "1.5.0", optional = true }
kitsune-search-proto = { path = "../../kitsune-search-server/proto", optional = true }
tonic = { version = "0.10.1", optional = true }
tonic = { version = "0.10.2", optional = true }

# "meilisearch" feature
meilisearch-sdk = { version = "0.24.2", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/kitsune-storage/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ mod test {
use std::str;
use tempfile::TempDir;

const TEST_TEXT: &str = r#"
const TEST_TEXT: &str = r"
新時代はこの未来だ
世界中全部 変えてしまえば 変えてしまえば
ジャマモノ やなもの なんて消して
Expand Down Expand Up @@ -99,7 +99,7 @@ mod test {
果てしない音楽がもっと届くように
夢を見せるよ 夢を見せるよ 新時代だ
Ooh
新時代だ"#;
新時代だ";

#[tokio::test]
async fn basic() {
Expand Down
2 changes: 1 addition & 1 deletion kitsune-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition.workspace = true
build = "build.rs"

[dependencies]
clap = { version = "4.4.5", features = ["derive"] }
clap = { version = "4.4.6", features = ["derive"] }
color-eyre = "0.6.2"
diesel = "2.1.2"
diesel-async = "0.4.1"
Expand Down
4 changes: 2 additions & 2 deletions kitsune-job-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ version.workspace = true

[dependencies]
athena = { path = "../lib/athena" }
clap = { version = "4.4.5", features = ["derive"] }
clap = { version = "4.4.6", features = ["derive"] }
color-eyre = "0.6.2"
deadpool-redis = "0.13.0"
just-retry = { path = "../lib/just-retry" }
kitsune-core = { path = "../crates/kitsune-core" }
kitsune-db = { path = "../crates/kitsune-db" }
kitsune-retry-policies = { path = "../crates/kitsune-retry-policies" }
mimalloc = "0.1.39"
tokio = { version = "1.32.0", features = ["full"] }
toml = "0.8.1"
Expand Down
34 changes: 21 additions & 13 deletions kitsune-job-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use kitsune_core::{
state::State as CoreState,
};
use kitsune_db::PgPool;
use std::{sync::Arc, time::Duration};
use kitsune_retry_policies::{futures_backoff_policy, RetryPolicy};
use std::{ops::ControlFlow, sync::Arc, time::Duration};
use tokio::task::JoinSet;

const EXECUTION_TIMEOUT_DURATION: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -44,19 +45,26 @@ pub async fn run_dispatcher(

let mut job_joinset = JoinSet::new();
loop {
while let Err(error) = job_queue
.spawn_jobs(
num_job_workers - job_joinset.len(),
Arc::clone(&ctx),
&mut job_joinset,
)
.await
{
error!(?error, "failed to spawn more jobs");
just_retry::sleep_a_bit().await;
let mut backoff_policy = futures_backoff_policy();
loop {
let result = job_queue
.spawn_jobs(
num_job_workers - job_joinset.len(),
Arc::clone(&ctx),
&mut job_joinset,
)
.await;

if let ControlFlow::Continue(duration) = backoff_policy.should_retry(result) {
tokio::time::sleep(duration).await;
} else {
break;
}
}

let join_all = async { while job_joinset.join_next().await.is_some() {} };
let _ = tokio::time::timeout(EXECUTION_TIMEOUT_DURATION, join_all).await;
let _ = tokio::time::timeout(EXECUTION_TIMEOUT_DURATION, async {
while job_joinset.join_next().await.is_some() {}
})
.await;
}
}
4 changes: 2 additions & 2 deletions kitsune-search-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ serde = { version = "1.0.188", features = ["derive"] }
tantivy = "0.21.0"
time = "0.3.29"
tokio = { version = "1.32.0", features = ["full"] }
tonic = "0.10.1"
tonic-health = "0.10.1"
tonic = "0.10.2"
tonic-health = "0.10.2"
tower-http = { version = "0.4.4", features = ["add-extension", "trace"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
Expand Down
4 changes: 2 additions & 2 deletions kitsune-search-server/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ edition.workspace = true
[dependencies]
prost = "0.12.1"
serde = { version = "1.0.188", features = ["derive"] }
tonic = { version = "0.10.1", default-features = false, features = [
tonic = { version = "0.10.2", default-features = false, features = [
"codegen",
"transport",
"prost",
] }

[build-dependencies]
protoc-bin-vendored = { version = "3.0.0", optional = true }
tonic-build = "0.10.1"
tonic-build = "0.10.2"

[features]
default = []
Expand Down
4 changes: 2 additions & 2 deletions kitsune-search-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tracing_subscriber::{filter::Targets, layer::SubscriberExt, Layer as _, Regi
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

const STARTUP_FIGLET: &str = r#"
const STARTUP_FIGLET: &str = r"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃
┃ ██╗ ██╗██╗████████╗███████╗██╗ ██╗███╗ ██╗███████╗ ███████╗███████╗ █████╗ ██████╗ ██████╗██╗ ██╗ ┃
Expand All @@ -31,7 +31,7 @@ const STARTUP_FIGLET: &str = r#"
┃ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═══╝╚══════╝ ╚══════╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝ ┃
┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
"#;
";

#[tokio::main]
async fn main() {
Expand Down
6 changes: 3 additions & 3 deletions kitsune/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ build = "build.rs"
athena = { version = "0.0.1-pre.3", path = "../lib/athena" }
argon2 = { version = "0.5.2", features = ["std"] }
autometrics = { version = "0.6.0", default-features = false }
askama = { version = "0.12.0", features = [
askama = { version = "0.12.1", features = [
"with-axum",
], default-features = false }
askama_axum = "0.3.0"
Expand All @@ -28,7 +28,7 @@ axum-extra = { version = "0.8.0", features = [
axum-flash = "0.7.0"
bytes = "1.5.0"
chrono = { version = "0.4.31", default-features = false }
clap = { version = "4.4.5", features = ["derive"] }
clap = { version = "4.4.6", features = ["derive"] }
color-eyre = "0.6.2"
const-oid = { version = "0.9.5", features = ["db"] }
deadpool-redis = "0.13.0"
Expand All @@ -39,7 +39,7 @@ eyre = "0.6.8"
futures-util = "0.3.28"
headers = "0.3.9"
http = "0.2.9"
human-panic = "1.2.0"
human-panic = "1.2.1"
hyper = { version = "0.14.27", features = ["deprecated"] }
iso8601-timestamp = "0.2.12"
mimalloc = "0.1.39"
Expand Down
4 changes: 2 additions & 2 deletions kitsune/src/consts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub const API_DEFAULT_LIMIT: usize = 20;
pub const STARTUP_FIGLET: &str = r#"
pub const STARTUP_FIGLET: &str = r"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃
┃ ██╗ ██╗██╗████████╗███████╗██╗ ██╗███╗ ██╗███████╗ ┃
Expand All @@ -12,7 +12,7 @@ pub const STARTUP_FIGLET: &str = r#"
┃ ActivityPub-federated microblogging ┃
┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
"#;
";

#[must_use]
pub fn default_limit<T>() -> T
Expand Down
6 changes: 3 additions & 3 deletions kitsune/src/http/handler/well_known/webfinger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ mod tests {
.unwrap();
let resource = match response {
Either::E1(Json(resource)) => resource,
Either::E2(status) => panic!("Unexpected status code: {}", status),
Either::E2(status) => panic!("Unexpected status code: {status}"),
};

assert_eq!(resource.subject, "acct:[email protected]");
Expand Down Expand Up @@ -191,7 +191,7 @@ mod tests {
.unwrap();
let resource = match response {
Either::E1(Json(resource)) => resource,
Either::E2(status) => panic!("Unexpected status code: {}", status),
Either::E2(status) => panic!("Unexpected status code: {status}"),
};

assert_eq!(resource.subject, "acct:[email protected]");
Expand All @@ -203,7 +203,7 @@ mod tests {
let response = get(db_pool, url_service, Query(query)).await.unwrap();
let resource = match response {
Either::E1(Json(resource)) => resource,
Either::E2(status) => panic!("Unexpected status code: {}", status),
Either::E2(status) => panic!("Unexpected status code: {status}"),
};

assert_eq!(resource.subject, "acct:[email protected]");
Expand Down
Loading

0 comments on commit 7ac431e

Please sign in to comment.