Skip to content

Commit

Permalink
feat: organize code in modules
Browse files Browse the repository at this point in the history
  • Loading branch information
Cyrix126 committed Jun 14, 2024
1 parent a6fd9de commit c9b68b5
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 266 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ Mnemosyne is a http request caching in memory proxy API.
Work in progress, not functional.
### TODO
- [x] allows to limit cache by size
- [x] organize code in modules
- [ ] remove allocation when possible
- [ ] organize code in modules
- [ ] tracing
- [ ] tests
- [ ] benchmarks
Expand Down
115 changes: 115 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::str::FromStr;

use axum::{
body::to_bytes,
extract::{Path, Request, State},
http::{HeaderMap, HeaderValue},
response::IntoResponse,
};
use enclose::enc;
use reqwest::{
header::{ETAG, VARY},
StatusCode,
};
use tokio::spawn;
use uuid::Uuid;

use crate::{
index_cache::{headers_match_vary, IndexCache},
AppState,
};

// handle delete endpoint
// will also delete from index by iterating over the entries to find the method/path
pub async fn delete_entry(
Path(path): Path<String>,
State(state): State<AppState>,
) -> impl IntoResponse {
if let Ok(uuid) = Uuid::from_str(&path) {
state.cache.invalidate(&uuid).await;
state.index_cache.lock().await.delete_uuid_from_index(&uuid);
return StatusCode::OK;
}
StatusCode::NOT_FOUND
}
// handle delete_all endpoint
pub async fn delete_all(State(state): State<AppState>) -> impl IntoResponse {
state.cache.invalidate_all();
*state.index_cache.lock().await = IndexCache::new();
StatusCode::OK
}

// handle request
pub async fn handler(State(state): State<AppState>, request: Request) -> impl IntoResponse {
// check if etag is present in headers
if state.cache.check_etag(request.headers()) {
// respond 304 if etag is present in cache
return StatusCode::NOT_MODIFIED.into_response();
}

// if response is in cache with valid header if any, return response from cache
let index = state.index_cache;
if let Some(uuid) = index.lock().await.request_to_uuid(&request) {
if let Some(rep) = state.cache.get(&uuid).await {
return rep.into_response();
} else {
// present in index_cache but not in cache, it means it was automatically invalidated.
// must update index cache.
index.lock().await.delete_uuid_from_index(&uuid);
}
}

// if not in cache, make the request to backend service
let req_method = request.method().to_owned();
let req_headers = request.headers().to_owned();
let req_uri = request.uri().to_owned();
match state
.client
.request(
request.method().to_owned(),
state.config.to_backend_uri(request.uri()),
)
.headers(request.headers().to_owned())
.body(to_bytes(request.into_body(), usize::MAX).await.unwrap())
.send()
.await
{
Ok(mut rep) => {
// first send Response and then cache so client wait as little as possible.
// need to add Etag headers to response
let uuid = Uuid::new_v4();

let cache = state.cache.clone();
rep.headers_mut()
.insert(ETAG, HeaderValue::from_str(&uuid.to_string()).unwrap());
let headers = rep.headers().to_owned();
let req_headers_match_vary = match headers_match_vary(&req_headers, headers.get(VARY)) {
Ok(h) => h,
Err(_err) => {
// seems backend service response contains malformated header value for Vary
HeaderMap::new()
}
};

let axum_rep = (
rep.status(),
rep.headers().to_owned(),
rep.bytes().await.unwrap(),
);

spawn(enc!((uuid, axum_rep, index) async move {
// add entry to index cache
index.lock().await.add_entry(uuid, req_method, req_uri, req_headers_match_vary);
// add response to cache
cache.insert(uuid, axum_rep).await;

}));
axum_rep.into_response()
}
Err(_err) => {
// the request to the backend failed

StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}
50 changes: 50 additions & 0 deletions src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::str::FromStr;

use axum::body::Bytes;
use derive_more::{Deref, DerefMut};
use moka::future::Cache as MokaCache;
use reqwest::header::HeaderMap;
use reqwest::StatusCode;
use typesize::TypeSize;
use uuid::Uuid;

use crate::config::Config;
#[derive(Deref, DerefMut, Clone)]
pub struct Cache(pub MokaCache<Uuid, (StatusCode, HeaderMap, Bytes), ahash::RandomState>);

impl Cache {
pub fn new(config: &Config) -> Cache {
Self(
MokaCache::builder()
.name("mnemosyne")
.time_to_idle(config.cache.expiration)
.weigher(
|_key: &Uuid, (s, h, b): &(StatusCode, HeaderMap, Bytes)| -> u32 {
let s = s.to_string().get_size() as u32;
let h = h.iter().fold(0, |acc, x| {
acc + (x.0.to_string().get_size()
+ x.1.to_str().unwrap().to_string().get_size())
as u32
});
let b = b.len() as u32;
// note that the size overhead of the index cache is not taken into account.
// could take about 100B per entry.
s + h + b
},
)
// This cache will hold up to 32MiB of values.
.max_capacity(config.cache.size_limit * 1024 * 1024)
.build_with_hasher(ahash::RandomState::new()),
)
}
pub fn check_etag(&self, headers: &HeaderMap) -> bool {
if let Some(etag) = headers.get("Etag") {
if let Ok(str) = etag.to_str() {
if let Ok(uuid) = Uuid::from_str(str) {
return self.contains_key(&uuid);
}
}
}
false
}
}
18 changes: 18 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
time::Duration,
};

use axum::http::Uri;
use reqwest::Url;
use serde::{Deserialize, Serialize};
/// configuration struct.
Expand Down Expand Up @@ -34,6 +35,23 @@ impl Default for Config {
}
}

