Skip to content

Commit

Permalink
Merge pull request #92 from bkhalifeh/main
Browse files Browse the repository at this point in the history
feat: implement bb8-redis
  • Loading branch information
genusistimelord authored Oct 4, 2024
2 parents 1802d86 + 7392205 commit e0428c9
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 9 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 7 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"databases/mongo",
"databases/redispool",
"databases/surreal",
"databases/redis-bb8-pool",
]

[package]
Expand Down Expand Up @@ -47,15 +48,15 @@ futures = "0.3.30"
bytes = "1.7.1"
dashmap = "6.0.1"
aes-gcm = "0.10.3"
base64 = "0.22.1"
rand = "0.8.5"
base64 = "0.22.1"
rand = "0.8.5"
hmac = "0.12.1"
sha2 = "0.10.8"
forwarded-header-value = "0.1.1"
fastbloom-rs = { version = "0.5.9", optional = true }

[workspace.dependencies]
axum_session = { version = "0.14.0", path = "./"}
axum_session = { version = "0.14.0", path = "./" }
chrono = { version = "0.4.38", default-features = false, features = [
"clock",
"serde",
Expand All @@ -64,13 +65,10 @@ async-trait = "0.1.81"
tokio = { version = "1.39.3", features = ["full", "tracing"] }
serde = { version = "1.0.208", features = ["derive"] }
serde_json = "1.0.125"
axum = "0.7.5"
axum = "0.7.5"

[package.metadata.docs.rs]
features = [
"key-store",
"advanced",
]
features = ["key-store", "advanced"]
rustdoc-args = ["--document-private-items"]

[dev-dependencies]
Expand All @@ -88,7 +86,7 @@ redis_pool = "0.6.0"
redis = { version = "0.27.2" }
tower = "0.4.13"
http-body-util = "0.1.0"
axum_session_sqlx = { path = "./databases/sqlx", features = ["sqlite"]}
axum_session_sqlx = { path = "./databases/sqlx", features = ["sqlite"] }
axum_session_surreal = { path = "./databases/surreal" }
axum_session_redispool = { path = "./databases/redispool" }
axum_session.workspace = true
Expand Down
17 changes: 17 additions & 0 deletions databases/redis-bb8-pool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "axum_session_redis_bb8_pool"
version = "0.1.0"
authors = ["Behzad Khalifeh <[email protected]>"]
description = "📝 RedisPool (BB8) Database layer for axum_session."
edition = "2021"
license = "MIT"
readme = "README.md"
documentation = "https://docs.rs/axum_session_redispool"
keywords = ["Axum", "Tower", "Redis", "Session", "BB8"]
repository = "https://github.com/AscendingCreations/AxumSession"

[dependencies]
async-trait.workspace = true
redis = { version = "0.27.2", features = ["aio", "tokio-comp"] }
bb8-redis = "0.17.0"
axum_session.workspace = true
Empty file.
8 changes: 8 additions & 0 deletions databases/redis-bb8-pool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#![doc = include_str!("../README.md")]
#![allow(dead_code)]
#![warn(clippy::all, nonstandard_style, future_incompatible)]
#![forbid(unsafe_code)]

mod redis_bb8_pool;
pub use self::redis_bb8_pool::*;
pub(crate) mod redis_bb8_tools;
207 changes: 207 additions & 0 deletions databases/redis-bb8-pool/src/redis_bb8_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use async_trait::async_trait;
use axum_session::{DatabaseError, DatabasePool, Session, SessionStore};
use bb8_redis::{bb8::Pool, RedisConnectionManager};
///Redis's Session Helper type for the DatabasePool.
pub type SessionRedisSession = Session<SessionRedisPool>;
///Redis's Session Store Helper type for the DatabasePool.
pub type SessionRedisSessionStore = SessionStore<SessionRedisPool>;

type SingleRedisPool = Pool<RedisConnectionManager>;

///Redis's Pool type for the DatabasePool. Needs a redis Client.
#[derive(Clone)]
pub struct SessionRedisPool {
pool: SingleRedisPool,
}

impl From<SingleRedisPool> for SessionRedisPool {
fn from(pool: SingleRedisPool) -> Self {
SessionRedisPool { pool }
}
}

impl std::fmt::Debug for SessionRedisPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionRedisPool").finish()
}
}

