Skip to content

Commit

Permalink
begin port
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed May 13, 2024
1 parent 961df18 commit eba7254
Show file tree
Hide file tree
Showing 16 changed files with 59 additions and 51 deletions.
29 changes: 20 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ members = [
]
resolver = "2"

[workspace.dependencies]
fred = { version = "9.0.3", features = [
"enable-rustls-ring",
"i-scripts",
"partial-tracing",
"sha-1",
] }

[workspace.lints.clippy]
all = "warn"
pedantic = "warn"
Expand Down
2 changes: 1 addition & 1 deletion crates/kitsune-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ license.workspace = true

[dependencies]
enum_dispatch = "0.3.13"
fred = { workspace = true }
kitsune-error = { path = "../kitsune-error" }
moka = { version = "0.12.7", features = ["sync"] }
multiplex-pool = { path = "../../lib/multiplex-pool" }
serde = "1.0.201"
simd-json = "0.13.10"
tracing = "0.1.40"
Expand Down
27 changes: 14 additions & 13 deletions crates/kitsune-cache/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::CacheBackend;
use fred::{clients::RedisPool, interfaces::KeysInterface, types::Expiration};
use kitsune_error::Result;
use redis::{aio::ConnectionManager, AsyncCommands};
use serde::{de::DeserializeOwned, Serialize};
use std::{fmt::Display, marker::PhantomData, time::Duration};
use typed_builder::TypedBuilder;
Expand All @@ -14,7 +14,7 @@ where
namespace: String,
#[builder(setter(into))]
prefix: String,
redis_conn: multiplex_pool::Pool<ConnectionManager>,
redis_conn: RedisPool,
ttl: Duration,

