Skip to content

Commit

Permalink
Query caching initial support
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Oct 6, 2023
1 parent 384e664 commit 835b7d8
Show file tree
Hide file tree
Showing 8 changed files with 588 additions and 11 deletions.
6 changes: 6 additions & 0 deletions examples/caching_memory_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.fluent()
.select()
.from(TEST_COLLECTION_NAME)
.filter(|q| {
q.for_all(
q.field(path!(MyTestStructure::some_num))
.greater_than_or_equal(2),
)
})
.obj::<MyTestStructure>()
.stream_query_with_errors()
.await?;
Expand Down
6 changes: 6 additions & 0 deletions examples/caching_persistent_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.fluent()
.select()
.from(TEST_COLLECTION_NAME)
.filter(|q| {
q.for_all(
q.field(path!(MyTestStructure::some_num))
.greater_than_or_equal(250),
)
})
.obj::<MyTestStructure>()
.stream_query_with_errors()
.await?;
Expand Down
51 changes: 40 additions & 11 deletions src/cache/backends/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use chrono::Utc;
use futures::stream::BoxStream;
use moka::future::{Cache, CacheBuilder};

use crate::cache::cache_query_engine::FirestoreCacheQueryEngine;
use futures::TryStreamExt;
use futures::{future, StreamExt};
use std::collections::HashMap;
use tracing::*;

Expand Down Expand Up @@ -99,6 +101,40 @@ impl FirestoreMemoryCacheBackend {
}
Ok(())
}

async fn query_cached_docs(
&self,
collection_path: &str,
query_engine: FirestoreCacheQueryEngine,
) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
match self.collection_caches.get(collection_path) {
Some(mem_cache) => Ok(Box::pin(
futures::stream::unfold(
(query_engine, mem_cache.iter()),
|(query_engine, mut iter)| async move {
match iter.next() {
Some((_, doc)) => {
if query_engine.matches_doc(&doc) {
Some((Ok(Some(doc)), (query_engine, iter)))
} else {
Some((Ok(None), (query_engine, iter)))
}
}
None => None,
}
},
)
.filter_map(|doc_res| {
future::ready(match doc_res {
Ok(Some(doc)) => Some(Ok(doc)),
Ok(None) => None,
Err(err) => Some(Err(err)),
})
}),
)),
None => Ok(Box::pin(futures::stream::empty())),
}
}
}

#[async_trait]
Expand Down Expand Up @@ -221,18 +257,11 @@ impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend {
collection_path: &str,
query: &FirestoreQueryParams,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>> {
// For now only basic/simple query all supported
if query.all_descendants.iter().all(|x| !*x)
&& query.order_by.is_none()
&& query.filter.is_none()
&& query.start_at.is_none()
&& query.end_at.is_none()
&& query.offset.is_none()
&& query.limit.is_none()
&& query.return_only_fields.is_none()
{
let simple_query_engine = FirestoreCacheQueryEngine::new(query);
if simple_query_engine.params_supported() {
Ok(FirestoreCachedValue::UseCached(
self.list_all_docs(collection_path).await?,
self.query_cached_docs(collection_path, simple_query_engine)
.await?,
))
} else {
Ok(FirestoreCachedValue::SkipCache)
Expand Down
Loading

0 comments on commit 835b7d8

Please sign in to comment.