Skip to content

Commit

Permalink
remove own util stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed May 12, 2024
1 parent 165d4ba commit f550f84
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 45 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "d0
scraper = { git = "https://github.com/causal-agent/scraper.git", rev = "d67111f5cc0b7da6e6ff10e4549d87cf09ba3e5b" }
tokio-postgres-rustls = { git = "https://github.com/jbg/tokio-postgres-rustls.git", rev = "b16c1bc0f5d4f91324174fd1bd839d743a70f86a" }

# Support XAUTOCLAIM
redis = { git = "https://github.com/redis-rs/redis-rs.git", rev = "0c544b548b52180acda8e8394ab21b2761497e50" }

# Patch to make OpenTelemetry with with hyper 1
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "b44cb130e4a102b0d676289e91c003f4b1008d08" }
opentelemetry-http = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "b44cb130e4a102b0d676289e91c003f4b1008d08" }
Expand Down
4 changes: 2 additions & 2 deletions lib/athena/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
error::{Error, Result},
JobContextRepository, JobData, JobQueue, JobResult, Outcome, Runnable,
};
use ahash::AHashMap;
use ahash::HashMap;
use futures_util::TryStreamExt;
use just_retry::RetryExt;
use speedy_uuid::Uuid;
Expand Down Expand Up @@ -40,7 +40,7 @@ where
let job_data = job_data
.into_iter()
.map(|data| (data.job_id, data))
.collect::<AHashMap<Uuid, JobData>>();
.collect::<HashMap<Uuid, JobData>>();
let job_data = Arc::new(job_data);

while let Some((job_id, job_ctx)) = context_stream
Expand Down
29 changes: 15 additions & 14 deletions lib/athena/src/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use self::{scheduled::ScheduledJobActor, util::StreamAutoClaimReply};
use self::scheduled::ScheduledJobActor;
use crate::{
consts::{BLOCK_TIME, MAX_RETRIES, MIN_IDLE_TIME},
error::Result,
Expand All @@ -13,7 +13,7 @@ use just_retry::{
};
use redis::{
aio::ConnectionLike,
streams::{StreamReadOptions, StreamReadReply},
streams::{StreamAutoClaimOptions, StreamAutoClaimReply, StreamReadOptions, StreamReadReply},
AsyncCommands, RedisResult,
};
use smol_str::SmolStr;
Expand All @@ -24,7 +24,6 @@ use triomphe::Arc;
use typed_builder::TypedBuilder;

mod scheduled;
mod util;

type Pool = multiplex_pool::Pool<redis::aio::ConnectionManager>;

Expand Down Expand Up @@ -154,17 +153,19 @@ where
let mut redis_conn = self.redis_pool.get();
self.initialise_group(&mut redis_conn).await?;

let StreamAutoClaimReply { claimed_ids, .. }: StreamAutoClaimReply =
redis::cmd("XAUTOCLAIM")
.arg(self.queue_name.as_str())
.arg(self.consumer_group.as_str())
.arg(self.consumer_name.as_str())
.arg(MIN_IDLE_TIME.as_millis() as u64)
.arg("0-0")
.arg("COUNT")
.arg(max_jobs)
.query_async(&mut redis_conn)
.await?;
let StreamAutoClaimReply {
claimed: claimed_ids,
..
} = redis_conn
.xautoclaim_options(
self.queue_name.as_str(),
self.consumer_group.as_str(),
self.consumer_name.as_str(),
MIN_IDLE_TIME.as_millis() as u64,
"0-0",
StreamAutoClaimOptions::default().count(max_jobs),
)
.await?;

let claimed_ids = if claimed_ids.len() == max_jobs {
Either::Left(claimed_ids.into_iter())
Expand Down
27 changes: 0 additions & 27 deletions lib/athena/src/redis/util.rs

This file was deleted.

0 comments on commit f550f84

Please sign in to comment.