From f89cf7263605cae7afcbe36ec1ea2bf044463e81 Mon Sep 17 00:00:00 2001 From: aumetra Date: Sun, 1 Oct 2023 17:27:19 +0200 Subject: [PATCH] dedup and finish --- Cargo.lock | 20 +++++---- Cargo.toml | 1 + crates/kitsune-messaging/Cargo.toml | 7 +--- crates/kitsune-messaging/src/lib.rs | 2 + crates/kitsune-messaging/src/redis.rs | 39 ++++++------------ crates/kitsune-messaging/src/util.rs | 9 ++++ crates/kitsune-retry-policies/Cargo.toml | 12 ++++++ crates/kitsune-retry-policies/src/lib.rs | 52 ++++++++++++++++++++++++ kitsune-job-runner/Cargo.toml | 7 +--- kitsune-job-runner/src/lib.rs | 25 +----------- lib/athena/Cargo.toml | 6 +-- lib/athena/src/error.rs | 7 ---- lib/athena/src/queue/mod.rs | 19 +-------- 13 files changed, 107 insertions(+), 99 deletions(-) create mode 100644 crates/kitsune-messaging/src/util.rs create mode 100644 crates/kitsune-retry-policies/Cargo.toml create mode 100644 crates/kitsune-retry-policies/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 9bad8755b..603b807d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -442,9 +442,9 @@ dependencies = [ "async-trait", "deadpool-redis", "either", - "futures-retry-policies", "futures-util", "iso8601-timestamp", + "kitsune-retry-policies", "once_cell", "rand", "redis", @@ -2181,9 +2181,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" dependencies = [ "errno-dragonfly", "libc", @@ -3444,11 +3444,10 @@ dependencies = [ "clap", "color-eyre", "deadpool-redis", - "futures-retry-policies", "kitsune-core", "kitsune-db", + "kitsune-retry-policies", "mimalloc", - "retry-policies", "tokio", "toml 0.8.1", "tracing", @@ -3472,11 +3471,10 @@ version = "0.0.1-pre.3" dependencies = [ "ahash 0.8.3", "async-trait", - "futures-retry-policies", "futures-util", + "kitsune-retry-policies", "pin-project-lite", "redis", - "retry-policies", "serde", "simd-json", "tokio", @@ -3484,6 +3482,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "kitsune-retry-policies" +version = "0.0.1-pre.3" +dependencies = [ + "futures-retry-policies", + "retry-policies", +] + [[package]] name = "kitsune-search" version = "0.0.1-pre.3" diff --git a/Cargo.toml b/Cargo.toml index 549a87906..4471e0487 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "crates/kitsune-http-signatures", "crates/kitsune-language", "crates/kitsune-messaging", + "crates/kitsune-retry-policies", "crates/kitsune-search", "crates/kitsune-storage", "crates/kitsune-test", diff --git a/crates/kitsune-messaging/Cargo.toml b/crates/kitsune-messaging/Cargo.toml index 6d5a1644b..05b1a797e 100644 --- a/crates/kitsune-messaging/Cargo.toml +++ b/crates/kitsune-messaging/Cargo.toml @@ -6,19 +6,14 @@ edition.workspace = true [dependencies] ahash = "0.8.3" async-trait = "0.1.73" -futures-retry-policies = { version = "0.3.1", features = [ - "retry-policies", - "tokio", - "tracing", -] } futures-util = "0.3.28" +kitsune-retry-policies = { path = "../kitsune-retry-policies" } pin-project-lite = "0.2.13" redis = { version = "0.23.3", features = [ "aio", "connection-manager", "tokio-rustls-comp", ] } -retry-policies = "0.2.0" serde = "1.0.188" simd-json = "0.11.1" tokio = { version = "1.32.0", features = ["macros", "rt", "sync"] } diff --git a/crates/kitsune-messaging/src/lib.rs b/crates/kitsune-messaging/src/lib.rs index 7b3e18812..d72a7a8a3 100644 --- a/crates/kitsune-messaging/src/lib.rs +++ b/crates/kitsune-messaging/src/lib.rs @@ -28,6 +28,8 @@ pub type BoxError = Box; /// Type alias for Result, defaulting to [`BoxError`] on the error branch pub type Result = std::result::Result; +mod util; + pub mod redis; pub mod tokio_broadcast; diff --git a/crates/kitsune-messaging/src/redis.rs b/crates/kitsune-messaging/src/redis.rs index e7ddae559..843b5629a 100644 --- a/crates/kitsune-messaging/src/redis.rs +++ b/crates/kitsune-messaging/src/redis.rs @@ -2,41 +2,22 @@ //! Redis implementation //! -use crate::{MessagingBackend, Result}; +use crate::{util::TransparentDebug, MessagingBackend, Result}; use ahash::AHashMap; use async_trait::async_trait; -use futures_retry_policies::{ - retry_policies::RetryPolicies, tokio::RetryFutureExt, tracing::Traced, RetryPolicy, ShouldRetry, -}; use futures_util::{future, stream::BoxStream, StreamExt, TryStreamExt}; +use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt}; use redis::{ aio::{ConnectionManager, PubSub}, AsyncCommands, RedisError, }; -use retry_policies::{policies::ExponentialBackoff, Jitter}; -use std::{ - fmt::Debug, - time::{Duration, SystemTime}, -}; +use std::fmt::Debug; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_stream::wrappers::BroadcastStream; const BROADCAST_CAPACITY: usize = 10; -const CONNECTION_RETRY_DELAY: Duration = Duration::from_secs(5); const REGISTRATION_QUEUE_SIZE: usize = 50; -fn futures_backoff_policy() -> impl RetryPolicy -where - Res: Debug + ShouldRetry, -{ - let policy = ExponentialBackoff::builder() - .jitter(Jitter::Bounded) - .build_with_total_retry_duration(Duration::from_secs(24 * 3600)) - .for_task_started_at(SystemTime::now().into()); - - Traced(RetryPolicies::new(policy)) -} - macro_rules! handle_err { ($result:expr, $msg:literal $(,)?) => {{ if let Err(error) = { $result } { @@ -98,14 +79,18 @@ impl MultiplexActor { debug!(%pattern, "Failed to find correct receiver"); } } else { - self.conn = (|| async move { - self.client - .get_async_connection() - .await - .map(|conn| conn.into_pubsub()) + self.conn = (|| { + let client = self.client.clone(); + async move { + client + .get_async_connection() + .await + .map(|conn| TransparentDebug(conn.into_pubsub())) + } }) .retry(futures_backoff_policy()) .await + .map(|conn| conn.0) .unwrap(); for key in self.mapping.keys() { diff --git a/crates/kitsune-messaging/src/util.rs b/crates/kitsune-messaging/src/util.rs new file mode 100644 index 000000000..d2afa3d10 --- /dev/null +++ b/crates/kitsune-messaging/src/util.rs @@ -0,0 +1,9 @@ +use std::fmt; + +pub struct TransparentDebug(pub T); + +impl fmt::Debug for TransparentDebug { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} {{ ... }}", std::any::type_name::()) + } +} diff --git a/crates/kitsune-retry-policies/Cargo.toml b/crates/kitsune-retry-policies/Cargo.toml new file mode 100644 index 000000000..ebd31ba7d --- /dev/null +++ b/crates/kitsune-retry-policies/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "kitsune-retry-policies" +edition.workspace = true +version.workspace = true + +[dependencies] +futures-retry-policies = { version = "0.3.1", features = [ + "retry-policies", + "tokio", + "tracing", +] } +retry-policies = "0.2.0" diff --git a/crates/kitsune-retry-policies/src/lib.rs b/crates/kitsune-retry-policies/src/lib.rs new file mode 100644 index 000000000..49b7e519d --- /dev/null +++ b/crates/kitsune-retry-policies/src/lib.rs @@ -0,0 +1,52 @@ +use futures_retry_policies::{retry_policies::RetryPolicies, tracing::Traced}; +use retry_policies::{policies::ExponentialBackoff, Jitter}; +use std::{ + fmt::{self, Debug}, + ops::ControlFlow, + time::{Duration, SystemTime}, +}; + +pub use futures_retry_policies::{tokio::RetryFutureExt, RetryPolicy}; + +pub struct NeverRetry(T); + +impl Debug for NeverRetry +where + T: Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl futures_retry_policies::ShouldRetry for NeverRetry { + fn should_retry(&self, _attempts: u32) -> bool { + false + } +} + +impl futures_retry_policies::RetryPolicy for NeverRetry +where + T: futures_retry_policies::RetryPolicy>, +{ + fn should_retry(&mut self, result: Res) -> ControlFlow { + match self.0.should_retry(NeverRetry(result)) { + ControlFlow::Break(NeverRetry(val)) => ControlFlow::Break(val), + ControlFlow::Continue(dur) => ControlFlow::Continue(dur), + } + } +} + +pub fn futures_backoff_policy() -> impl futures_retry_policies::RetryPolicy +where + Res: Debug, +{ + Traced(NeverRetry(RetryPolicies::new(backoff_policy()))) +} + +pub fn backoff_policy() -> impl retry_policies::RetryPolicy { + ExponentialBackoff::builder() + .jitter(Jitter::Bounded) + .build_with_total_retry_duration(Duration::from_secs(24 * 3600)) // Kill the retrying after 24 hours + .for_task_started_at(SystemTime::now().into()) +} diff --git a/kitsune-job-runner/Cargo.toml b/kitsune-job-runner/Cargo.toml index 1a4d3eea7..ed6704f05 100644 --- a/kitsune-job-runner/Cargo.toml +++ b/kitsune-job-runner/Cargo.toml @@ -8,15 +8,10 @@ athena = { path = "../lib/athena" } clap = { version = "4.4.6", features = ["derive"] } color-eyre = "0.6.2" deadpool-redis = "0.13.0" -futures-retry-policies = { version = "0.3.1", features = [ - "retry-policies", - "tokio", - "tracing", -] } kitsune-core = { path = "../crates/kitsune-core" } kitsune-db = { path = "../crates/kitsune-db" } +kitsune-retry-policies = { path = "../crates/kitsune-retry-policies" } mimalloc = "0.1.39" -retry-policies = "0.2.0" tokio = { version = "1.32.0", features = ["full"] } toml = "0.8.1" tracing = "0.1.37" diff --git a/kitsune-job-runner/src/lib.rs b/kitsune-job-runner/src/lib.rs index dcea84b0a..355988f49 100644 --- a/kitsune-job-runner/src/lib.rs +++ b/kitsune-job-runner/src/lib.rs @@ -2,9 +2,6 @@ extern crate tracing; use athena::JobQueue; -use futures_retry_policies::{ - retry_policies::RetryPolicies, tracing::Traced, RetryPolicy, ShouldRetry, -}; use kitsune_core::{ activitypub::Deliverer, config::JobQueueConfiguration, @@ -12,29 +9,12 @@ use kitsune_core::{ state::State as CoreState, }; use kitsune_db::PgPool; -use retry_policies::{policies::ExponentialBackoff, Jitter}; -use std::{ - fmt::Debug, - ops::ControlFlow, - sync::Arc, - time::{Duration, SystemTime}, -}; +use kitsune_retry_policies::{futures_backoff_policy, RetryPolicy}; +use std::{ops::ControlFlow, sync::Arc, time::Duration}; use tokio::task::JoinSet; const EXECUTION_TIMEOUT_DURATION: Duration = Duration::from_secs(30); -fn futures_backoff_policy() -> impl RetryPolicy -where - Res: Debug + ShouldRetry, -{ - let policy = ExponentialBackoff::builder() - .jitter(Jitter::Bounded) - .build_with_total_retry_duration(Duration::from_secs(24 * 3600)) - .for_task_started_at(SystemTime::now().into()); - - Traced(RetryPolicies::new(policy)) -} - pub fn prepare_job_queue( db_pool: PgPool, config: &JobQueueConfiguration, @@ -66,7 +46,6 @@ pub async fn run_dispatcher( let mut job_joinset = JoinSet::new(); loop { let mut backoff_policy = futures_backoff_policy(); - loop { let result = job_queue .spawn_jobs( diff --git a/lib/athena/Cargo.toml b/lib/athena/Cargo.toml index 8884ae012..4253714d1 100644 --- a/lib/athena/Cargo.toml +++ b/lib/athena/Cargo.toml @@ -9,13 +9,9 @@ ahash = "0.7.6" # We can not update because of the "redis" crate async-trait = "0.1.73" deadpool-redis = "0.13.0" either = { version = "1.9.0", default-features = false } -futures-retry-policies = { version = "0.3.1", features = [ - "retry-policies", - "tokio", - "tracing", -] } futures-util = { version = "0.3.28", default-features = false } iso8601-timestamp = { version = "0.2.12", features = ["diesel-pg"] } +kitsune-retry-policies = { path = "../../crates/kitsune-retry-policies" } once_cell = "1.18.0" rand = "0.8.5" redis = { version = "0.23.3", default-features = false, features = [ diff --git a/lib/athena/src/error.rs b/lib/athena/src/error.rs index 7290fb13f..27df17136 100644 --- a/lib/athena/src/error.rs +++ b/lib/athena/src/error.rs @@ -1,4 +1,3 @@ -use futures_retry_policies::ShouldRetry; use thiserror::Error; pub type BoxError = Box; @@ -21,9 +20,3 @@ pub enum Error { #[error(transparent)] Uuid(#[from] speedy_uuid::Error), } - -impl ShouldRetry for Error { - fn should_retry(&self, _attempts: u32) -> bool { - false - } -} diff --git a/lib/athena/src/queue/mod.rs b/lib/athena/src/queue/mod.rs index 7bb6c3cbd..0da1d2987 100644 --- a/lib/athena/src/queue/mod.rs +++ b/lib/athena/src/queue/mod.rs @@ -3,11 +3,9 @@ use crate::{error::Result, impl_to_redis_args, Error, JobContextRepository, Runn use ahash::AHashMap; use deadpool_redis::Pool as RedisPool; use either::Either; -use futures_retry_policies::{ - retry_policies::RetryPolicies, tokio::RetryFutureExt, tracing::Traced, ShouldRetry, -}; use futures_util::StreamExt; use iso8601_timestamp::Timestamp; +use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt}; use redis::{ aio::ConnectionLike, streams::{StreamReadOptions, StreamReadReply}, @@ -18,7 +16,6 @@ use serde::{Deserialize, Serialize}; use smol_str::SmolStr; use speedy_uuid::Uuid; use std::{ - fmt::Debug, str::FromStr, sync::Arc, time::{Duration, SystemTime}, @@ -34,20 +31,6 @@ const MIN_IDLE_TIME: Duration = Duration::from_secs(10 * 60); const MAX_RETRIES: u32 = 10; -fn futures_backoff_policy() -> impl futures_retry_policies::RetryPolicy -where - Res: Debug + ShouldRetry, -{ - Traced(RetryPolicies::new(backoff_policy())) -} - -fn backoff_policy() -> impl retry_policies::RetryPolicy { - ExponentialBackoff::builder() - .jitter(Jitter::Bounded) - .build_with_total_retry_duration(Duration::from_secs(24 * 3600)) // Kill the retrying after 24 hours - .for_task_started_at(SystemTime::now().into()) -} - enum JobState<'a> { Succeeded { job_id: Uuid,