// Type phantom data
Expand All @@ -28,11 +28,7 @@ impl<K, V> Redis<K, V>
where
K: ?Sized,
{
pub fn new<P>(
redis_conn: multiplex_pool::Pool<ConnectionManager>,
prefix: P,
ttl: Duration,
) -> Self
pub fn new<P>(redis_conn: RedisPool, prefix: P, ttl: Duration) -> Self
where
P: Into<String>,
{
Expand All @@ -55,22 +51,20 @@ where
{
#[instrument(skip_all, fields(%key))]
async fn delete(&self, key: &K) -> Result<()> {
let mut conn = self.redis_conn.get();
let key = self.compute_key(key);

debug!(%key, "Deleting cache entry");
conn.del(key).await?;
self.redis_conn.del(key).await?;

Ok(())
}

#[instrument(skip_all, fields(%key))]
async fn get(&self, key: &K) -> Result<Option<V>> {
let mut conn = self.redis_conn.get();
let key = self.compute_key(key);

debug!(%key, "Fetching cache entry");
if let Some(serialised) = conn.get::<_, Option<String>>(&key).await? {
if let Some(serialised) = self.redis_conn.get::<Option<String>, _>(&key).await? {
let mut serialised_bytes = serialised.into_bytes();
let deserialised = simd_json::from_slice(&mut serialised_bytes)?;
Ok(Some(deserialised))
Expand All @@ -81,12 +75,19 @@ where

#[instrument(skip_all, fields(%key))]
async fn set(&self, key: &K, value: &V) -> Result<()> {
let mut conn = self.redis_conn.get();
let key = self.compute_key(key);
let serialised = simd_json::to_string(value)?;

debug!(%key, ttl = ?self.ttl, "Setting cache entry");
conn.set_ex(key, serialised, self.ttl.as_secs()).await?;
self.redis_conn
.set(
key,
serialised,
Some(Expiration::EX(self.ttl.as_secs() as i64)),
None,
false,
)
.await?;

Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion crates/kitsune-oidc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ kitsune-derive = { path = "../kitsune-derive" }
kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
moka = { version = "0.12.7", features = ["sync"] }
multiplex-pool = { path = "../../lib/multiplex-pool" }
oauth2 = { version = "5.0.0-alpha.4", default-features = false }
once_cell = "1.19.0"
openidconnect = { version = "4.0.0-alpha.1", default-features = false, features = [
Expand Down
1 change: 0 additions & 1 deletion crates/kitsune-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ kitsune-storage = { path = "../kitsune-storage" }
kitsune-url = { path = "../kitsune-url" }
kitsune-util = { path = "../kitsune-util" }
mime = "0.3.17"
multiplex-pool = { path = "../../lib/multiplex-pool" }
password-hash = { version = "0.5.0", features = ["std"] }
pkcs8 = "0.10.2"
post-process = { path = "../../lib/post-process" }
Expand Down
1 change: 0 additions & 1 deletion crates/kitsune-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ isolang = "2.4.0"
kitsune-config = { path = "../kitsune-config" }
kitsune-db = { path = "../kitsune-db" }
kitsune-s3 = { path = "../kitsune-s3" }
multiplex-pool = { path = "../../lib/multiplex-pool" }
pin-project-lite = "0.2.14"
rand = "0.8.5"
rusty-s3 = { version = "0.5.0", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion crates/kitsune-wasm-mrf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ blake3 = "1.5.1"
color-eyre = "0.6.3"
derive_more = { version = "1.0.0-beta.6", features = ["from"] }
enum_dispatch = "0.3.13"
fred = { workspace = true }
futures-util = { version = "0.3.30", default-features = false, features = [
"alloc",
] }
Expand All @@ -20,7 +21,6 @@ kitsune-derive = { path = "../kitsune-derive" }
kitsune-error = { path = "../kitsune-error" }
kitsune-type = { path = "../kitsune-type" }
mrf-manifest = { path = "../../lib/mrf-manifest", features = ["decode"] }
multiplex-pool = { path = "../../lib/multiplex-pool" }
simd-json = "0.13.10"
slab = "0.4.9"
sled = "0.34.7"
Expand Down
2 changes: 1 addition & 1 deletion crates/kitsune-wasm-mrf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl MrfService {
kv_storage::FsBackend::from_path(path.as_str())?.into()
}
KvStorage::Redis(RedisKvStorage { ref url, pool_size }) => {
let client = redis::Client::open(url.as_str())?;
let pool = fred::
kv_storage::RedisBackend::from_client(client, pool_size.get())
.await?
.into()
Expand Down
1 change: 0 additions & 1 deletion crates/kitsune-webfinger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ kitsune-error = { path = "../kitsune-error" }
kitsune-http-client = { path = "../kitsune-http-client" }
kitsune-type = { path = "../kitsune-type" }
kitsune-util = { path = "../kitsune-util" }
multiplex-pool = { path = "../../lib/multiplex-pool" }
tracing = "0.1.40"
triomphe = "0.1.11"
urlencoding = "2.1.3"
Expand Down
1 change: 0 additions & 1 deletion kitsune-job-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ kitsune-service = { path = "../crates/kitsune-service" }
kitsune-url = { path = "../crates/kitsune-url" }
kitsune-wasm-mrf = { path = "../crates/kitsune-wasm-mrf" }
mimalloc = "0.1.41"
multiplex-pool = { path = "../lib/multiplex-pool" }
tokio = { version = "1.37.0", features = ["full"] }
tracing = "0.1.40"
triomphe = "0.1.11"
Expand Down
1 change: 0 additions & 1 deletion kitsune/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ kitsune-scss-compiler = { path = "../crates/kitsune-scss-compiler" }
[dev-dependencies]
kitsune-http-client = { path = "../crates/kitsune-http-client" }
kitsune-test = { path = "../crates/kitsune-test" }
multiplex-pool = { path = "../lib/multiplex-pool" }
pretty_assertions = "1.4.0"

[features]
Expand Down
10 changes: 2 additions & 8 deletions lib/athena/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ async-trait = "0.1.80"
either = { version = "1.11.0", default-features = false, optional = true }
futures-util = { version = "0.3.30", default-features = false }
iso8601-timestamp = "0.2.17"
fred = { workspace = true, optional = true }
just-retry = { path = "../just-retry" }
multiplex-pool = { path = "../multiplex-pool", optional = true }
once_cell = { version = "1.19.0", optional = true }
rand = { version = "0.8.5", optional = true }
serde = { version = "1.0.201", features = ["derive"] }
Expand All @@ -33,13 +33,7 @@ typetag = "0.2.16"
unsize = "1.1.0"

[features]
redis = [
"dep:either",
"dep:multiplex-pool",
"dep:once_cell",
"dep:rand",
"dep:simd-json",
]
redis = ["dep:either", "dep:fred", "dep:once_cell", "dep:rand", "dep:simd-json"]

[dev-dependencies]
kitsune-test = { path = "../../crates/kitsune-test" }
Expand Down
2 changes: 1 addition & 1 deletion lib/athena/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub enum Error {

#[cfg(feature = "redis")]
#[error(transparent)]
Redis(#[from] redis::RedisError),
Redis(#[from] fred::error::RedisError),

#[cfg(feature = "redis")]
#[error(transparent)]
Expand Down
5 changes: 2 additions & 3 deletions lib/athena/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
};
use async_trait::async_trait;
use either::Either;
use fred::clients::RedisPool;
use iso8601_timestamp::Timestamp;
use just_retry::{
retry_policies::{policies::ExponentialBackoff, Jitter},
Expand All @@ -25,8 +26,6 @@ use typed_builder::TypedBuilder;

mod scheduled;

type Pool = multiplex_pool::Pool<redis::aio::ConnectionManager>;

#[derive(TypedBuilder)]
pub struct JobQueue<CR> {
#[builder(default = "athena-job-runners".into(), setter(into))]
Expand All @@ -39,7 +38,7 @@ pub struct JobQueue<CR> {
max_retries: u32,
#[builder(setter(into))]
queue_name: SmolStr,
redis_pool: Pool,
redis_pool: RedisPool,
#[builder(default = SmolStr::from(format!("{queue_name}:scheduled")))]
scheduled_queue_name: SmolStr,

Expand Down
17 changes: 9 additions & 8 deletions lib/athena/src/redis/scheduled.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::Pool;
use crate::error::Result;
use fred::{clients::RedisPool, types::Script, util::NONE};
use once_cell::sync::Lazy;
use rand::Rng;
use redis::Script;
use smol_str::SmolStr;
use std::{ops::RangeInclusive, time::Duration};
use typed_builder::TypedBuilder;
Expand All @@ -12,22 +11,24 @@ use typed_builder::TypedBuilder;
const SCHEDULE_PAUSE_RANGE: RangeInclusive<u64> = 5..=10;
// This functionality is expressed as a script since scripts are executed transactionally
static SCHEDULE_SCRIPT: Lazy<Script> =
Lazy::new(|| Script::new(include_str!("../../lua/copy_scheduled.lua")));
Lazy::new(|| Script::from_lua(include_str!("../../lua/copy_scheduled.lua")));

#[derive(TypedBuilder)]
pub struct ScheduledJobActor {
redis_pool: Pool,
redis_pool: RedisPool,
scheduled_queue_name: SmolStr,
queue_name: SmolStr,
}

impl ScheduledJobActor {
async fn run(&mut self) -> Result<()> {
let mut conn = self.redis_pool.get();
let client = self.redis_pool.next();
SCHEDULE_SCRIPT
.key(self.queue_name.as_str())
.key(self.scheduled_queue_name.as_str())
.invoke_async(&mut conn)
.evalsha_with_reload(
client,
(self.queue_name.as_str(), self.scheduled_queue_name.as_str()),
NONE,
)
.await?;

Ok(())
Expand Down

0 comments on commit eba7254

Please sign in to comment.