Skip to content

Commit

Permalink
dedup and finish
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Oct 1, 2023
1 parent d04ac4c commit f89cf72
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 99 deletions.
20 changes: 13 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"crates/kitsune-http-signatures",
"crates/kitsune-language",
"crates/kitsune-messaging",
"crates/kitsune-retry-policies",
"crates/kitsune-search",
"crates/kitsune-storage",
"crates/kitsune-test",
Expand Down
7 changes: 1 addition & 6 deletions crates/kitsune-messaging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@ edition.workspace = true
[dependencies]
ahash = "0.8.3"
async-trait = "0.1.73"
futures-retry-policies = { version = "0.3.1", features = [
"retry-policies",
"tokio",
"tracing",
] }
futures-util = "0.3.28"
kitsune-retry-policies = { path = "../kitsune-retry-policies" }
pin-project-lite = "0.2.13"
redis = { version = "0.23.3", features = [
"aio",
"connection-manager",
"tokio-rustls-comp",
] }
retry-policies = "0.2.0"
serde = "1.0.188"
simd-json = "0.11.1"
tokio = { version = "1.32.0", features = ["macros", "rt", "sync"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/kitsune-messaging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub type BoxError = Box<dyn Error + Send + Sync>;
/// Type alias for Result, defaulting to [`BoxError`] on the error branch
pub type Result<T, E = BoxError> = std::result::Result<T, E>;

mod util;

pub mod redis;
pub mod tokio_broadcast;

Expand Down
39 changes: 12 additions & 27 deletions crates/kitsune-messaging/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,22 @@
//! Redis implementation
//!
use crate::{MessagingBackend, Result};
use crate::{util::TransparentDebug, MessagingBackend, Result};
use ahash::AHashMap;
use async_trait::async_trait;
use futures_retry_policies::{
retry_policies::RetryPolicies, tokio::RetryFutureExt, tracing::Traced, RetryPolicy, ShouldRetry,
};
use futures_util::{future, stream::BoxStream, StreamExt, TryStreamExt};
use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt};
use redis::{
aio::{ConnectionManager, PubSub},
AsyncCommands, RedisError,
};
use retry_policies::{policies::ExponentialBackoff, Jitter};
use std::{
fmt::Debug,
time::{Duration, SystemTime},
};
use std::fmt::Debug;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::BroadcastStream;

const BROADCAST_CAPACITY: usize = 10;
const CONNECTION_RETRY_DELAY: Duration = Duration::from_secs(5);
const REGISTRATION_QUEUE_SIZE: usize = 50;

fn futures_backoff_policy<Res>() -> impl RetryPolicy<Res>
where
Res: Debug + ShouldRetry,
{
let policy = ExponentialBackoff::builder()
.jitter(Jitter::Bounded)
.build_with_total_retry_duration(Duration::from_secs(24 * 3600))
.for_task_started_at(SystemTime::now().into());

Traced(RetryPolicies::new(policy))
}

macro_rules! handle_err {
($result:expr, $msg:literal $(,)?) => {{
if let Err(error) = { $result } {
Expand Down Expand Up @@ -98,14 +79,18 @@ impl MultiplexActor {
debug!(%pattern, "Failed to find correct receiver");
}
} else {
self.conn = (|| async move {
self.client
.get_async_connection()
.await
.map(|conn| conn.into_pubsub())
self.conn = (|| {
let client = self.client.clone();
async move {
client
.get_async_connection()
.await
.map(|conn| TransparentDebug(conn.into_pubsub()))
}
})
.retry(futures_backoff_policy())
.await
.map(|conn| conn.0)
.unwrap();

for key in self.mapping.keys() {
Expand Down
9 changes: 9 additions & 0 deletions crates/kitsune-messaging/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use std::fmt;

pub struct TransparentDebug<T>(pub T);

impl<T> fmt::Debug for TransparentDebug<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} {{ ... }}", std::any::type_name::<T>())
}
}
12 changes: 12 additions & 0 deletions crates/kitsune-retry-policies/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "kitsune-retry-policies"
edition.workspace = true
version.workspace = true

[dependencies]
futures-retry-policies = { version = "0.3.1", features = [
"retry-policies",
"tokio",
"tracing",
] }
retry-policies = "0.2.0"
52 changes: 52 additions & 0 deletions crates/kitsune-retry-policies/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use futures_retry_policies::{retry_policies::RetryPolicies, tracing::Traced};
use retry_policies::{policies::ExponentialBackoff, Jitter};
use std::{
fmt::{self, Debug},
ops::ControlFlow,
time::{Duration, SystemTime},
};

pub use futures_retry_policies::{tokio::RetryFutureExt, RetryPolicy};

pub struct NeverRetry<T>(T);

impl<T> Debug for NeverRetry<T>
where
T: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

impl<T> futures_retry_policies::ShouldRetry for NeverRetry<T> {
fn should_retry(&self, _attempts: u32) -> bool {
false
}
}

impl<Res, T> futures_retry_policies::RetryPolicy<Res> for NeverRetry<T>
where
T: futures_retry_policies::RetryPolicy<NeverRetry<Res>>,
{
fn should_retry(&mut self, result: Res) -> ControlFlow<Res, Duration> {
match self.0.should_retry(NeverRetry(result)) {
ControlFlow::Break(NeverRetry(val)) => ControlFlow::Break(val),
ControlFlow::Continue(dur) => ControlFlow::Continue(dur),
}
}
}

pub fn futures_backoff_policy<Res>() -> impl futures_retry_policies::RetryPolicy<Res>
where
Res: Debug,
{
Traced(NeverRetry(RetryPolicies::new(backoff_policy())))
}

pub fn backoff_policy() -> impl retry_policies::RetryPolicy {
ExponentialBackoff::builder()
.jitter(Jitter::Bounded)
.build_with_total_retry_duration(Duration::from_secs(24 * 3600)) // Kill the retrying after 24 hours
.for_task_started_at(SystemTime::now().into())
}
7 changes: 1 addition & 6 deletions kitsune-job-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@ athena = { path = "../lib/athena" }
clap = { version = "4.4.6", features = ["derive"] }
color-eyre = "0.6.2"
deadpool-redis = "0.13.0"
futures-retry-policies = { version = "0.3.1", features = [
"retry-policies",
"tokio",
"tracing",
] }
kitsune-core = { path = "../crates/kitsune-core" }
kitsune-db = { path = "../crates/kitsune-db" }
kitsune-retry-policies = { path = "../crates/kitsune-retry-policies" }
mimalloc = "0.1.39"
retry-policies = "0.2.0"
tokio = { version = "1.32.0", features = ["full"] }
toml = "0.8.1"
tracing = "0.1.37"
Expand Down
25 changes: 2 additions & 23 deletions kitsune-job-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,19 @@
extern crate tracing;

use athena::JobQueue;
use futures_retry_policies::{
retry_policies::RetryPolicies, tracing::Traced, RetryPolicy, ShouldRetry,
};
use kitsune_core::{
activitypub::Deliverer,
config::JobQueueConfiguration,
job::{JobRunnerContext, KitsuneContextRepo},
state::State as CoreState,
};
use kitsune_db::PgPool;
use retry_policies::{policies::ExponentialBackoff, Jitter};
use std::{
fmt::Debug,
ops::ControlFlow,
sync::Arc,
time::{Duration, SystemTime},
};
use kitsune_retry_policies::{futures_backoff_policy, RetryPolicy};
use std::{ops::ControlFlow, sync::Arc, time::Duration};
use tokio::task::JoinSet;

const EXECUTION_TIMEOUT_DURATION: Duration = Duration::from_secs(30);

fn futures_backoff_policy<Res>() -> impl RetryPolicy<Res>
where
Res: Debug + ShouldRetry,
{
let policy = ExponentialBackoff::builder()
.jitter(Jitter::Bounded)
.build_with_total_retry_duration(Duration::from_secs(24 * 3600))
.for_task_started_at(SystemTime::now().into());

Traced(RetryPolicies::new(policy))
}

pub fn prepare_job_queue(
db_pool: PgPool,
config: &JobQueueConfiguration,
Expand Down Expand Up @@ -66,7 +46,6 @@ pub async fn run_dispatcher(
let mut job_joinset = JoinSet::new();
loop {
let mut backoff_policy = futures_backoff_policy();

loop {
let result = job_queue
.spawn_jobs(
Expand Down
6 changes: 1 addition & 5 deletions lib/athena/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,9 @@ ahash = "0.7.6" # We can not update because of the "redis" crate
async-trait = "0.1.73"
deadpool-redis = "0.13.0"
either = { version = "1.9.0", default-features = false }
futures-retry-policies = { version = "0.3.1", features = [
"retry-policies",
"tokio",
"tracing",
] }
futures-util = { version = "0.3.28", default-features = false }
iso8601-timestamp = { version = "0.2.12", features = ["diesel-pg"] }
kitsune-retry-policies = { path = "../../crates/kitsune-retry-policies" }
once_cell = "1.18.0"
rand = "0.8.5"
redis = { version = "0.23.3", default-features = false, features = [
Expand Down
7 changes: 0 additions & 7 deletions lib/athena/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use futures_retry_policies::ShouldRetry;
use thiserror::Error;

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
Expand All @@ -21,9 +20,3 @@ pub enum Error {
#[error(transparent)]
Uuid(#[from] speedy_uuid::Error),
}

impl ShouldRetry for Error {
fn should_retry(&self, _attempts: u32) -> bool {
false
}
}
19 changes: 1 addition & 18 deletions lib/athena/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ use crate::{error::Result, impl_to_redis_args, Error, JobContextRepository, Runn
use ahash::AHashMap;
use deadpool_redis::Pool as RedisPool;
use either::Either;
use futures_retry_policies::{
retry_policies::RetryPolicies, tokio::RetryFutureExt, tracing::Traced, ShouldRetry,
};
use futures_util::StreamExt;
use iso8601_timestamp::Timestamp;
use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt};
use redis::{
aio::ConnectionLike,
streams::{StreamReadOptions, StreamReadReply},
Expand All @@ -18,7 +16,6 @@ use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use speedy_uuid::Uuid;
use std::{
fmt::Debug,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
Expand All @@ -34,20 +31,6 @@ const MIN_IDLE_TIME: Duration = Duration::from_secs(10 * 60);

const MAX_RETRIES: u32 = 10;

fn futures_backoff_policy<Res>() -> impl futures_retry_policies::RetryPolicy<Res>
where
Res: Debug + ShouldRetry,
{
Traced(RetryPolicies::new(backoff_policy()))
}

fn backoff_policy() -> impl retry_policies::RetryPolicy {
ExponentialBackoff::builder()
.jitter(Jitter::Bounded)
.build_with_total_retry_duration(Duration::from_secs(24 * 3600)) // Kill the retrying after 24 hours
.for_task_started_at(SystemTime::now().into())
}

enum JobState<'a> {
Succeeded {
job_id: Uuid,
Expand Down

0 comments on commit f89cf72

Please sign in to comment.