From 70e8efee0b7cb57a96015616cb41e0b2d4d410c7 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Fri, 6 Oct 2023 19:40:31 +0200 Subject: [PATCH] Query caching support --- README.md | 3 ++ examples/caching_persistent_collections.rs | 4 ++ src/cache/backends/memory_backend.rs | 52 +++++++++++-------- src/cache/backends/persistent_backend.rs | 45 +++++++++++++---- src/cache/cache_filter_engine.rs | 40 +++++++-------- src/cache/cache_query_engine.rs | 58 +++++++++++++++++++++- src/firestore_document_functions.rs | 5 +- tests/caching_persistent_test.rs | 26 ++++++++++ 8 files changed, 176 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index ca3bcc7..9243163 100644 --- a/README.md +++ b/README.md @@ -608,6 +608,9 @@ Caching works on the document level. The cache will be used for the following operations: - Reading documents by IDs (get and batch get); - Listing all documents in a collection; +- Partial support for querying documents in a collection: + - Filtering; + - Ordering; (Caching other operations may be extended in the future). diff --git a/examples/caching_persistent_collections.rs b/examples/caching_persistent_collections.rs index 4650a6e..d9794ab 100644 --- a/examples/caching_persistent_collections.rs +++ b/examples/caching_persistent_collections.rs @@ -170,6 +170,10 @@ async fn main() -> Result<(), Box> { .greater_than_or_equal(250), ) }) + .order_by([( + path!(MyTestStructure::some_num), + FirestoreQueryDirection::Ascending, + )]) .obj::() .stream_query_with_errors() .await?; diff --git a/src/cache/backends/memory_backend.rs b/src/cache/backends/memory_backend.rs index 33fabf7..6cecc7f 100644 --- a/src/cache/backends/memory_backend.rs +++ b/src/cache/backends/memory_backend.rs @@ -108,30 +108,38 @@ impl FirestoreMemoryCacheBackend { query_engine: FirestoreCacheQueryEngine, ) -> FirestoreResult>> { match self.collection_caches.get(collection_path) { - Some(mem_cache) => Ok(Box::pin( - futures::stream::unfold( - (query_engine, mem_cache.iter()), - |(query_engine, mut iter)| async move { - match iter.next() { - Some((_, doc)) => { - if query_engine.matches_doc(&doc) { - Some((Ok(Some(doc)), (query_engine, iter))) - } else { - Some((Ok(None), (query_engine, iter))) + Some(mem_cache) => { + let filtered_stream = Box::pin( + futures::stream::unfold( + (query_engine.clone(), mem_cache.iter()), + |(query_engine, mut iter)| async move { + match iter.next() { + Some((_, doc)) => { + if query_engine.matches_doc(&doc) { + Some((Ok(Some(doc)), (query_engine, iter))) + } else { + Some((Ok(None), (query_engine, iter))) + } } + None => None, } - None => None, - } - }, - ) - .filter_map(|doc_res| { - future::ready(match doc_res { - Ok(Some(doc)) => Some(Ok(doc)), - Ok(None) => None, - Err(err) => Some(Err(err)), - }) - }), - )), + }, + ) + .filter_map(|doc_res| { + future::ready(match doc_res { + Ok(Some(doc)) => Some(Ok(doc)), + Ok(None) => None, + Err(err) => Some(Err(err)), + }) + }), + ); + + if query_engine.query.order_by.is_some() { + Ok(query_engine.sort_stream(filtered_stream).await?) + } else { + Ok(filtered_stream) + } + } None => Ok(Box::pin(futures::stream::empty())), } } diff --git a/src/cache/backends/persistent_backend.rs b/src/cache/backends/persistent_backend.rs index c23564e..0c7d9a7 100644 --- a/src/cache/backends/persistent_backend.rs +++ b/src/cache/backends/persistent_backend.rs @@ -3,6 +3,7 @@ use crate::*; use async_trait::async_trait; use futures::stream::BoxStream; +use crate::cache::cache_query_engine::FirestoreCacheQueryEngine; use chrono::Utc; use futures::StreamExt; use gcloud_sdk::google::firestore::v1::Document; @@ -200,6 +201,36 @@ impl FirestorePersistentCacheBackend { let len = read_tx.open_table(td)?.len()?; Ok(len) } + + async fn query_cached_docs( + &self, + collection_path: &str, + query_engine: FirestoreCacheQueryEngine, + ) -> FirestoreResult>> { + let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path); + + let read_tx = self.redb.begin_read()?; + let table = read_tx.open_table(td)?; + let iter = table.iter()?; + + // It seems there is no way to work with streaming for redb, so this is not efficient + let mut docs: Vec> = Vec::new(); + for record in iter { + let (_, v) = record?; + let doc = Self::buf_to_document(v.value())?; + if query_engine.matches_doc(&doc) { + docs.push(Ok(doc)); + } + } + + let filtered_stream = Box::pin(futures::stream::iter(docs)); + + if query_engine.query.order_by.is_some() { + Ok(query_engine.sort_stream(filtered_stream).await?) + } else { + Ok(filtered_stream) + } + } } #[async_trait] @@ -338,17 +369,11 @@ impl FirestoreCacheDocsByPathSupport for FirestorePersistentCacheBackend { query: &FirestoreQueryParams, ) -> FirestoreResult>>> { // For now only basic/simple query all supported - if query.all_descendants.iter().all(|x| !*x) - && query.order_by.is_none() - && query.filter.is_none() - && query.start_at.is_none() - && query.end_at.is_none() - && query.offset.is_none() - && query.limit.is_none() - && query.return_only_fields.is_none() - { + let simple_query_engine = FirestoreCacheQueryEngine::new(query); + if simple_query_engine.params_supported() { Ok(FirestoreCachedValue::UseCached( - self.list_all_docs(collection_path).await?, + self.query_cached_docs(collection_path, simple_query_engine) + .await?, )) } else { Ok(FirestoreCachedValue::SkipCache) diff --git a/src/cache/cache_filter_engine.rs b/src/cache/cache_filter_engine.rs index a5fa2a7..dfc8092 100644 --- a/src/cache/cache_filter_engine.rs +++ b/src/cache/cache_filter_engine.rs @@ -11,7 +11,7 @@ impl<'a> FirestoreCacheFilterEngine<'a> { } pub fn matches_doc(&'a self, doc: &FirestoreDocument) -> bool { - Self::matches_doc_filter(doc, &self.filter) + Self::matches_doc_filter(doc, self.filter) } pub fn matches_doc_filter(doc: &FirestoreDocument, filter: &FirestoreQueryFilter) -> bool { @@ -63,17 +63,21 @@ impl<'a> FirestoreCacheFilterEngine<'a> { } FirestoreQueryFilterUnary::IsNull(field_path) => { firestore_doc_get_field_by_path(doc, field_path) - .map(|field_value| match field_value { - gcloud_sdk::google::firestore::v1::value::ValueType::NullValue(_) => true, - _ => false, + .map(|field_value| { + matches!( + field_value, + gcloud_sdk::google::firestore::v1::value::ValueType::NullValue(_) + ) }) .unwrap_or(true) } FirestoreQueryFilterUnary::IsNotNull(field_path) => { firestore_doc_get_field_by_path(doc, field_path) - .map(|field_value| match field_value { - gcloud_sdk::google::firestore::v1::value::ValueType::NullValue(_) => false, - _ => true, + .map(|field_value| { + !matches!( + field_value, + gcloud_sdk::google::firestore::v1::value::ValueType::NullValue(_) + ) }) .unwrap_or(false) } @@ -240,7 +244,7 @@ impl<'a> FirestoreCacheFilterEngine<'a> { } } -enum CompareOp { +pub(super) enum CompareOp { Equal, NotEqual, LessThan, @@ -253,7 +257,7 @@ enum CompareOp { NotIn, } -fn compare_values( +pub(super) fn compare_values( op: CompareOp, a: &gcloud_sdk::google::firestore::v1::value::ValueType, b: &gcloud_sdk::google::firestore::v1::value::ValueType, @@ -404,14 +408,12 @@ fn compare_values( ) => a_val .values .iter() - .map(|v| &v.value_type) - .flatten() + .flat_map(|v| &v.value_type) .any(|a_val| { b_val .values .iter() - .map(|v| &v.value_type) - .flatten() + .flat_map(|v| &v.value_type) .all(|b_val| compare_values(CompareOp::Equal, a_val, b_val)) }), @@ -422,14 +424,12 @@ fn compare_values( ) => a_val .values .iter() - .map(|v| &v.value_type) - .flatten() + .flat_map(|v| &v.value_type) .any(|a_val| { b_val .values .iter() - .map(|v| &v.value_type) - .flatten() + .flat_map(|v| &v.value_type) .any(|b_val| compare_values(CompareOp::Equal, a_val, b_val)) }), @@ -440,8 +440,7 @@ fn compare_values( ) => a_val .values .iter() - .map(|v| &v.value_type) - .flatten() + .flat_map(|v| &v.value_type) .any(|a_val| compare_values(CompareOp::Equal, a_val, b_val)), ( @@ -451,8 +450,7 @@ fn compare_values( ) => a_val .values .iter() - .map(|v| &v.value_type) - .flatten() + .flat_map(|v| &v.value_type) .any(|a_val| !compare_values(CompareOp::Equal, a_val, b_val)), // Any other combinations result in false diff --git a/src/cache/cache_query_engine.rs b/src/cache/cache_query_engine.rs index ecede2d..0ad77b5 100644 --- a/src/cache/cache_query_engine.rs +++ b/src/cache/cache_query_engine.rs @@ -1,8 +1,12 @@ use crate::cache::cache_filter_engine::FirestoreCacheFilterEngine; use crate::*; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; +use std::cmp::Ordering; +#[derive(Clone)] pub struct FirestoreCacheQueryEngine { - query: FirestoreQueryParams, + pub query: FirestoreQueryParams, } impl FirestoreCacheQueryEngine { @@ -30,4 +34,56 @@ impl FirestoreCacheQueryEngine { true } } + + pub async fn sort_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(order_by) = &self.query.order_by { + let mut collected: Vec = input.try_collect().await?; + + collected.sort_by(|doc_a, doc_b| { + let mut current_ordering = Ordering::Equal; + for sort_field in order_by { + match ( + firestore_doc_get_field_by_path(doc_a, &sort_field.field_name), + firestore_doc_get_field_by_path(doc_b, &sort_field.field_name), + ) { + (Some(field_a), Some(field_b)) => { + if cache::cache_filter_engine::compare_values( + cache::cache_filter_engine::CompareOp::Equal, + field_a, + field_b, + ) { + continue; + } + + if cache::cache_filter_engine::compare_values( + cache::cache_filter_engine::CompareOp::LessThan, + field_a, + field_b, + ) { + current_ordering = match sort_field.direction { + FirestoreQueryDirection::Ascending => Ordering::Less, + FirestoreQueryDirection::Descending => Ordering::Greater, + } + } else { + current_ordering = match sort_field.direction { + FirestoreQueryDirection::Ascending => Ordering::Greater, + FirestoreQueryDirection::Descending => Ordering::Less, + } + } + } + (None, None) => current_ordering = Ordering::Equal, + (None, Some(_)) => current_ordering = Ordering::Equal, + (Some(_), None) => current_ordering = Ordering::Equal, + } + } + current_ordering + }); + Ok(futures::stream::iter(collected.into_iter().map(Ok)).boxed()) + } else { + Ok(input) + } + } } diff --git a/src/firestore_document_functions.rs b/src/firestore_document_functions.rs index d1ee983..1d58780 100644 --- a/src/firestore_document_functions.rs +++ b/src/firestore_document_functions.rs @@ -6,9 +6,8 @@ pub fn firestore_doc_get_field_by_path<'d>( field_path: &str, ) -> Option<&'d gcloud_sdk::google::firestore::v1::value::ValueType> { let field_path: Vec = field_path - .split(".") - .into_iter() - .map(|s| s.to_string().replace("`", "")) + .split('.') + .map(|s| s.to_string().replace('`', "")) .collect(); firestore_doc_get_field_by_path_arr(&doc.fields, &field_path) } diff --git a/tests/caching_persistent_test.rs b/tests/caching_persistent_test.rs index d100b1a..c301d6e 100644 --- a/tests/caching_persistent_test.rs +++ b/tests/caching_persistent_test.rs @@ -10,6 +10,7 @@ use firestore::*; #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] struct MyTestStructure { some_id: String, + some_num: u64, some_string: String, } @@ -28,6 +29,7 @@ async fn precondition_tests() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box() + .stream_query_with_errors() + .await?; + + let queried_items = queried.try_collect::>().await?; + assert_eq!(queried_items.len(), 5); + assert_eq!(queried_items.first().map(|d| d.some_num), Some(9)); + cache.shutdown().await?; Ok(())