redis: add basic impl #80

Draft
wants to merge 1 commit into base: tower
62 changes: 62 additions & 0 deletions examples/examples/redis.rs
@@ -0,0 +1,62 @@
use axum::{extract::Path, routing::get, Json, Router};

use hitbox_redis::Builder;
use hitbox_tower::Cache;
use http::StatusCode;
use tower::ServiceBuilder;

async fn handler_result(Path(name): Path<String>) -> Result<String, StatusCode> {
dbg!("axum::handler_result");
Ok(format!("Hello, {name}"))
}

async fn handler() -> String {
dbg!("axum::handler");
format!("root")
}

#[derive(serde::Serialize)]
struct Greet {
name: String,
answer: u32,
}

async fn handler_json() -> Json<Greet> {
dbg!("axum::handler");
Json(Greet {
name: "root".to_owned(),
answer: 42,
})
}

#[tokio::main]
async fn main() {
let subscriber = tracing_subscriber::fmt()
.pretty()
.with_env_filter("debug,hitbox=trace")
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();

let redis = Builder::standalone()
.set_host("localhost".to_owned())
.set_port(6379)
.build()
.unwrap();
let cache = Cache::new(redis);

// build our application with a single route
let app = Router::new()
.route("/greet/:name/", get(handler_result))
.route("/", get(handler))
.route(
"/json/",
get(handler_json), //.layer(cache)
)
.layer(ServiceBuilder::new().layer(cache));

// run it with hyper on localhost:3000
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
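
A quick way to exercise this example is to hit a cached route twice and compare the responses. A minimal client sketch, assuming the server above is already running on localhost:3000 and that `reqwest` is available as a dev-dependency (neither is part of this diff):

```rust
// Hypothetical smoke test for the example above; `reqwest` is an assumption,
// not part of this PR. Start the example server first, then run this client.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // First request goes through the handler and should populate the Redis cache.
    let first = reqwest::get("http://localhost:3000/json/").await?.text().await?;
    // Second request should be answered from the cache by the hitbox layer.
    let second = reqwest::get("http://localhost:3000/json/").await?.text().await?;
    assert_eq!(first, second);
    println!("cached response body: {second}");
    Ok(())
}
```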
1 change: 1 addition & 0 deletions hitbox-backend/Cargo.toml
@@ -22,3 +22,4 @@ serde_json = "1"
bincode = "1"
thiserror = "1"
serde_urlencoded = { version = "0.7.1", default-features = false }
bytes = "1"
31 changes: 30 additions & 1 deletion hitbox-backend/src/serializer.rs
@@ -1,4 +1,5 @@
use std::marker::PhantomData;
use std::ops::Deref;

use chrono::{DateTime, Utc};
use hitbox_core::CachedValue;
@@ -38,7 +39,8 @@ impl<U> SerializableCachedValue<U> {
}
}

#[derive(Default)]
// TODO: remove clone
#[derive(Default, Clone)]
pub struct JsonSerializer<Raw = Vec<u8>> {
_raw: PhantomData<Raw>,
}
@@ -126,6 +128,33 @@ impl Serializer for BinSerializer<Vec<u8>> {
}
}

impl Serializer for bytes::Bytes {
type Raw = bytes::Bytes;

fn deserialize<T>(data: Self::Raw) -> Result<CachedValue<T>, SerializerError>
where
T: DeserializeOwned,
{
let deserialized = bincode::deserialize::<SerializableCachedValue<T>>(data.deref())
.map_err(|err| SerializerError::Deserialize(Box::new(err)))?;
let cached_value = deserialized.into_cached_value();
Ok(CachedValue::new(cached_value.data, cached_value.expired))
}

fn serialize<T>(value: &CachedValue<T>) -> Result<Self::Raw, SerializerError>
where
T: Serialize,
{
let serializable_value = SerializableCachedValue {
data: &value.data,
expired: value.expired,
};
bincode::serialize(&serializable_value)
.map(Into::into)
.map_err(|err| SerializerError::Serialize(Box::new(err)))
}
}

#[cfg(test)]
mod test {
use async_trait::async_trait;
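
A round-trip check of the new `bytes::Bytes` serializer could look like the sketch below. It relies on `CachedValue::new(data, expired)` and the `data`/`expired` fields as used in the impl above; the import paths and field visibility are assumptions, not guaranteed by this diff.

```rust
// Sketch of a serialize/deserialize round trip through the Bytes serializer.
// Import paths and public field access are assumptions based on this diff.
use bytes::Bytes;
use chrono::{Duration, Utc};
use hitbox_backend::{serializer::Serializer, CachedValue};

fn bytes_roundtrip() {
    let original = CachedValue::new("hello".to_owned(), Utc::now() + Duration::minutes(1));
    // Serialize via bincode into a Bytes buffer, as implemented above.
    let raw: Bytes = <Bytes as Serializer>::serialize(&original).expect("serialize");
    // Deserialize back into a typed CachedValue.
    let restored: CachedValue<String> = <Bytes as Serializer>::deserialize(raw).expect("deserialize");
    assert_eq!(restored.data, original.data);
}
```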
2 changes: 1 addition & 1 deletion hitbox-redis/Cargo.toml
@@ -17,12 +17,12 @@ keywords = ["cache", "async", "cache-backend", "hitbox", "redis"]
hitbox-backend = { path = "../hitbox-backend", version = "0.1.0" }
hitbox = { path = "../hitbox", version = "0.1.0" }
log = "0.4"
redis = { version = "0.23", features = ["tokio-comp", "connection-manager"] }
thiserror = "1"
async-trait = "0.1"
serde = "1"
tokio = "1"
tracing = { version = "0.1", default-features = false }
fred = { version = "6.3.0" }

[dev-dependencies]
chrono = "0.4"
218 changes: 84 additions & 134 deletions hitbox-redis/src/backend.rs
@@ -1,166 +1,116 @@
//! Redis backend actor implementation.
use crate::error::Error;
use std::future::Future;
use std::marker::PhantomData;

use async_trait::async_trait;
use hitbox::{CacheKey, CacheableResponse, CachedValue};
use fred::{
clients::RedisClient,
interfaces::{ClientLike, KeysInterface},
types::{Expiration, FromRedis, RedisKey, RedisValue},
};
use hitbox_backend::{
serializer::{JsonSerializer, Serializer},
BackendError, BackendResult, CacheBackend, DeleteStatus, KeySerializer,
UrlEncodedKeySerializer,
serializer::Serializer, BackendError, BackendResult, CacheBackend, CacheableResponse,
CachedValue, DeleteStatus,
};
use redis::{aio::ConnectionManager, Client};
use tokio::sync::OnceCell;
use tracing::trace;

/// Redis cache backend based on redis-rs crate.
///
/// This struct provides redis as storage [Backend] for hitbox.
/// Its use one [MultiplexedConnection] for asynchronous network interaction.
///
/// [MultiplexedConnection]: redis::aio::MultiplexedConnection
/// [Backend]: hitbox_backend::Backend
#[derive(Clone)]
pub struct RedisBackend {
client: Client,
connection: OnceCell<ConnectionManager>,
}

impl RedisBackend {
/// Create new backend instance with default settings.
///
/// # Examples
/// ```
/// use hitbox_redis::RedisBackend;
///
/// #[tokio::main]
/// async fn main() {
/// let backend = RedisBackend::new();
/// }
/// ```
pub fn new() -> Result<RedisBackend, BackendError> {
Ok(Self::builder().build()?)
}

/// Creates new RedisBackend builder with default settings.
pub fn builder() -> RedisBackendBuilder {
RedisBackendBuilder::default()
}

/// Create lazy connection to redis via [ConnectionManager](redis::aio::ConnectionManager)
pub async fn connection(&self) -> Result<&ConnectionManager, BackendError> {
trace!("Get connection manager");
let manager = self
.connection
.get_or_try_init(|| {
trace!("Initialize new redis connection manager");
self.client.get_tokio_connection_manager()
})
.await
.map_err(Error::from)?;
Ok(manager)
}
}

/// Part of builder pattern implementation for RedisBackend actor.
pub struct RedisBackendBuilder {
connection_info: String,
}
use crate::error::Error;

impl Default for RedisBackendBuilder {
fn default() -> Self {
Self {
connection_info: "redis://127.0.0.1/".to_owned(),
}
}
#[derive(Clone)]
pub struct RedisBackend<S> {
pub(super) client: RedisClient,
pub(super) _ser: PhantomData<S>,
}

impl RedisBackendBuilder {
/// Set connection info (host, port, database, etc.) for RedisBackend actor.
pub fn server(mut self, connection_info: String) -> Self {
self.connection_info = connection_info;
self
}

/// Create new instance of Redis backend with passed settings.
pub fn build(self) -> Result<RedisBackend, Error> {
Ok(RedisBackend {
client: Client::open(self.connection_info)?,
connection: OnceCell::new(),
})
impl<S> RedisBackend<S> {
async fn execute<'a, T, Fut, F>(&'a self, f: F) -> Result<T, Error>
where
F: FnOnce(&'a RedisClient) -> Fut,
Fut: Future<Output = Result<T, Error>>,
T: Send,
{
let connection_task = self.client.connect();
self.client.wait_for_connect().await?;
let client = &self.client;
let result = f(client).await?;
self.client.quit().await?;
connection_task.await??;
Ok(result)
}
}

#[async_trait]
impl CacheBackend for RedisBackend {
async fn get<T>(&self, key: &CacheKey) -> BackendResult<Option<CachedValue<T::Cached>>>
impl<S> CacheBackend for RedisBackend<S>
where
S: Serializer + Send + Sync,
S::Raw: Send + Sync + FromRedis,
RedisValue: From<S::Raw>,
{
async fn get<T>(&self, key: String) -> BackendResult<Option<CachedValue<T::Cached>>>
where
T: CacheableResponse,
<T as CacheableResponse>::Cached: serde::de::DeserializeOwned,
{
let client = self.client.clone();
let cache_key = UrlEncodedKeySerializer::serialize(key)?;
async move {
let mut con = client.get_tokio_connection_manager().await.unwrap();
let result: Option<Vec<u8>> = redis::cmd("GET")
.arg(cache_key)
.query_async(&mut con)
.await
.map_err(Error::from)
.map_err(BackendError::from)?;
result
.map(|value| {
JsonSerializer::<Vec<u8>>::deserialize(value).map_err(BackendError::from)
})
.transpose()
}
.await
}

async fn delete(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
let mut con = self.connection().await?.clone();
let cache_key = UrlEncodedKeySerializer::serialize(key)?;
redis::cmd("DEL")
.arg(cache_key)
.query_async(&mut con)
.await
.map(|res| {
if res > 0 {
DeleteStatus::Deleted(res)
} else {
DeleteStatus::Missing
}
tracing::debug!("RedisBackend::get::{}", &key);
let key = RedisKey::from(key);
let result = self
.execute(|client| async move {
client
.get::<Option<S::Raw>, _>(key)
.await
.map_err(Error::from)
})
.map_err(Error::from)
.map_err(BackendError::from)
.await
.map_err(BackendError::from)?;
result
.map(|value| S::deserialize(value).map_err(BackendError::from))
.transpose()
}

async fn set<T>(
&self,
key: &CacheKey,
key: String,
value: &CachedValue<T::Cached>,
ttl: Option<u32>,
) -> BackendResult<()>
where
T: CacheableResponse + Send,
T::Cached: serde::Serialize + Send + Sync,
<T as CacheableResponse>::Cached: serde::Serialize + Send + Sync,
{
let mut con = self.connection().await?.clone();
let mut request = redis::cmd("SET");
let cache_key = UrlEncodedKeySerializer::serialize(key)?;
let serialized_value =
JsonSerializer::<Vec<u8>>::serialize(value).map_err(BackendError::from)?;
request.arg(cache_key).arg(serialized_value);
if let Some(ttl) = ttl {
request.arg("EX").arg(ttl);
};
request
.query_async(&mut con)
.await
.map_err(Error::from)
.map_err(BackendError::from)
tracing::debug!("RedisBackend::set::{}", &key);
let key = RedisKey::from(key);
let ser_value = S::serialize(value).map_err(BackendError::from)?;
self.execute(|client| async move {
let expire = ttl.map(|ttl| Expiration::EX(ttl as i64));
client
.set::<(), _, S::Raw>(key, ser_value, expire, None, false)
.await
.map_err(Error::from)
})
.await
.map_err(BackendError::from)
}

async fn delete(&self, key: String) -> BackendResult<DeleteStatus> {
tracing::debug!("RedisBackend::delete::{}", &key);
let key = RedisKey::from(key);
self.execute(|client| async move {
client
.del::<u32, _>(key)
.await
.map(|res| {
if res > 0 {
DeleteStatus::Deleted(res)
} else {
DeleteStatus::Missing
}
})
.map_err(Error::from)
})
.await
.map_err(BackendError::from)
}

async fn start(&self) -> BackendResult<()> {
self.connection().await?;
tracing::debug!("RedisBackend::start");
Ok(())
}
}
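
For reference, direct use of the new fred-based backend outside the tower layer might look like the sketch below. The `Builder` calls mirror examples/examples/redis.rs and the `delete` call follows the new `String`-keyed `CacheBackend` signature from this diff; the key literal and error formatting are illustrative assumptions.

```rust
// Hypothetical standalone usage of the fred-based RedisBackend; builder calls
// are taken from the example in this PR, the trait signature from this diff.
use hitbox_backend::{CacheBackend, DeleteStatus};
use hitbox_redis::Builder;

#[tokio::main]
async fn main() {
    let backend = Builder::standalone()
        .set_host("localhost".to_owned())
        .set_port(6379)
        .build()
        .expect("backend config");

    // `delete` now takes a plain String key and reports whether a key was removed.
    match backend.delete("greet::alice".to_owned()).await {
        Ok(DeleteStatus::Deleted(n)) => println!("evicted {n} key(s)"),
        Ok(DeleteStatus::Missing) => println!("nothing cached under that key"),
        Err(err) => eprintln!("redis backend error: {err:?}"),
    }
}
```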