impl Config {
pub fn to_backend_uri(&self, uri_request: &Uri) -> Url {
if let Some((endpoint, url)) = self
.endpoints
.iter()
.find(|b| uri_request.to_string().contains(&format!("^{}", b.0)))
{
let new_uri = uri_request.to_string().replace(endpoint, "");
Url::parse(&format!("{}{}", url, new_uri).replace("//", "/"))
.expect("could not parse to Url")
} else {
// no uri recognized, using fallback backend
self.fall_back_endpoint.to_owned()
}
}
}

#[derive(Serialize, Deserialize, Default, Clone)]
pub struct CacheConfig {
/// cache expiration after last request
Expand Down
88 changes: 88 additions & 0 deletions src/index_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use ahash::HashMap;
use ahash::HashMapExt;
use axum::body::Body;
use axum::http::HeaderValue;
use axum::http::{HeaderMap, Request, Uri};
use derive_more::{Deref, DerefMut};
use reqwest::Method;
use uuid::Uuid;
#[derive(Deref, DerefMut, Clone)]
/// IndexCache will store entry for each combination of uri/method with a vec of uuid per HeaderMap. HeaderMap here are request headers that match the headers name in the Vary header value response.
pub struct IndexCache(pub HashMap<(axum::http::Method, Uri), Vec<(Uuid, HeaderMap)>>);

impl IndexCache {
pub fn new() -> Self {
IndexCache(HashMap::new())
}
pub fn add_entry(
&mut self,
uuid: Uuid,
req_method: Method,
req_uri: Uri,
req_headers_match_vary: HeaderMap,
) {
let key = (req_method, req_uri);
let value = (uuid, req_headers_match_vary);
// check if entry exist for method/uri

if let Some(v) = self.get_mut(&key) {
// if entry exist, push into vec
v.push(value);
} else {
// if no entries, create one.
self.insert(key, vec![value]);
}
}
/// will search for an entry in cache based on a request. Will check that request headers includes the ones associated in this entry if any.
/// Will return the uuid of the entry.
pub fn request_to_uuid(&self, request: &Request<Body>) -> Option<Uuid> {
let method = request.method().to_owned();
let uri = request.uri().to_owned();
let headermap = request.headers();
if let Some(uuids) = self.get(&(method, uri)) {
return uuids
.iter()
.find(|(_, headermap_object)| {
headermap_object
.iter()
.all(|x| headermap.get(x.0).is_some_and(|value| value == x.1))
})
.map(|v| v.0);
}
None
}
pub fn delete_uuid_from_index(&mut self, uuid: &Uuid) {
// remove uuid entry from vec
self.iter_mut().for_each(|v| v.1.retain(|c| &c.0 != uuid));
// check if the entry for method/uri is now empty and delete it if that's the case.
let key = self.iter().find_map(|(key, value)| {
if value.is_empty() {
Some(key.to_owned())
} else {
None
}
});
if let Some(key) = key {
self.remove(&key);
}
}
}
/// from a request, keep only headers that are present in Vary response header
pub fn headers_match_vary(
request_headers: &HeaderMap,
vary_header: Option<&HeaderValue>,
) -> anyhow::Result<HeaderMap> {
if let Some(vary) = vary_header {
let mut h_vary = vary.to_str()?.split(',');
let mut headers = HeaderMap::new();
request_headers
.iter()
.filter(|h_req| h_vary.any(|name| name == h_req.0.as_str()))
.for_each(|header| {
headers.insert(header.0, header.1.clone());
});
Ok(headers)
} else {
Ok(HeaderMap::new())
}
}
Loading

0 comments on commit c9b68b5

Please sign in to comment.