diff --git a/Cargo.lock b/Cargo.lock index 9362711cb..2c532c3a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4477,6 +4477,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -5541,8 +5551,7 @@ dependencies = [ [[package]] name = "redis" version = "0.25.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd" +source = "git+https://github.com/redis-rs/redis-rs.git?rev=0c544b548b52180acda8e8394ab21b2761497e50#0c544b548b52180acda8e8394ab21b2761497e50" dependencies = [ "ahash 0.8.11", "arc-swap", @@ -5552,6 +5561,7 @@ dependencies = [ "futures", "futures-util", "itoa 1.0.11", + "num-bigint", "percent-encoding", "pin-project-lite", "rustls 0.22.4", diff --git a/Cargo.toml b/Cargo.toml index 0b05f2015..f56797117 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,9 @@ diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "d0 scraper = { git = "https://github.com/causal-agent/scraper.git", rev = "d67111f5cc0b7da6e6ff10e4549d87cf09ba3e5b" } tokio-postgres-rustls = { git = "https://github.com/jbg/tokio-postgres-rustls.git", rev = "b16c1bc0f5d4f91324174fd1bd839d743a70f86a" } +# Support XAUTOCLAIM +redis = { git = "https://github.com/redis-rs/redis-rs.git", rev = "0c544b548b52180acda8e8394ab21b2761497e50" } + # Patch to make OpenTelemetry with with hyper 1 opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "b44cb130e4a102b0d676289e91c003f4b1008d08" } opentelemetry-http = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "b44cb130e4a102b0d676289e91c003f4b1008d08" } diff --git a/lib/athena/src/common.rs b/lib/athena/src/common.rs index 992590d6f..2d3e4bceb 100644 --- a/lib/athena/src/common.rs +++ b/lib/athena/src/common.rs @@ -3,7 +3,7 @@ use crate::{ error::{Error, Result}, JobContextRepository, JobData, JobQueue, JobResult, Outcome, Runnable, }; -use ahash::AHashMap; +use ahash::HashMap; use futures_util::TryStreamExt; use just_retry::RetryExt; use speedy_uuid::Uuid; @@ -40,7 +40,7 @@ where let job_data = job_data .into_iter() .map(|data| (data.job_id, data)) - .collect::>(); + .collect::>(); let job_data = Arc::new(job_data); while let Some((job_id, job_ctx)) = context_stream diff --git a/lib/athena/src/redis/mod.rs b/lib/athena/src/redis/mod.rs index dd9f30340..420cb4074 100644 --- a/lib/athena/src/redis/mod.rs +++ b/lib/athena/src/redis/mod.rs @@ -1,4 +1,4 @@ -use self::{scheduled::ScheduledJobActor, util::StreamAutoClaimReply}; +use self::scheduled::ScheduledJobActor; use crate::{ consts::{BLOCK_TIME, MAX_RETRIES, MIN_IDLE_TIME}, error::Result, @@ -13,7 +13,7 @@ use just_retry::{ }; use redis::{ aio::ConnectionLike, - streams::{StreamReadOptions, StreamReadReply}, + streams::{StreamAutoClaimOptions, StreamAutoClaimReply, StreamReadOptions, StreamReadReply}, AsyncCommands, RedisResult, }; use smol_str::SmolStr; @@ -24,7 +24,6 @@ use triomphe::Arc; use typed_builder::TypedBuilder; mod scheduled; -mod util; type Pool = multiplex_pool::Pool; @@ -154,17 +153,19 @@ where let mut redis_conn = self.redis_pool.get(); self.initialise_group(&mut redis_conn).await?; - let StreamAutoClaimReply { claimed_ids, .. }: StreamAutoClaimReply = - redis::cmd("XAUTOCLAIM") - .arg(self.queue_name.as_str()) - .arg(self.consumer_group.as_str()) - .arg(self.consumer_name.as_str()) - .arg(MIN_IDLE_TIME.as_millis() as u64) - .arg("0-0") - .arg("COUNT") - .arg(max_jobs) - .query_async(&mut redis_conn) - .await?; + let StreamAutoClaimReply { + claimed: claimed_ids, + .. + } = redis_conn + .xautoclaim_options( + self.queue_name.as_str(), + self.consumer_group.as_str(), + self.consumer_name.as_str(), + MIN_IDLE_TIME.as_millis() as u64, + "0-0", + StreamAutoClaimOptions::default().count(max_jobs), + ) + .await?; let claimed_ids = if claimed_ids.len() == max_jobs { Either::Left(claimed_ids.into_iter()) diff --git a/lib/athena/src/redis/util.rs b/lib/athena/src/redis/util.rs deleted file mode 100644 index df33de218..000000000 --- a/lib/athena/src/redis/util.rs +++ /dev/null @@ -1,27 +0,0 @@ -use ahash::AHashMap; -use redis::{streams::StreamId, FromRedisValue, RedisResult}; - -#[derive(Clone, Debug)] -pub struct StreamAutoClaimReply { - pub claimed_ids: Vec, -} - -impl FromRedisValue for StreamAutoClaimReply { - fn from_redis_value(v: &redis::Value) -> RedisResult { - type AutoClaimReturnValue = ( - String, - Vec)>>, - Vec, - ); - - let (_start_stream_id, claimed_ids, _deleted_ids): AutoClaimReturnValue = - redis::from_redis_value(v)?; - - let claimed_ids: Vec = claimed_ids - .into_iter() - .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map })) - .collect(); - - Ok(Self { claimed_ids }) - } -}