diff --git a/examples/caching_persistent_collections.rs b/examples/caching_persistent_collections.rs index 13fbec2..869a6e9 100644 --- a/examples/caching_persistent_collections.rs +++ b/examples/caching_persistent_collections.rs @@ -172,14 +172,18 @@ async fn main() -> Result<(), Box> { }) .order_by([( path!(MyTestStructure::some_num), - FirestoreQueryDirection::Descending, + FirestoreQueryDirection::Ascending, )]) .obj::() .stream_query_with_errors() .await?; let queried_items = all_items_stream.try_collect::>().await?; - println!("{:?}", queried_items.len()); + println!( + "{:?} {:?}...", + queried_items.len(), + queried_items.iter().take(5).collect::>() + ); cache.shutdown().await?; diff --git a/src/cache/backends/memory_backend.rs b/src/cache/backends/memory_backend.rs index 6cecc7f..14bca00 100644 --- a/src/cache/backends/memory_backend.rs +++ b/src/cache/backends/memory_backend.rs @@ -134,11 +134,9 @@ impl FirestoreMemoryCacheBackend { }), ); - if query_engine.query.order_by.is_some() { - Ok(query_engine.sort_stream(filtered_stream).await?) - } else { - Ok(filtered_stream) - } + let output_stream = query_engine.process_query_stream(filtered_stream).await?; + + Ok(output_stream) } None => Ok(Box::pin(futures::stream::empty())), } diff --git a/src/cache/backends/persistent_backend.rs b/src/cache/backends/persistent_backend.rs index 0c7d9a7..cf2c00a 100644 --- a/src/cache/backends/persistent_backend.rs +++ b/src/cache/backends/persistent_backend.rs @@ -224,12 +224,9 @@ impl FirestorePersistentCacheBackend { } let filtered_stream = Box::pin(futures::stream::iter(docs)); + let output_stream = query_engine.process_query_stream(filtered_stream).await?; - if query_engine.query.order_by.is_some() { - Ok(query_engine.sort_stream(filtered_stream).await?) - } else { - Ok(filtered_stream) - } + Ok(output_stream) } } diff --git a/src/cache/cache_query_engine.rs b/src/cache/cache_query_engine.rs index f602170..ea4a11c 100644 --- a/src/cache/cache_query_engine.rs +++ b/src/cache/cache_query_engine.rs @@ -1,7 +1,8 @@ use crate::cache::cache_filter_engine::FirestoreCacheFilterEngine; use crate::*; use futures::stream::BoxStream; -use futures::{StreamExt, TryStreamExt}; +use futures::stream::StreamExt; +use futures::{future, TryStreamExt}; use std::cmp::Ordering; #[derive(Clone)] @@ -18,11 +19,6 @@ impl FirestoreCacheQueryEngine { pub fn params_supported(&self) -> bool { self.query.all_descendants.iter().all(|x| !*x) - && self.query.start_at.is_none() - && self.query.end_at.is_none() - && self.query.offset.is_none() - && self.query.limit.is_none() - && self.query.return_only_fields.is_none() } pub fn matches_doc(&self, doc: &FirestoreDocument) -> bool { @@ -84,4 +80,201 @@ impl FirestoreCacheQueryEngine { Ok(input) } } + + pub async fn limit_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(limit) = self.query.limit { + Ok(input + .scan(0_u32, move |index, doc| { + if *index < limit { + *index += 1; + future::ready(Some(doc)) + } else { + future::ready(None) + } + }) + .boxed()) + } else { + Ok(input) + } + } + + pub async fn offset_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(offset) = self.query.offset { + Ok(input.skip(offset as usize).boxed()) + } else { + Ok(input) + } + } + + pub async fn start_at_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(start_at) = &self.query.start_at { + if let Some(order_by) = &self.query.order_by { + let start_at = start_at.clone(); + let order_by = order_by.clone(); + Ok(input + .skip_while(move |doc_res| match doc_res { + Ok(doc) => match &start_at { + FirestoreQueryCursor::BeforeValue(values) => { + let result = values.iter().zip(&order_by).any( + |(value, ordered_field)| { + let order_by_comp = match ordered_field.direction { + FirestoreQueryDirection::Ascending => cache::cache_filter_engine::CompareOp::LessThan, + FirestoreQueryDirection::Descending => cache::cache_filter_engine::CompareOp::GreaterThan + }; + match ( + firestore_doc_get_field_by_path( + doc, + &ordered_field.field_name, + ), + &value.value.value_type, + ) { + (Some(field_a), Some(field_b)) => { + cache::cache_filter_engine::compare_values( + order_by_comp, + field_a, + field_b, + ) + } + (_, _) => false, + } + }, + ); + future::ready(result) + } + FirestoreQueryCursor::AfterValue(values) => { + let result = values.iter().zip(&order_by).any( + |(value, ordered_field)| { + let order_by_comp = match ordered_field.direction { + FirestoreQueryDirection::Ascending => cache::cache_filter_engine::CompareOp::LessThanOrEqual, + FirestoreQueryDirection::Descending => cache::cache_filter_engine::CompareOp::GreaterThanOrEqual + }; + match ( + firestore_doc_get_field_by_path( + doc, + &ordered_field.field_name, + ), + &value.value.value_type, + ) { + (Some(field_a), Some(field_b)) => { + cache::cache_filter_engine::compare_values( + order_by_comp, + field_a, + field_b, + ) + } + (_, _) => false, + } + }, + ); + future::ready(result) + } + }, + Err(_) => future::ready(false), + }) + .boxed()) + } else { + Ok(input) + } + } else { + Ok(input) + } + } + + pub async fn end_at_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(end_at) = &self.query.end_at { + if let Some(order_by) = &self.query.order_by { + let end_at = end_at.clone(); + let order_by = order_by.clone(); + Ok(input + .take_while(move |doc_res| match doc_res { + Ok(doc) => match &end_at { + FirestoreQueryCursor::BeforeValue(values) => { + let result = values.iter().zip(&order_by).any( + |(value, ordered_field)| { + let order_by_comp = match ordered_field.direction { + FirestoreQueryDirection::Ascending => cache::cache_filter_engine::CompareOp::LessThan, + FirestoreQueryDirection::Descending => cache::cache_filter_engine::CompareOp::GreaterThan + }; + match ( + firestore_doc_get_field_by_path( + doc, + &ordered_field.field_name, + ), + &value.value.value_type, + ) { + (Some(field_a), Some(field_b)) => { + cache::cache_filter_engine::compare_values( + order_by_comp, + field_a, + field_b, + ) + } + (_, _) => false, + } + }, + ); + future::ready(result) + } + FirestoreQueryCursor::AfterValue(values) => { + let result = values.iter().zip(&order_by).any( + |(value, ordered_field)| { + let order_by_comp = match ordered_field.direction { + FirestoreQueryDirection::Ascending => cache::cache_filter_engine::CompareOp::LessThanOrEqual, + FirestoreQueryDirection::Descending => cache::cache_filter_engine::CompareOp::GreaterThanOrEqual + }; + match ( + firestore_doc_get_field_by_path( + doc, + &ordered_field.field_name, + ), + &value.value.value_type, + ) { + (Some(field_a), Some(field_b)) => { + cache::cache_filter_engine::compare_values( + order_by_comp, + field_a, + field_b, + ) + } + (_, _) => false, + } + }, + ); + future::ready(result) + } + }, + Err(_) => future::ready(false), + }) + .boxed()) + } else { + Ok(input) + } + } else { + Ok(input) + } + } + + pub async fn process_query_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + let input = self.sort_stream(input).await?; + let input = self.limit_stream(input).await?; + let input = self.offset_stream(input).await?; + let input = self.start_at_stream(input).await?; + let input = self.end_at_stream(input).await?; + Ok(input) + } }