#[async_trait]
impl DatabasePool for SessionRedisPool {
async fn initiate(&self, _table_name: &str) -> Result<(), DatabaseError> {
// Redis does not actually use Tables so there is no way we can make one.
Ok(())
}

async fn delete_by_expiry(&self, _table_name: &str) -> Result<Vec<String>, DatabaseError> {
// Redis does this for use using the Expiry Options.
Ok(Vec::new())
}

async fn count(&self, table_name: &str) -> Result<i64, DatabaseError> {
let mut con = match self.pool.get().await {
Ok(v) => v,
Err(err) => return Err(DatabaseError::GenericAquire(err.to_string())),
};

let count: i64 = if table_name.is_empty() {
match redis::cmd("DBSIZE").query_async(&mut *con).await {
Ok(v) => v,
Err(err) => return Err(DatabaseError::GenericSelectError(err.to_string())),
}
} else {
// Assuming we have a table name, we need to count all the keys that match the table name.
// We can't use DBSIZE because that would count all the keys in the database.
let keys =
match super::redis_bb8_tools::scan_keys(&mut *con, &format!("{}:*", table_name))
.await
{
Ok(v) => v,
Err(err) => return Err(DatabaseError::GenericSelectError(err.to_string())),
};
keys.len() as i64
};

Ok(count)
}

async fn store(
&self,
id: &str,
session: &str,
expires: i64,
table_name: &str,
) -> Result<(), DatabaseError> {
let id = if table_name.is_empty() {
id.to_string()
} else {
format!("{}:{}", table_name, id)
};
let mut con = self
.pool
.get()
.await
.map_err(|err| DatabaseError::GenericAquire(err.to_string()))?;
redis::pipe()
.atomic() //makes this a transation.
.set(&id, session)
.ignore()
.expire_at(&id, expires)
.ignore()
.query_async::<()>(&mut *con)
.await
.map_err(|err| DatabaseError::GenericSelectError(err.to_string()))?;
Ok(())
}

async fn load(&self, id: &str, table_name: &str) -> Result<Option<String>, DatabaseError> {
let mut con = self
.pool
.get()
.await
.map_err(|err| DatabaseError::GenericAquire(err.to_string()))?;
let id = if table_name.is_empty() {
id.to_string()
} else {
format!("{}:{}", table_name, id)
};
let result: String = redis::cmd("GET")
.arg(id)
.query_async(&mut *con)
.await
.map_err(|err| DatabaseError::GenericSelectError(err.to_string()))?;
Ok(Some(result))
}

async fn delete_one_by_id(&self, id: &str, table_name: &str) -> Result<(), DatabaseError> {
let mut con = self
.pool
.get()
.await
.map_err(|err| DatabaseError::GenericAquire(err.to_string()))?;
let id = if table_name.is_empty() {
id.to_string()
} else {
format!("{}:{}", table_name, id)
};
redis::cmd("DEL")
.arg(id)
.query_async::<()>(&mut *con)
.await
.map_err(|err| DatabaseError::GenericDeleteError(err.to_string()))?;
Ok(())
}

async fn exists(&self, id: &str, table_name: &str) -> Result<bool, DatabaseError> {
let mut con = self
.pool
.get()
.await
.map_err(|err| DatabaseError::GenericAquire(err.to_string()))?;
let id = if table_name.is_empty() {
id.to_string()
} else {
format!("{}:{}", table_name, id)
};
let exists: bool = redis::cmd("EXISTS")
.arg(id)
.query_async(&mut *con)
.await
.map_err(|err| DatabaseError::GenericSelectError(err.to_string()))?;

Ok(exists)
}

async fn delete_all(&self, table_name: &str) -> Result<(), DatabaseError> {
let mut con = self
.pool
.get()
.await
.map_err(|err| DatabaseError::GenericAquire(err.to_string()))?;
if table_name.is_empty() {
redis::cmd("FLUSHDB")
.query_async::<()>(&mut *con)
.await
.map_err(|err| DatabaseError::GenericDeleteError(err.to_string()))?;
} else {
// Assuming we have a table name, we need to delete all the keys that match the table name.
// We can't use FLUSHDB because that would delete all the keys in the database.
let keys = super::redis_bb8_tools::scan_keys(&mut con, &format!("{}:*", table_name))
.await
.map_err(|err| DatabaseError::GenericSelectError(err.to_string()))?;

for key in keys {
redis::cmd("DEL")
.arg(key)
.query_async::<()>(&mut *con)
.await
.map_err(|err| DatabaseError::GenericDeleteError(err.to_string()))?;
}
}

Ok(())
}

async fn get_ids(&self, table_name: &str) -> Result<Vec<String>, DatabaseError> {
let mut con = self
.pool
.get()
.await
.map_err(|err| DatabaseError::GenericAquire(err.to_string()))?;
let table_name = if table_name.is_empty() {
"*".to_string()
} else {
format!("{}:0", table_name)
};

let result: Vec<String> =
super::redis_bb8_tools::scan_keys(&mut *con, &format!("{}:*", table_name))
.await
.map_err(|err| DatabaseError::GenericSelectError(err.to_string()))?;
Ok(result)
}

fn auto_handles_expiry(&self) -> bool {
true
}
}
33 changes: 33 additions & 0 deletions databases/redis-bb8-pool/src/redis_bb8_tools.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use axum_session::DatabaseError;

pub async fn scan_keys(
con: &mut redis::aio::MultiplexedConnection,
pattern: &str,
) -> Result<Vec<String>, DatabaseError> {
// SCAN works like KEYS but it is safe to use in production.
// Instead of blocking the server, it will only return a small
// amount of keys per iteration.
// https://redis.io/commands/scan

let mut keys: Vec<String> = Vec::new();
let mut cursor: i32 = 0;

loop {
let (new_cursor, new_keys): (i32, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(pattern)
.query_async(con)
.await
.map_err(|err| DatabaseError::GenericSelectError(err.to_string()))?;

keys.extend(new_keys);

cursor = new_cursor;
if cursor == 0 {
break;
}
}

Ok(keys)
}

0 comments on commit e0428c9

Please sign in to comment.