Skip to content

Commit

Permalink
Ergonomic improvements to the redis list, with lots of error encapsul…
Browse files Browse the repository at this point in the history
…ation (#33)
  • Loading branch information
zakstucke authored Apr 9, 2024
1 parent 867c668 commit 3881e63
Show file tree
Hide file tree
Showing 8 changed files with 420 additions and 280 deletions.
25 changes: 13 additions & 12 deletions .zetch.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion opencollector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ exporters:
stream-name: default
# Writes all opentelemetry logs, traces, metrics to a file, useful for testing:
file/debug_file_writing:
path: /home/runner/work/bitbazaar/bitbazaar/logs/otlp_telemetry_out.log
path: /Users/zak/z/code/bitbazaar/logs/otlp_telemetry_out.log
rotation:
max_megabytes: 10
max_days: 3
Expand Down
43 changes: 19 additions & 24 deletions rust/bitbazaar/redis/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
/// Expire an existing key with a new/updated ttl.
///
/// https://redis.io/commands/pexpire/
pub fn expire(mut self, namespace: &'static str, key: &str, ttl: std::time::Duration) -> Self {
pub fn expire(mut self, namespace: &str, key: &str, ttl: std::time::Duration) -> Self {
self.pipe
.pexpire(
self.redis_conn.final_key(namespace, key.into()),
Expand Down Expand Up @@ -134,7 +134,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
/// - `value`: The value of the entry. (values of sets must be strings)
pub fn zadd(
mut self,
set_namespace: &'static str,
set_namespace: &str,
set_key: &str,
set_ttl: Option<std::time::Duration>,
score: i64,
Expand Down Expand Up @@ -167,12 +167,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
/// - `set_namespace`: The namespace of the set.
/// - `set_key`: The key of the set.
/// - `value`: The value of the entry. (values of sets must be strings)
pub fn zrem(
mut self,
set_namespace: &'static str,
set_key: &str,
value: impl AsRef<str>,
) -> Self {
pub fn zrem(mut self, set_namespace: &str, set_key: &str, value: impl AsRef<str>) -> Self {
self.pipe
.zrem(
self.redis_conn.final_key(set_namespace, set_key.into()),
Expand All @@ -198,7 +193,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
/// - items: The scores and values of the entries. (set values must be strings)
pub fn zadd_multi(
mut self,
set_namespace: &'static str,
set_namespace: &str,
set_key: &str,
set_ttl: Option<std::time::Duration>,
items: &[(i64, impl AsRef<str>)],
Expand Down Expand Up @@ -231,7 +226,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
/// https://redis.io/commands/zremrangebyscore/
pub fn zremrangebyscore(
mut self,
set_namespace: &'static str,
set_namespace: &str,
set_key: &str,
min: i64,
max: i64,
Expand All @@ -257,7 +252,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
/// (expiry accurate to the millisecond)
pub fn set(
mut self,
namespace: &'static str,
namespace: &str,
key: &str,
value: impl ToRedisArgs,
expiry: Option<std::time::Duration>,
Expand Down Expand Up @@ -290,7 +285,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
/// (expiry accurate to the millisecond)
pub fn mset<Value: ToRedisArgs>(
mut self,
namespace: &'static str,
namespace: &str,
pairs: impl IntoIterator<Item = (impl AsRef<str>, Value)>,
expiry: Option<std::time::Duration>,
) -> Self {
Expand Down Expand Up @@ -337,7 +332,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
/// Clear one or more keys.
pub fn clear<'key>(
mut self,
namespace: &'static str,
namespace: &str,
keys: impl IntoIterator<Item = &'key str>,
) -> Self {
let final_keys = keys
Expand All @@ -356,7 +351,7 @@ impl<'a, 'b, 'c, ReturnType> RedisBatch<'a, 'b, 'c, ReturnType> {
}

/// Clear all keys under a given namespace
pub fn clear_namespace(self, namespace: &'static str) -> Self {
pub fn clear_namespace(self, namespace: &str) -> Self {
let final_namespace = self.redis_conn.final_namespace(namespace);
self.script_no_return(CLEAR_NAMESPACE_SCRIPT.invoker().arg(final_namespace))
}
Expand Down Expand Up @@ -405,26 +400,26 @@ pub trait RedisBatchReturningOps<'c> {
) -> Self::NextType<ScriptOutput>;

/// Check if a key exists.
fn exists(self, namespace: &'static str, key: &str) -> Self::NextType<bool>;
fn exists(self, namespace: &str, key: &str) -> Self::NextType<bool>;

/// Check if multiple keys exists.
fn mexists<'key>(
self,
namespace: &'static str,
namespace: &str,
keys: impl IntoIterator<Item = &'key str>,
) -> Self::NextType<Vec<bool>>;

/// Get a value from a key. Returning `None` if the key doesn't exist.
fn get<Value: FromRedisValue>(
self,
namespace: &'static str,
namespace: &str,
key: &str,
) -> Self::NextType<Option<Value>>;

/// Get multiple values (MGET) of the same type at once. Returning `None` for each key that didn't exist.
fn mget<Value>(
self,
namespace: &'static str,
namespace: &str,
keys: impl IntoIterator<Item = impl AsRef<str>>,
) -> Self::NextType<Vec<Option<Value>>>;

Expand All @@ -444,7 +439,7 @@ pub trait RedisBatchReturningOps<'c> {
/// https://redis.io/commands/zrangebyscore/
fn zrangebyscore<Value: FromRedisValue>(
self,
set_namespace: &'static str,
set_namespace: &str,
set_key: &str,
min: i64,
max: i64,
Expand Down Expand Up @@ -473,7 +468,7 @@ macro_rules! impl_batch_ops {



fn exists(mut self, namespace: &'static str, key: &str) -> Self::NextType<bool> {
fn exists(mut self, namespace: &str, key: &str) -> Self::NextType<bool> {
self.pipe.exists(self.redis_conn.final_key(namespace, key.into()));
RedisBatch {
_returns: PhantomData,
Expand All @@ -485,7 +480,7 @@ macro_rules! impl_batch_ops {

fn mexists<'key>(
self,
namespace: &'static str,
namespace: &str,
keys: impl IntoIterator<Item = &'key str>,
) -> Self::NextType<Vec<bool>> {
let final_keys = keys.into_iter().map(Into::into).map(|key| self.redis_conn.final_key(namespace, key)).collect::<Vec<_>>();
Expand All @@ -498,7 +493,7 @@ macro_rules! impl_batch_ops {

fn get<Value: FromRedisValue>(
mut self,
namespace: &'static str,
namespace: &str,
key: &str,
) -> Self::NextType<Option<Value>> {
self.pipe.get(self.redis_conn.final_key(namespace, key.into()));
Expand All @@ -512,7 +507,7 @@ macro_rules! impl_batch_ops {

fn mget<Value>(
mut self,
namespace: &'static str,
namespace: &str,
keys: impl IntoIterator<Item = impl AsRef<str>>,
) -> Self::NextType<Vec<Option<Value>>> {
let final_keys = keys.into_iter().map(|key| self.redis_conn.final_key(namespace, key.as_ref().into())).collect::<Vec<_>>();
Expand All @@ -528,7 +523,7 @@ macro_rules! impl_batch_ops {

fn zrangebyscore<Value: FromRedisValue>(
mut self,
set_namespace: &'static str,
set_namespace: &str,
set_key: &str,
min: i64,
max: i64,
Expand Down
6 changes: 3 additions & 3 deletions rust/bitbazaar/redis/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ impl<'a> RedisConn<'a> {

/// Redis keys are all prefixed, use this to finalise a namespace outside of built in commands, e.g. for use in a custom script.
#[inline]
pub fn final_namespace(&self, namespace: &'static str) -> String {
pub fn final_namespace(&self, namespace: &str) -> String {
format!("{}:{}", self.prefix, namespace)
}

/// Redis keys are all prefixed, use this to finalise a key outside of built in commands, e.g. for use in a custom script.
#[inline]
pub fn final_key(&self, namespace: &'static str, key: Cow<'_, str>) -> String {
pub fn final_key(&self, namespace: &str, key: Cow<'_, str>) -> String {
format!("{}:{}", self.final_namespace(namespace), key)
}

Expand All @@ -57,7 +57,7 @@ impl<'a> RedisConn<'a> {
#[inline]
pub async fn cached_fn<'b, T, Fut, K: Into<Cow<'b, str>>>(
&mut self,
namespace: &'static str,
namespace: &str,
key: K,
expiry: Option<std::time::Duration>,
cb: impl FnOnce() -> Fut,
Expand Down
62 changes: 26 additions & 36 deletions rust/bitbazaar/redis/json.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,11 @@
use redis::{FromRedisValue, ToRedisArgs};

/// A wrapper on an arbitrary json object to allow reading and writing to redis.
/// Use [`RedisJsonConsume::consume`] to extract the inner json object.
/// Access the inner with .0.
#[derive(Debug)]
pub struct RedisJson<T: serde::Serialize + for<'a> serde::Deserialize<'a>>(pub T);

/// If the object is comparable, make the wrapper too:
impl<T: serde::Serialize + for<'a> serde::Deserialize<'a> + PartialEq> PartialEq for RedisJson<T> {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}
/// If the object is comparable, make the wrapper too:
impl<T: serde::Serialize + for<'a> serde::Deserialize<'a> + Eq> Eq for RedisJson<T> {}

/// A trait to allow consuming a redis wrapper and returning the inner json object.
pub trait RedisJsonConsume<T> {
/// Extract the inner json object from the redis wrapper.
fn consume(self) -> T;
}

impl<T: serde::Serialize + for<'a> serde::Deserialize<'a>> RedisJsonConsume<T> for RedisJson<T> {
fn consume(self) -> T {
self.0
}
}

impl<T: serde::Serialize + for<'a> serde::Deserialize<'a>> RedisJsonConsume<Option<T>>
for Option<RedisJson<T>>
{
fn consume(self) -> Option<T> {
self.map(|x| x.0)
}
}

impl<T> FromRedisValue for RedisJson<T>
where
T: serde::Serialize + for<'a> serde::Deserialize<'a>,
{
impl<T: serde::Serialize + for<'a> serde::Deserialize<'a>> FromRedisValue for RedisJson<T> {
fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
match v {
redis::Value::Data(data) => Ok(Self(serde_json::from_slice(data)?)),
Expand All @@ -49,9 +17,31 @@ where
}
}

impl<T> ToRedisArgs for RedisJson<T>
impl<T: serde::Serialize + for<'a> serde::Deserialize<'a>> ToRedisArgs for RedisJson<T> {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + redis::RedisWrite,
{
let data = serde_json::to_vec(&self.0).unwrap();
out.write_arg(&data)
}
}

/// A borrowed wrapper on an arbitrary json object to writing to redis.
/// Use this over [`RedisJson`] when you don't want to own the data and prevent unnecessary cloning.
/// Access the inner with .0.
#[derive(Debug)]
pub struct RedisJsonBorrowed<'a, T>(pub &'a T)
where
// Needs to be serializable from the reference, deserializable to T itself:
T: serde::Deserialize<'a>,
&'a T: serde::Serialize;

impl<'a, T> ToRedisArgs for RedisJsonBorrowed<'a, T>
where
T: serde::Serialize + for<'a> serde::Deserialize<'a>,
// Needs to be serializable from the reference, deserializable to T itself:
T: serde::Deserialize<'a>,
&'a T: serde::Serialize,
{
fn write_redis_args<W>(&self, out: &mut W)
where
Expand Down
10 changes: 5 additions & 5 deletions rust/bitbazaar/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ mod wrapper;
pub use batch::{RedisBatch, RedisBatchFire, RedisBatchReturningOps};
pub use conn::RedisConn;
pub use dlock::{RedisLock, RedisLockErr};
pub use json::{RedisJson, RedisJsonConsume};
pub use json::{RedisJson, RedisJsonBorrowed};
pub use script::{RedisScript, RedisScriptInvoker};
pub use temp_list::RedisTempList;
pub use temp_list::{RedisTempList, RedisTempListItem};
pub use wrapper::Redis;

#[cfg(test)]
Expand Down Expand Up @@ -279,7 +279,7 @@ mod tests {
.fire()
.await
.flatten()
.consume(),
.map(|x| x.0),
exp
);
}
Expand All @@ -301,7 +301,7 @@ mod tests {
}))
})
.await?
.consume(),
.0,
ExampleJson {
ree: "roo".to_string(),
},
Expand Down Expand Up @@ -331,7 +331,7 @@ mod tests {
}
)
.await?
.consume(),
.0,
ExampleJson {
ree: "roo".to_string(),
},
Expand Down
Loading

0 comments on commit 3881e63

Please sign in to comment.