diff --git a/Cargo.lock b/Cargo.lock index 3f326aeb4..603b807d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -116,9 +116,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -130,15 +130,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" dependencies = [ "utf8parse", ] @@ -154,9 +154,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -200,9 +200,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "askama" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47cbc3cf73fa8d9833727bbee4835ba5c421a0d65b72daf9a7b5d0e0f9cfb57e" +checksum = "b79091df18a97caea757e28cd2d5fda49c6cd4bd01ddffd7ff01ace0c0ad2c28" dependencies = [ "askama_derive", "askama_escape", @@ -224,14 +224,14 @@ dependencies = [ [[package]] name = "askama_derive" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c22fbe0413545c098358e56966ff22cdd039e10215ae213cfbd65032b119fc94" +checksum = "9a0fc7dcf8bd4ead96b1d36b41df47c14beedf7b0301fc543d8f2384e66a2ec0" dependencies = [ + "askama_parser", "basic-toml", "mime", "mime_guess", - "nom", "proc-macro2", "quote", "serde", @@ -244,6 +244,15 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "619743e34b5ba4e9703bba34deac3427c72507c7159f5fd030aea8cac0cfe341" +[[package]] +name = "askama_parser" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c281ace75bcec9b4a608a6dbe5518d7bc97e8760476943bbf656e0a91b2a7c4d" +dependencies = [ + "nom", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -288,7 +297,7 @@ dependencies = [ "fnv", "futures-util", "http", - "indexmap 2.0.0", + "indexmap 2.0.2", "mime", "multer", "num-traits 0.2.16", @@ -361,7 +370,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d74240f9daa8c1e8f73e9cfcc338d20a88d00bbeb83ded49ce8e5b4dcec0f5" dependencies = [ "bytes", - "indexmap 2.0.0", + "indexmap 2.0.2", "serde", "serde_json", ] @@ -433,13 +442,13 @@ dependencies = [ "async-trait", "deadpool-redis", "either", - "exponential-backoff", "futures-util", "iso8601-timestamp", - "just-retry", + "kitsune-retry-policies", "once_cell", "rand", "redis", + "retry-policies", "serde", "simd-json", "smol_str", @@ -1106,9 +1115,9 @@ dependencies = [ [[package]] name = "brotli" -version = "3.3.4" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" +checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -1117,9 +1126,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "2.3.4" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b6561fd3f895a11e8f72af2cb7d22e08366bebc2b6b57f7744c4bda27034744" +checksum = "da74e2b81409b1b743f8f0c62cc6254afefb8b8e50bbfe3735550f7aeefa3448" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -1296,9 +1305,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824956d0dca8334758a5b7f7e50518d66ea319330cbceedcf76905c2f6ab30e3" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -1306,9 +1315,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "122ec64120a49b4563ccaedcbea7818d069ed8e9aa6d829b82d8a4128936b2ab" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", @@ -1762,7 +1771,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "lock_api", "once_cell", "parking_lot_core", @@ -2172,9 +2181,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" dependencies = [ "errno-dragonfly", "libc", @@ -2206,15 +2215,6 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "exponential-backoff" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47f78d87d930eee4b5686a2ab032de499c72bd1e954b84262bb03492a0f932cd" -dependencies = [ - "rand", -] - [[package]] name = "eyre" version = "0.6.8" @@ -2412,6 +2412,28 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "futures-retry-policies" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6deb4087bf085042b95438ac07d98b85976f22549c4ae8dee6b44a1ff094b049" +dependencies = [ + "chrono", + "futures-retry-policies-core", + "retry-policies", + "tokio", + "tracing", +] + +[[package]] +name = "futures-retry-policies-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "222776451f4c0381948cc6ab590f674e0216e99738c436e4f09bc9754d7f9933" +dependencies = [ + "pin-project", +] + [[package]] name = "futures-sink" version = "0.3.28" @@ -2615,9 +2637,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" dependencies = [ "ahash 0.8.3", "allocator-api2", @@ -2775,9 +2797,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "human-panic" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb2df2fb4e13fa697d21d93061ebcbbd876f5ef643b48ff59cfab57a726ef140" +checksum = "b82da652938b83f94cfdaaf9ae7aaadb8430d84b0dfda226998416318727eac2" dependencies = [ "anstream", "anstyle", @@ -2938,20 +2960,20 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.0" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "serde", ] [[package]] name = "insta" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e02c584f4595792d09509a94cdb92a3cef7592b1eb2d9877ee6f527062d0ea" +checksum = "1aa511b2e298cd49b1856746f6bb73e17036bcd66b25f5e92cdcdbec9bd75686" dependencies = [ "console", "globset", @@ -3113,15 +3135,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "just-retry" -version = "0.0.1-pre.3" -dependencies = [ - "rand", - "tokio", - "tracing", -] - [[package]] name = "kitsune" version = "0.0.1-pre.3" @@ -3290,7 +3303,6 @@ dependencies = [ "hyper", "img-parts", "iso8601-timestamp", - "just-retry", "kitsune-cache", "kitsune-captcha", "kitsune-db", @@ -3432,9 +3444,9 @@ dependencies = [ "clap", "color-eyre", "deadpool-redis", - "just-retry", "kitsune-core", "kitsune-db", + "kitsune-retry-policies", "mimalloc", "tokio", "toml 0.8.1", @@ -3460,6 +3472,7 @@ dependencies = [ "ahash 0.8.3", "async-trait", "futures-util", + "kitsune-retry-policies", "pin-project-lite", "redis", "serde", @@ -3469,6 +3482,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "kitsune-retry-policies" +version = "0.0.1-pre.3" +dependencies = [ + "futures-retry-policies", + "retry-policies", +] + [[package]] name = "kitsune-search" version = "0.0.1-pre.3" @@ -3759,9 +3780,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db" [[package]] name = "lock_api" @@ -3837,7 +3858,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" dependencies = [ - "hashbrown 0.14.0", + "hashbrown 0.14.1", ] [[package]] @@ -4166,7 +4187,7 @@ version = "2.0.0-rc4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e9f3b7fb4aae09e0ed66b64074297c96b323e7b9558a34952391c8c6dca543e" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "mrml-macros", "rustc-hash", "thiserror", @@ -4658,7 +4679,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.0.0", + "indexmap 2.0.2", ] [[package]] @@ -5259,13 +5280,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", + "regex-automata 0.3.9", "regex-syntax 0.7.5", ] @@ -5280,9 +5301,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" dependencies = [ "aho-corasick", "memchr", @@ -5307,6 +5328,17 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" +[[package]] +name = "retry-policies" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a715dc4d0e8aea3085a9a94d76e79c79c7df7c9f6be609da841a6d2489ca3687" +dependencies = [ + "anyhow", + "chrono", + "rand", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -5456,9 +5488,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.14" +version = "0.38.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f" +checksum = "d2f9da0cbd88f9f09e7814e388301c8414c51c62aa6ce1e4b5c551d49d96e531" dependencies = [ "bitflags 2.4.0", "errno", @@ -5700,7 +5732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde65b75f2603066b78d6fa239b2c07b43e06ead09435f60554d3912962b4a3c" dependencies = [ "form_urlencoded", - "indexmap 2.0.0", + "indexmap 2.0.2", "itoa", "ryu", "serde", @@ -5778,7 +5810,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.0.0", + "indexmap 2.0.2", "serde", "serde_json", "serde_with_macros", @@ -5861,9 +5893,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "c1b21f559e07218024e7e9f90f96f601825397de0e25420135f7f952453fed0b" dependencies = [ "lazy_static", ] @@ -6604,7 +6636,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", @@ -6617,7 +6649,7 @@ version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca676d9ba1a322c1b64eb8045a5ec5c0cfb0c9d08e15e9ff622589ad5221c8fe" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", @@ -6626,9 +6658,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c00bc15e49625f3d2f20b17082601e5e17cf27ead69e805174026c194b6664" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ "async-stream", "async-trait", @@ -6653,9 +6685,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9d37bb15da06ae9bb945963066baca6561b505af93a52e949a85d28558459a2" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ "prettyplease", "proc-macro2", @@ -6666,9 +6698,9 @@ dependencies = [ [[package]] name = "tonic-health" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e0181bc19dca6407fcd0323b97e9f0867ce3b020be84661a5ade2821360387" +checksum = "f80db390246dfb46553481f6024f0082ba00178ea495dbb99e70ba9a4fafb5e1" dependencies = [ "async-stream", "prost", @@ -6969,7 +7001,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d82b1bc5417102a73e8464c686eef947bdfb99fcdfc0a4f228e81afa9526470a" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "serde", "serde_json", "utoipa-gen", @@ -7412,9 +7444,9 @@ dependencies = [ [[package]] name = "xmlparser" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" [[package]] name = "yaml-rust" diff --git a/Cargo.toml b/Cargo.toml index b0f50eed0..4471e0487 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "crates/kitsune-http-signatures", "crates/kitsune-language", "crates/kitsune-messaging", + "crates/kitsune-retry-policies", "crates/kitsune-search", "crates/kitsune-storage", "crates/kitsune-test", @@ -32,7 +33,6 @@ members = [ "kitsune-search-server", "kitsune-search-server/proto", "lib/athena", - "lib/just-retry", "lib/post-process", "lib/speedy-uuid", ] diff --git a/crates/kitsune-core/Cargo.toml b/crates/kitsune-core/Cargo.toml index 0e60e043f..c41182d49 100644 --- a/crates/kitsune-core/Cargo.toml +++ b/crates/kitsune-core/Cargo.toml @@ -36,7 +36,6 @@ hex-simd = "0.8.0" http = "0.2.9" img-parts = "0.3.0" iso8601-timestamp = "0.2.12" -just-retry = { path = "../../lib/just-retry" } kitsune-cache = { path = "../kitsune-cache" } kitsune-captcha = { path = "../kitsune-captcha" } kitsune-db = { path = "../kitsune-db" } diff --git a/crates/kitsune-email/Cargo.toml b/crates/kitsune-email/Cargo.toml index 91428c8ac..3104094a9 100644 --- a/crates/kitsune-email/Cargo.toml +++ b/crates/kitsune-email/Cargo.toml @@ -4,7 +4,7 @@ edition.workspace = true version.workspace = true [dependencies] -askama = "0.12.0" +askama = "0.12.1" askama_axum = "0.3.0" # Damn it, cargo. Because "kitsune" uses "askama" with the axum feature, we have to have the crate available here as well.. lettre = { version = "0.10.4", default-features = false, features = [ "builder", diff --git a/crates/kitsune-messaging/Cargo.toml b/crates/kitsune-messaging/Cargo.toml index e06fc0c71..05b1a797e 100644 --- a/crates/kitsune-messaging/Cargo.toml +++ b/crates/kitsune-messaging/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true ahash = "0.8.3" async-trait = "0.1.73" futures-util = "0.3.28" +kitsune-retry-policies = { path = "../kitsune-retry-policies" } pin-project-lite = "0.2.13" redis = { version = "0.23.3", features = [ "aio", diff --git a/crates/kitsune-messaging/src/lib.rs b/crates/kitsune-messaging/src/lib.rs index 7b3e18812..d72a7a8a3 100644 --- a/crates/kitsune-messaging/src/lib.rs +++ b/crates/kitsune-messaging/src/lib.rs @@ -28,6 +28,8 @@ pub type BoxError = Box; /// Type alias for Result, defaulting to [`BoxError`] on the error branch pub type Result = std::result::Result; +mod util; + pub mod redis; pub mod tokio_broadcast; diff --git a/crates/kitsune-messaging/src/redis.rs b/crates/kitsune-messaging/src/redis.rs index f8db7bfd7..843b5629a 100644 --- a/crates/kitsune-messaging/src/redis.rs +++ b/crates/kitsune-messaging/src/redis.rs @@ -2,20 +2,20 @@ //! Redis implementation //! -use crate::{MessagingBackend, Result}; +use crate::{util::TransparentDebug, MessagingBackend, Result}; use ahash::AHashMap; use async_trait::async_trait; use futures_util::{future, stream::BoxStream, StreamExt, TryStreamExt}; +use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt}; use redis::{ aio::{ConnectionManager, PubSub}, AsyncCommands, RedisError, }; -use std::time::Duration; +use std::fmt::Debug; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_stream::wrappers::BroadcastStream; const BROADCAST_CAPACITY: usize = 10; -const CONNECTION_RETRY_DELAY: Duration = Duration::from_secs(5); const REGISTRATION_QUEUE_SIZE: usize = 50; macro_rules! handle_err { @@ -79,15 +79,19 @@ impl MultiplexActor { debug!(%pattern, "Failed to find correct receiver"); } } else { - self.conn = loop { - match self.client.get_async_connection().await { - Ok(conn) => break conn.into_pubsub(), - Err(err) => { - error!(error = %err, "Failed to connect to Redis instance"); - tokio::time::sleep(CONNECTION_RETRY_DELAY).await; - } + self.conn = (|| { + let client = self.client.clone(); + async move { + client + .get_async_connection() + .await + .map(|conn| TransparentDebug(conn.into_pubsub())) } - }; + }) + .retry(futures_backoff_policy()) + .await + .map(|conn| conn.0) + .unwrap(); for key in self.mapping.keys() { handle_err!( diff --git a/crates/kitsune-messaging/src/util.rs b/crates/kitsune-messaging/src/util.rs new file mode 100644 index 000000000..d2afa3d10 --- /dev/null +++ b/crates/kitsune-messaging/src/util.rs @@ -0,0 +1,9 @@ +use std::fmt; + +pub struct TransparentDebug(pub T); + +impl fmt::Debug for TransparentDebug { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} {{ ... }}", std::any::type_name::()) + } +} diff --git a/crates/kitsune-retry-policies/Cargo.toml b/crates/kitsune-retry-policies/Cargo.toml new file mode 100644 index 000000000..ebd31ba7d --- /dev/null +++ b/crates/kitsune-retry-policies/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "kitsune-retry-policies" +edition.workspace = true +version.workspace = true + +[dependencies] +futures-retry-policies = { version = "0.3.1", features = [ + "retry-policies", + "tokio", + "tracing", +] } +retry-policies = "0.2.0" diff --git a/crates/kitsune-retry-policies/src/lib.rs b/crates/kitsune-retry-policies/src/lib.rs new file mode 100644 index 000000000..49b7e519d --- /dev/null +++ b/crates/kitsune-retry-policies/src/lib.rs @@ -0,0 +1,52 @@ +use futures_retry_policies::{retry_policies::RetryPolicies, tracing::Traced}; +use retry_policies::{policies::ExponentialBackoff, Jitter}; +use std::{ + fmt::{self, Debug}, + ops::ControlFlow, + time::{Duration, SystemTime}, +}; + +pub use futures_retry_policies::{tokio::RetryFutureExt, RetryPolicy}; + +pub struct NeverRetry(T); + +impl Debug for NeverRetry +where + T: Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl futures_retry_policies::ShouldRetry for NeverRetry { + fn should_retry(&self, _attempts: u32) -> bool { + false + } +} + +impl futures_retry_policies::RetryPolicy for NeverRetry +where + T: futures_retry_policies::RetryPolicy>, +{ + fn should_retry(&mut self, result: Res) -> ControlFlow { + match self.0.should_retry(NeverRetry(result)) { + ControlFlow::Break(NeverRetry(val)) => ControlFlow::Break(val), + ControlFlow::Continue(dur) => ControlFlow::Continue(dur), + } + } +} + +pub fn futures_backoff_policy() -> impl futures_retry_policies::RetryPolicy +where + Res: Debug, +{ + Traced(NeverRetry(RetryPolicies::new(backoff_policy()))) +} + +pub fn backoff_policy() -> impl retry_policies::RetryPolicy { + ExponentialBackoff::builder() + .jitter(Jitter::Bounded) + .build_with_total_retry_duration(Duration::from_secs(24 * 3600)) // Kill the retrying after 24 hours + .for_task_started_at(SystemTime::now().into()) +} diff --git a/crates/kitsune-search/Cargo.toml b/crates/kitsune-search/Cargo.toml index 67c4e1337..5034291d8 100644 --- a/crates/kitsune-search/Cargo.toml +++ b/crates/kitsune-search/Cargo.toml @@ -21,7 +21,7 @@ tracing = "0.1.37" # "kitsune-search" feature bytes = { version = "1.5.0", optional = true } kitsune-search-proto = { path = "../../kitsune-search-server/proto", optional = true } -tonic = { version = "0.10.1", optional = true } +tonic = { version = "0.10.2", optional = true } # "meilisearch" feature meilisearch-sdk = { version = "0.24.2", optional = true } diff --git a/crates/kitsune-storage/src/fs.rs b/crates/kitsune-storage/src/fs.rs index ae4cc3d81..646854b16 100644 --- a/crates/kitsune-storage/src/fs.rs +++ b/crates/kitsune-storage/src/fs.rs @@ -64,7 +64,7 @@ mod test { use std::str; use tempfile::TempDir; - const TEST_TEXT: &str = r#" + const TEST_TEXT: &str = r" 新時代はこの未来だ 世界中全部 変えてしまえば 変えてしまえば ジャマモノ やなもの なんて消して @@ -99,7 +99,7 @@ mod test { 果てしない音楽がもっと届くように 夢を見せるよ 夢を見せるよ 新時代だ Ooh - 新時代だ"#; + 新時代だ"; #[tokio::test] async fn basic() { diff --git a/kitsune-cli/Cargo.toml b/kitsune-cli/Cargo.toml index e9c24318a..1f40a2850 100644 --- a/kitsune-cli/Cargo.toml +++ b/kitsune-cli/Cargo.toml @@ -5,7 +5,7 @@ edition.workspace = true build = "build.rs" [dependencies] -clap = { version = "4.4.5", features = ["derive"] } +clap = { version = "4.4.6", features = ["derive"] } color-eyre = "0.6.2" diesel = "2.1.2" diesel-async = "0.4.1" diff --git a/kitsune-job-runner/Cargo.toml b/kitsune-job-runner/Cargo.toml index b59a4a47e..ed6704f05 100644 --- a/kitsune-job-runner/Cargo.toml +++ b/kitsune-job-runner/Cargo.toml @@ -5,12 +5,12 @@ version.workspace = true [dependencies] athena = { path = "../lib/athena" } -clap = { version = "4.4.5", features = ["derive"] } +clap = { version = "4.4.6", features = ["derive"] } color-eyre = "0.6.2" deadpool-redis = "0.13.0" -just-retry = { path = "../lib/just-retry" } kitsune-core = { path = "../crates/kitsune-core" } kitsune-db = { path = "../crates/kitsune-db" } +kitsune-retry-policies = { path = "../crates/kitsune-retry-policies" } mimalloc = "0.1.39" tokio = { version = "1.32.0", features = ["full"] } toml = "0.8.1" diff --git a/kitsune-job-runner/src/lib.rs b/kitsune-job-runner/src/lib.rs index ddcc6ccbf..355988f49 100644 --- a/kitsune-job-runner/src/lib.rs +++ b/kitsune-job-runner/src/lib.rs @@ -9,7 +9,8 @@ use kitsune_core::{ state::State as CoreState, }; use kitsune_db::PgPool; -use std::{sync::Arc, time::Duration}; +use kitsune_retry_policies::{futures_backoff_policy, RetryPolicy}; +use std::{ops::ControlFlow, sync::Arc, time::Duration}; use tokio::task::JoinSet; const EXECUTION_TIMEOUT_DURATION: Duration = Duration::from_secs(30); @@ -44,19 +45,26 @@ pub async fn run_dispatcher( let mut job_joinset = JoinSet::new(); loop { - while let Err(error) = job_queue - .spawn_jobs( - num_job_workers - job_joinset.len(), - Arc::clone(&ctx), - &mut job_joinset, - ) - .await - { - error!(?error, "failed to spawn more jobs"); - just_retry::sleep_a_bit().await; + let mut backoff_policy = futures_backoff_policy(); + loop { + let result = job_queue + .spawn_jobs( + num_job_workers - job_joinset.len(), + Arc::clone(&ctx), + &mut job_joinset, + ) + .await; + + if let ControlFlow::Continue(duration) = backoff_policy.should_retry(result) { + tokio::time::sleep(duration).await; + } else { + break; + } } - let join_all = async { while job_joinset.join_next().await.is_some() {} }; - let _ = tokio::time::timeout(EXECUTION_TIMEOUT_DURATION, join_all).await; + let _ = tokio::time::timeout(EXECUTION_TIMEOUT_DURATION, async { + while job_joinset.join_next().await.is_some() {} + }) + .await; } } diff --git a/kitsune-search-server/Cargo.toml b/kitsune-search-server/Cargo.toml index 376fabcdb..7be0c0f10 100644 --- a/kitsune-search-server/Cargo.toml +++ b/kitsune-search-server/Cargo.toml @@ -22,8 +22,8 @@ serde = { version = "1.0.188", features = ["derive"] } tantivy = "0.21.0" time = "0.3.29" tokio = { version = "1.32.0", features = ["full"] } -tonic = "0.10.1" -tonic-health = "0.10.1" +tonic = "0.10.2" +tonic-health = "0.10.2" tower-http = { version = "0.4.4", features = ["add-extension", "trace"] } tracing = "0.1.37" tracing-subscriber = "0.3.17" diff --git a/kitsune-search-server/proto/Cargo.toml b/kitsune-search-server/proto/Cargo.toml index e8e37fde6..1c9e06825 100644 --- a/kitsune-search-server/proto/Cargo.toml +++ b/kitsune-search-server/proto/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true [dependencies] prost = "0.12.1" serde = { version = "1.0.188", features = ["derive"] } -tonic = { version = "0.10.1", default-features = false, features = [ +tonic = { version = "0.10.2", default-features = false, features = [ "codegen", "transport", "prost", @@ -14,7 +14,7 @@ tonic = { version = "0.10.1", default-features = false, features = [ [build-dependencies] protoc-bin-vendored = { version = "3.0.0", optional = true } -tonic-build = "0.10.1" +tonic-build = "0.10.2" [features] default = [] diff --git a/kitsune-search-server/src/main.rs b/kitsune-search-server/src/main.rs index f670d2ceb..68d9ec35d 100644 --- a/kitsune-search-server/src/main.rs +++ b/kitsune-search-server/src/main.rs @@ -20,7 +20,7 @@ use tracing_subscriber::{filter::Targets, layer::SubscriberExt, Layer as _, Regi #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -const STARTUP_FIGLET: &str = r#" +const STARTUP_FIGLET: &str = r" ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ ┃ ┃ ██╗ ██╗██╗████████╗███████╗██╗ ██╗███╗ ██╗███████╗ ███████╗███████╗ █████╗ ██████╗ ██████╗██╗ ██╗ ┃ @@ -31,7 +31,7 @@ const STARTUP_FIGLET: &str = r#" ┃ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═══╝╚══════╝ ╚══════╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝ ┃ ┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ -"#; +"; #[tokio::main] async fn main() { diff --git a/kitsune/Cargo.toml b/kitsune/Cargo.toml index b17d84a9b..86893bb0b 100644 --- a/kitsune/Cargo.toml +++ b/kitsune/Cargo.toml @@ -10,7 +10,7 @@ build = "build.rs" athena = { version = "0.0.1-pre.3", path = "../lib/athena" } argon2 = { version = "0.5.2", features = ["std"] } autometrics = { version = "0.6.0", default-features = false } -askama = { version = "0.12.0", features = [ +askama = { version = "0.12.1", features = [ "with-axum", ], default-features = false } askama_axum = "0.3.0" @@ -28,7 +28,7 @@ axum-extra = { version = "0.8.0", features = [ axum-flash = "0.7.0" bytes = "1.5.0" chrono = { version = "0.4.31", default-features = false } -clap = { version = "4.4.5", features = ["derive"] } +clap = { version = "4.4.6", features = ["derive"] } color-eyre = "0.6.2" const-oid = { version = "0.9.5", features = ["db"] } deadpool-redis = "0.13.0" @@ -39,7 +39,7 @@ eyre = "0.6.8" futures-util = "0.3.28" headers = "0.3.9" http = "0.2.9" -human-panic = "1.2.0" +human-panic = "1.2.1" hyper = { version = "0.14.27", features = ["deprecated"] } iso8601-timestamp = "0.2.12" mimalloc = "0.1.39" diff --git a/kitsune/src/consts.rs b/kitsune/src/consts.rs index 5c4f209ab..c0e7dc3bf 100644 --- a/kitsune/src/consts.rs +++ b/kitsune/src/consts.rs @@ -1,5 +1,5 @@ pub const API_DEFAULT_LIMIT: usize = 20; -pub const STARTUP_FIGLET: &str = r#" +pub const STARTUP_FIGLET: &str = r" ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ ┃ ┃ ██╗ ██╗██╗████████╗███████╗██╗ ██╗███╗ ██╗███████╗ ┃ @@ -12,7 +12,7 @@ pub const STARTUP_FIGLET: &str = r#" ┃ ActivityPub-federated microblogging ┃ ┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ -"#; +"; #[must_use] pub fn default_limit() -> T diff --git a/kitsune/src/http/handler/well_known/webfinger.rs b/kitsune/src/http/handler/well_known/webfinger.rs index 25d914942..c0afee050 100644 --- a/kitsune/src/http/handler/well_known/webfinger.rs +++ b/kitsune/src/http/handler/well_known/webfinger.rs @@ -126,7 +126,7 @@ mod tests { .unwrap(); let resource = match response { Either::E1(Json(resource)) => resource, - Either::E2(status) => panic!("Unexpected status code: {}", status), + Either::E2(status) => panic!("Unexpected status code: {status}"), }; assert_eq!(resource.subject, "acct:alice@example.com"); @@ -191,7 +191,7 @@ mod tests { .unwrap(); let resource = match response { Either::E1(Json(resource)) => resource, - Either::E2(status) => panic!("Unexpected status code: {}", status), + Either::E2(status) => panic!("Unexpected status code: {status}"), }; assert_eq!(resource.subject, "acct:alice@alice.example"); @@ -203,7 +203,7 @@ mod tests { let response = get(db_pool, url_service, Query(query)).await.unwrap(); let resource = match response { Either::E1(Json(resource)) => resource, - Either::E2(status) => panic!("Unexpected status code: {}", status), + Either::E2(status) => panic!("Unexpected status code: {status}"), }; assert_eq!(resource.subject, "acct:alice@alice.example"); diff --git a/lib/athena/Cargo.toml b/lib/athena/Cargo.toml index 0c58ec524..4253714d1 100644 --- a/lib/athena/Cargo.toml +++ b/lib/athena/Cargo.toml @@ -9,10 +9,9 @@ ahash = "0.7.6" # We can not update because of the "redis" crate async-trait = "0.1.73" deadpool-redis = "0.13.0" either = { version = "1.9.0", default-features = false } -exponential-backoff = "1.2.0" futures-util = { version = "0.3.28", default-features = false } iso8601-timestamp = { version = "0.2.12", features = ["diesel-pg"] } -just-retry = { path = "../just-retry" } +kitsune-retry-policies = { path = "../../crates/kitsune-retry-policies" } once_cell = "1.18.0" rand = "0.8.5" redis = { version = "0.23.3", default-features = false, features = [ @@ -22,6 +21,7 @@ redis = { version = "0.23.3", default-features = false, features = [ "streams", "tokio-rustls-comp", ] } +retry-policies = "0.2.0" serde = { version = "1.0.188", features = ["derive"] } simd-json = "0.11.1" smol_str = "0.2.0" diff --git a/lib/athena/examples/basic_queue.rs b/lib/athena/examples/basic_queue.rs index a8ac2bcf4..953519bea 100644 --- a/lib/athena/examples/basic_queue.rs +++ b/lib/athena/examples/basic_queue.rs @@ -95,16 +95,15 @@ async fn main() { let mut jobs = JoinSet::new(); loop { - if let Ok(join_set) = tokio::time::timeout( + if tokio::time::timeout( Duration::from_secs(5), queue.spawn_jobs(20, Arc::new(()), &mut jobs), ) .await + .is_err() { - join_set.unwrap() - } else { return; - }; + } while jobs.join_next().await.is_some() {} } diff --git a/lib/athena/src/queue/mod.rs b/lib/athena/src/queue/mod.rs index debea0d0d..0da1d2987 100644 --- a/lib/athena/src/queue/mod.rs +++ b/lib/athena/src/queue/mod.rs @@ -3,19 +3,23 @@ use crate::{error::Result, impl_to_redis_args, Error, JobContextRepository, Runn use ahash::AHashMap; use deadpool_redis::Pool as RedisPool; use either::Either; -use exponential_backoff::Backoff; use futures_util::StreamExt; use iso8601_timestamp::Timestamp; -use just_retry::rerun_until_success; +use kitsune_retry_policies::{futures_backoff_policy, RetryFutureExt}; use redis::{ aio::ConnectionLike, streams::{StreamReadOptions, StreamReadReply}, AsyncCommands, RedisResult, }; +use retry_policies::{policies::ExponentialBackoff, Jitter, RetryDecision, RetryPolicy}; use serde::{Deserialize, Serialize}; use smol_str::SmolStr; use speedy_uuid::Uuid; -use std::{str::FromStr, sync::Arc, time::Duration}; +use std::{ + str::FromStr, + sync::Arc, + time::{Duration, SystemTime}, +}; use tokio::{sync::OnceCell, task::JoinSet, time::Instant}; use typed_builder::TypedBuilder; @@ -26,7 +30,6 @@ const BLOCK_TIME: Duration = Duration::from_secs(2); const MIN_IDLE_TIME: Duration = Duration::from_secs(10 * 60); const MAX_RETRIES: u32 = 10; -const MIN_BACKOFF_DURATION: Duration = Duration::from_secs(5); enum JobState<'a> { Succeeded { @@ -256,13 +259,17 @@ where JobState::Failed { fail_count, job_id, .. } => { - let backoff = Backoff::new(self.max_retries, MIN_BACKOFF_DURATION, None); - if let Some(backoff_duration) = backoff.next(*fail_count) { + let backoff = ExponentialBackoff::builder() + .jitter(Jitter::Bounded) + .build_with_max_retries(self.max_retries); + + if let RetryDecision::Retry { execute_after } = backoff.should_retry(*fail_count) { let job_meta = JobMeta { job_id: *job_id, fail_count: fail_count + 1, }; - let backoff_timestamp = Timestamp::now_utc() + backoff_duration; + + let backoff_timestamp = Timestamp::from(SystemTime::from(execute_after)); let enqueue_cmd = self.enqueue_redis_cmd(&job_meta, Some(backoff_timestamp))?; pipeline.add_command(enqueue_cmd); @@ -348,7 +355,10 @@ where tokio::select! { result = &mut run_fut => break result, _ = tick_interval.tick() => { - rerun_until_success(|| this.reclaim_job(job_data)).await; + (|| this.reclaim_job(job_data)) + .retry(futures_backoff_policy()) + .await + .expect("Failed to reclaim job"); } } }; @@ -367,7 +377,10 @@ where } }; - rerun_until_success(|| this.complete_job(&job_state)).await; + (|| this.complete_job(&job_state)) + .retry(futures_backoff_policy()) + .await + .expect("Failed to mark job as completed"); }); } diff --git a/lib/just-retry/Cargo.toml b/lib/just-retry/Cargo.toml deleted file mode 100644 index d57fbb540..000000000 --- a/lib/just-retry/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "just-retry" -edition.workspace = true -version.workspace = true - -[dependencies] -rand = "0.8.5" -tokio = { version = "1.32.0", features = ["time"] } -tracing = "0.1.37" diff --git a/lib/just-retry/src/lib.rs b/lib/just-retry/src/lib.rs deleted file mode 100644 index b7c7ecaa9..000000000 --- a/lib/just-retry/src/lib.rs +++ /dev/null @@ -1,31 +0,0 @@ -#![forbid(rust_2018_idioms)] -#![warn(clippy::all, clippy::pedantic)] - -use rand::Rng; -use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; - -const ERROR_SLEEP_RANGE_SECS: RangeInclusive = 3.0..=6.0; - -pub async fn sleep_a_bit() { - let sleep_duration = - Duration::from_secs_f64(rand::thread_rng().gen_range(ERROR_SLEEP_RANGE_SECS)); - - tokio::time::sleep(sleep_duration).await; -} - -pub async fn rerun_until_success(mut func: F) -> Ok -where - F: FnMut() -> Fut, - Fut: Future>, - Err: Debug, -{ - loop { - match func().await { - Ok(val) => break val, - Err(error) => { - tracing::error!(?error, "rerun iteration failed"); - sleep_a_bit().await; - } - } - } -} diff --git a/lib/post-process/Cargo.toml b/lib/post-process/Cargo.toml index 2c2bf770f..611b460c3 100644 --- a/lib/post-process/Cargo.toml +++ b/lib/post-process/Cargo.toml @@ -13,7 +13,7 @@ criterion = { version = "0.5.1", default-features = false, features = [ "rayon", ] } futures = "0.3.28" -insta = { version = "1.32.0", features = ["glob"] } +insta = { version = "1.33.0", features = ["glob"] } pretty_assertions = "1.4.0" [[bench]]