Skip to content

Commit

Permalink
Stretto in-memory backend
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyErmilov committed Jul 29, 2023
1 parent 5d94f26 commit d26665b
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 47 deletions.
9 changes: 3 additions & 6 deletions examples/examples/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,17 @@ async fn main() {
tracing::subscriber::set_global_default(subscriber).unwrap();

let backend = RedisBackend::new().unwrap();
let inmemory = hitbox_stretto::StrettoBackendBuilder::new(12960, 1e6 as i64)
let inmemory = hitbox_stretto::StrettoBackendBuilder::new(2 ^ 16)
.finalize()
.unwrap();
// build our application with a single route
let app = Router::new()
.route("/greet/:name/", get(handler_result))
.route("/", get(handler))
.route(
"/json/",
get(handler_json), // .layer(Cache::builder().backend(inmemory.clone()).build()),
)
.route("/json/", get(handler_json))
.layer(
ServiceBuilder::new()
// .layer(Cache::builder().backend(inmemory).build())
.layer(Cache::builder().backend(inmemory).build())
.layer(Cache::builder().backend(backend).build()),
);

Expand Down
4 changes: 1 addition & 3 deletions examples/examples/tower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ async fn main() {
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();

let inmemory = StrettoBackendBuilder::new(12960, 1e6 as i64)
.finalize()
.unwrap();
let inmemory = StrettoBackendBuilder::new(2 ^ 16).finalize().unwrap();
let service = tower::ServiceBuilder::new()
.layer(tower_http::trace::TraceLayer::new_for_http())
.layer(Cache::builder().backend(inmemory).build())
Expand Down
1 change: 1 addition & 0 deletions hitbox-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ futures = { version = "0.3", default-features = false }
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bincode = "1"
thiserror = "1"
44 changes: 43 additions & 1 deletion hitbox-backend/src/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,40 @@ impl Serializer for JsonSerializer<String> {
}
}

#[derive(Default)]
pub struct BinSerializer<Raw = Vec<u8>> {
_raw: PhantomData<Raw>,
}

impl Serializer for BinSerializer<Vec<u8>> {
type Raw = Vec<u8>;

fn deserialize<T>(data: Self::Raw) -> Result<CachedValue<T>, SerializerError>
where
T: DeserializeOwned,
{
let deserialized: SerializableCachedValue<T> = bincode::deserialize(&data)
.map_err(|err| SerializerError::Deserialize(Box::new(err)))?;
let cached_value = deserialized.into_cached_value();
Ok(CachedValue::new(
cached_value.data.into(),
cached_value.expired,
))
}

fn serialize<T>(value: &CachedValue<T>) -> Result<Self::Raw, SerializerError>
where
T: Serialize,
{
let serializable_value: SerializableCachedValue<&T> = SerializableCachedValue {
data: &value.data,
expired: value.expired,
};
Ok(bincode::serialize(&serializable_value)
.map_err(|err| SerializerError::Serialize(Box::new(err)))?)
}
}

#[cfg(test)]
mod test {
use std::convert::Infallible;
Expand Down Expand Up @@ -155,7 +189,15 @@ mod test {
fn test_json_string_serializer() {
let value = CachedValue::new(Test::new(), Utc::now());
let raw = JsonSerializer::<String>::serialize(&value).unwrap();
dbg!(&raw);
assert_eq!(raw.len(), 71);
assert_eq!(value, JsonSerializer::<String>::deserialize(raw).unwrap());
}

#[test]
fn test_bincode_serializer() {
let value = CachedValue::new(Test::new(), Utc::now());
let raw = <BinSerializer>::serialize(&value).unwrap();
assert_eq!(raw.len(), 54);
assert_eq!(value, BinSerializer::<Vec<u8>>::deserialize(raw).unwrap());
}
}
3 changes: 3 additions & 0 deletions hitbox-stretto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
http = "0.2"
lazy_static = "1"

[dev-dependencies]
chrono = { version = "0.4", features = ["serde"] }
100 changes: 70 additions & 30 deletions hitbox-stretto/src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use axum::async_trait;
use hitbox_backend::{
serializer::{JsonSerializer, Serializer},
serializer::{BinSerializer, Serializer},
BackendError, BackendResult, CacheBackend, CacheableResponse, CachedValue, DeleteStatus,
};
use std::time::Duration;
use stretto::AsyncCache;

const COST: i64 = 0;

#[derive(Clone)]
pub struct StrettoBackend {
cache: AsyncCache<String, Vec<u8>>,
pub(crate) cache: AsyncCache<String, Vec<u8>>,
}

impl StrettoBackend {
pub fn new(cache: AsyncCache<String, Vec<u8>>) -> Self {
pub fn builder(cache: AsyncCache<String, Vec<u8>>) -> Self {
Self { cache }
}
}
Expand All @@ -26,12 +24,18 @@ impl CacheBackend for StrettoBackend {
T: CacheableResponse,
<T as CacheableResponse>::Cached: serde::de::DeserializeOwned,
{
let () = self
.cache
.wait()
.await
.map_err(crate::error::Error::from)
.map_err(BackendError::from)?;

match self.cache.get(&key).await {
Some(cached) => Ok(Some(
JsonSerializer::<Vec<u8>>::deserialize(cached.value().to_owned())
.map_err(BackendError::from)
.unwrap(),
)),
Some(cached) => BinSerializer::<Vec<u8>>::deserialize(cached.value().to_owned())
.map_err(BackendError::from)
.map(Some),

None => Ok(None),
}
}
Expand All @@ -46,15 +50,20 @@ impl CacheBackend for StrettoBackend {
T: CacheableResponse + Send,
T::Cached: serde::Serialize + Send + Sync,
{
let serialized =
JsonSerializer::<Vec<u8>>::serialize(&value).map_err(BackendError::from)?;
let serialized = BinSerializer::<Vec<u8>>::serialize(&value).map_err(BackendError::from)?;
let cost = serialized.len();
let inserted = match ttl {
Some(ttl) => {
self.cache
.insert_with_ttl(key, serialized, COST, Duration::from_secs(ttl as u64))
.insert_with_ttl(
key,
serialized,
cost as i64,
Duration::from_secs(ttl as u64),
)
.await
}
None => self.cache.insert(key, serialized, COST).await,
None => self.cache.insert(key, serialized, cost as i64).await,
};
if inserted {
Ok(())
Expand All @@ -65,32 +74,63 @@ impl CacheBackend for StrettoBackend {

async fn delete(&self, key: String) -> BackendResult<DeleteStatus> {
self.cache.remove(&key).await;
Ok(DeleteStatus::Deleted(0))
Ok(DeleteStatus::Deleted(1))
}

async fn start(&self) -> BackendResult<()> {
Ok(())
}
}

#[tokio::test]
async fn test() {
let c: AsyncCache<String, String> = AsyncCache::new(1000, 100, tokio::spawn).unwrap();
#[cfg(test)]
mod test {
use axum::async_trait;
use chrono::Utc;
use serde::{Deserialize, Serialize};

use super::*;
use hitbox_backend::CacheableResponse;

for i in 0..100 {
let key = format!("key-{}", i);
let r = c.insert(key, "value".to_string(), 1).await;
dbg!(r);
#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)]
struct Test {
a: i32,
b: String,
}

c.wait().await.unwrap();
#[async_trait]
impl CacheableResponse for Test {
type Cached = Self;

for i in 0..100 {
let key = format!("key-{}", i);
let value = c.get(&key).await;
match value {
Some(v) => dbg!(v.to_string()),
None => dbg!("None".to_string()),
};
async fn into_cached(self) -> Self::Cached {
self
}
async fn from_cached(cached: Self::Cached) -> Self {
cached
}
}

impl Test {
pub fn new() -> Self {
Self {
a: 42,
b: "nope".to_owned(),
}
}
}

#[tokio::test]
async fn test_set_and_get() {
let cache = crate::StrettoBackendBuilder::new(100).finalize().unwrap();
let value = CachedValue::new(Test::new(), Utc::now());
let res = cache.set::<Test>("key-1".to_string(), &value, None).await;
assert!(res.is_ok());
let value = cache
.get::<Test>("key-1".to_string())
.await
.unwrap()
.unwrap()
.into_inner();
assert_eq!(value.a, 42);
assert_eq!(value.b, "nope".to_owned());
}
}
12 changes: 5 additions & 7 deletions hitbox-stretto/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ type Cache = AsyncCacheBuilder<String, Vec<u8>>;
pub struct StrettoBackendBuilder(Cache);

impl StrettoBackendBuilder {
pub fn new(num_counters: usize, max_cost: i64) -> Self {
Self(AsyncCacheBuilder::new(num_counters, max_cost))
pub fn new(max_size: i64) -> Self {
let num_counters = max_size * 10;
Self(AsyncCacheBuilder::new(num_counters as usize, max_size))
}

pub fn set_buffer_size(self, sz: usize) -> Self {
Expand All @@ -19,18 +20,15 @@ impl StrettoBackendBuilder {
Self(self.0.set_buffer_items(sz))
}

pub fn set_ingore_internal_cost(self, val: bool) -> Self {
Self(self.0.set_ignore_internal_cost(val))
}

pub fn set_cleanup_duration(self, d: Duration) -> Self {
Self(self.0.set_cleanup_duration(d))
}

pub fn finalize(self) -> Result<crate::backend::StrettoBackend, Error> {
self.0
.set_ignore_internal_cost(true)
.finalize(tokio::spawn)
.map(crate::backend::StrettoBackend::new)
.map(|cache| crate::backend::StrettoBackend { cache })
.map_err(Error::from)
}
}

0 comments on commit d26665b

Please sign in to comment.