diff --git a/src/cache/backends/memory_backend.rs b/src/cache/backends/memory_backend.rs index 6cecc7f..c88473e 100644 --- a/src/cache/backends/memory_backend.rs +++ b/src/cache/backends/memory_backend.rs @@ -134,11 +134,13 @@ impl FirestoreMemoryCacheBackend { }), ); - if query_engine.query.order_by.is_some() { - Ok(query_engine.sort_stream(filtered_stream).await?) - } else { - Ok(filtered_stream) - } + let ordered_stream = query_engine.sort_stream(filtered_stream).await?; + let limited_stream = query_engine.limit_stream(ordered_stream).await?; + let offset_stream = query_engine.offset_stream(limited_stream).await?; + let start_at_stream = query_engine.start_at_stream(offset_stream).await?; + let end_at_stream = query_engine.end_at_stream(start_at_stream).await?; + + Ok(end_at_stream) } None => Ok(Box::pin(futures::stream::empty())), } diff --git a/src/cache/backends/persistent_backend.rs b/src/cache/backends/persistent_backend.rs index 0c7d9a7..dd758bb 100644 --- a/src/cache/backends/persistent_backend.rs +++ b/src/cache/backends/persistent_backend.rs @@ -225,11 +225,13 @@ impl FirestorePersistentCacheBackend { let filtered_stream = Box::pin(futures::stream::iter(docs)); - if query_engine.query.order_by.is_some() { - Ok(query_engine.sort_stream(filtered_stream).await?) - } else { - Ok(filtered_stream) - } + let ordered_stream = query_engine.sort_stream(filtered_stream).await?; + let limited_stream = query_engine.limit_stream(ordered_stream).await?; + let offset_stream = query_engine.offset_stream(limited_stream).await?; + let start_at_stream = query_engine.start_at_stream(offset_stream).await?; + let end_at_stream = query_engine.end_at_stream(start_at_stream).await?; + + Ok(end_at_stream) } } diff --git a/src/cache/cache_query_engine.rs b/src/cache/cache_query_engine.rs index f602170..093581b 100644 --- a/src/cache/cache_query_engine.rs +++ b/src/cache/cache_query_engine.rs @@ -1,7 +1,8 @@ use crate::cache::cache_filter_engine::FirestoreCacheFilterEngine; use crate::*; use futures::stream::BoxStream; -use futures::{StreamExt, TryStreamExt}; +use futures::stream::StreamExt; +use futures::{future, TryStreamExt}; use std::cmp::Ordering; #[derive(Clone)] @@ -18,11 +19,6 @@ impl FirestoreCacheQueryEngine { pub fn params_supported(&self) -> bool { self.query.all_descendants.iter().all(|x| !*x) - && self.query.start_at.is_none() - && self.query.end_at.is_none() - && self.query.offset.is_none() - && self.query.limit.is_none() - && self.query.return_only_fields.is_none() } pub fn matches_doc(&self, doc: &FirestoreDocument) -> bool { @@ -84,4 +80,165 @@ impl FirestoreCacheQueryEngine { Ok(input) } } + + pub async fn limit_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(limit) = self.query.limit { + Ok(input + .scan(0_u32, move |index, doc| { + if *index < limit { + *index += 1; + future::ready(Some(doc)) + } else { + future::ready(None) + } + }) + .boxed()) + } else { + Ok(input) + } + } + + pub async fn offset_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(offset) = self.query.offset { + Ok(input.skip(offset as usize).boxed()) + } else { + Ok(input) + } + } + + pub async fn start_at_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(start_at) = &self.query.start_at { + if let Some(order_by) = &self.query.order_by { + let start_at = start_at.clone(); + let order_by = order_by.clone(); + Ok(input + .skip_while(move |doc_res| match doc_res { + Ok(doc) => match &start_at { + FirestoreQueryCursor::BeforeValue(values) => { + let result = values.iter().zip(&order_by).any(|(value, ordered_field)| { + match ( + firestore_doc_get_field_by_path( + doc, + &ordered_field.field_name, + ), + &value.value.value_type, + ) { + (Some(field_a), Some(field_b)) => { + cache::cache_filter_engine::compare_values( + cache::cache_filter_engine::CompareOp::GreaterThanOrEqual, + field_a, + field_b, + ) + } + (_, _) => false, + } + }); + future::ready(result) + }, + FirestoreQueryCursor::AfterValue(values) => { + let result = values.iter().zip(&order_by).any(|(value, ordered_field)| { + match ( + firestore_doc_get_field_by_path( + doc, + &ordered_field.field_name, + ), + &value.value.value_type, + ) { + (Some(field_a), Some(field_b)) => { + cache::cache_filter_engine::compare_values( + cache::cache_filter_engine::CompareOp::LessThan, + field_a, + field_b, + ) + } + (_, _) => false, + } + }); + future::ready(result) + } + }, + Err(_) => future::ready(false), + }) + .boxed()) + } else { + Ok(input) + } + } else { + Ok(input) + } + } + + pub async fn end_at_stream<'a, 'b>( + &'a self, + input: BoxStream<'b, FirestoreResult>, + ) -> FirestoreResult>> { + if let Some(end_at) = &self.query.end_at { + if let Some(order_by) = &self.query.order_by { + let end_at = end_at.clone(); + let order_by = order_by.clone(); + Ok(input + .take_while(move |doc_res| match doc_res { + Ok(doc) => match &end_at { + FirestoreQueryCursor::BeforeValue(values) => { + let result = values.iter().zip(&order_by).any(|(value, ordered_field)| { + match ( + firestore_doc_get_field_by_path( + doc, + &ordered_field.field_name, + ), + &value.value.value_type, + ) { + (Some(field_a), Some(field_b)) => { + cache::cache_filter_engine::compare_values( + cache::cache_filter_engine::CompareOp::LessThan, + field_a, + field_b, + ) + } + (_, _) => false, + } + }); + future::ready(result) + }, + FirestoreQueryCursor::AfterValue(values) => { + let result = values.iter().zip(&order_by).any(|(value, ordered_field)| { + match ( + firestore_doc_get_field_by_path( + doc, + &ordered_field.field_name, + ), + &value.value.value_type, + ) { + (Some(field_a), Some(field_b)) => { + cache::cache_filter_engine::compare_values( + cache::cache_filter_engine::CompareOp::GreaterThanOrEqual, + field_a, + field_b, + ) + } + (_, _) => false, + } + }); + future::ready(result) + } + }, + Err(_) => future::ready(false), + }) + .boxed()) + } else { + Ok(input) + } + } else { + Ok(input) + } + } }