Skip to content

Commit

Permalink
More query params support
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Oct 7, 2023
1 parent 8f328d9 commit 90caa15
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 18 deletions.
8 changes: 6 additions & 2 deletions examples/caching_persistent_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
})
.order_by([(
path!(MyTestStructure::some_num),
FirestoreQueryDirection::Descending,
FirestoreQueryDirection::Ascending,
)])
.obj::<MyTestStructure>()
.stream_query_with_errors()
.await?;

let queried_items = all_items_stream.try_collect::<Vec<_>>().await?;
println!("{:?}", queried_items.len());
println!(
"{:?} {:?}...",
queried_items.len(),
queried_items.iter().take(5).collect::<Vec<_>>()
);

cache.shutdown().await?;

Expand Down
8 changes: 3 additions & 5 deletions src/cache/backends/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,9 @@ impl FirestoreMemoryCacheBackend {
}),
);

if query_engine.query.order_by.is_some() {
Ok(query_engine.sort_stream(filtered_stream).await?)
} else {
Ok(filtered_stream)
}
let output_stream = query_engine.process_query_stream(filtered_stream).await?;

Ok(output_stream)
}
None => Ok(Box::pin(futures::stream::empty())),
}
Expand Down
7 changes: 2 additions & 5 deletions src/cache/backends/persistent_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,9 @@ impl FirestorePersistentCacheBackend {
}

let filtered_stream = Box::pin(futures::stream::iter(docs));
let output_stream = query_engine.process_query_stream(filtered_stream).await?;

if query_engine.query.order_by.is_some() {
Ok(query_engine.sort_stream(filtered_stream).await?)
} else {
Ok(filtered_stream)
}
Ok(output_stream)
}
}

Expand Down
205 changes: 199 additions & 6 deletions src/cache/cache_query_engine.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::cache::cache_filter_engine::FirestoreCacheFilterEngine;
use crate::*;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use futures::stream::StreamExt;
use futures::{future, TryStreamExt};
use std::cmp::Ordering;

#[derive(Clone)]
Expand All @@ -18,11 +19,6 @@ impl FirestoreCacheQueryEngine {

pub fn params_supported(&self) -> bool {
self.query.all_descendants.iter().all(|x| !*x)
&& self.query.start_at.is_none()
&& self.query.end_at.is_none()
&& self.query.offset.is_none()
&& self.query.limit.is_none()
&& self.query.return_only_fields.is_none()
}

pub fn matches_doc(&self, doc: &FirestoreDocument) -> bool {
Expand Down Expand Up @@ -84,4 +80,201 @@ impl FirestoreCacheQueryEngine {
Ok(input)
}
}

pub async fn limit_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(limit) = self.query.limit {
Ok(input
.scan(0_u32, move |index, doc| {
if *index < limit {
*index += 1;
future::ready(Some(doc))
} else {
future::ready(None)
}
})
.boxed())
} else {
Ok(input)
}
}

pub async fn offset_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(offset) = self.query.offset {
Ok(input.skip(offset as usize).boxed())
} else {
Ok(input)
}
}

pub async fn start_at_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(start_at) = &self.query.start_at {
if let Some(order_by) = &self.query.order_by {
let start_at = start_at.clone();
let order_by = order_by.clone();
Ok(input
.skip_while(move |doc_res| match doc_res {
Ok(doc) => match &start_at {
FirestoreQueryCursor::BeforeValue(values) => {
let result = values.iter().zip(&order_by).any(
|(value, ordered_field)| {
let order_by_comp = match ordered_field.direction {
FirestoreQueryDirection::Ascending => cache::cache_filter_engine::CompareOp::LessThan,
FirestoreQueryDirection::Descending => cache::cache_filter_engine::CompareOp::GreaterThan
};
match (
firestore_doc_get_field_by_path(
doc,
&ordered_field.field_name,
),
&value.value.value_type,
) {
(Some(field_a), Some(field_b)) => {
cache::cache_filter_engine::compare_values(
order_by_comp,
field_a,
field_b,
)
}
(_, _) => false,
}
},
);
future::ready(result)
}
FirestoreQueryCursor::AfterValue(values) => {
let result = values.iter().zip(&order_by).any(
|(value, ordered_field)| {
let order_by_comp = match ordered_field.direction {
FirestoreQueryDirection::Ascending => cache::cache_filter_engine::CompareOp::LessThanOrEqual,
FirestoreQueryDirection::Descending => cache::cache_filter_engine::CompareOp::GreaterThanOrEqual
};
match (
firestore_doc_get_field_by_path(
doc,
&ordered_field.field_name,
),
&value.value.value_type,
) {
(Some(field_a), Some(field_b)) => {
cache::cache_filter_engine::compare_values(
order_by_comp,
field_a,
field_b,
)
}
(_, _) => false,
}
},
);
future::ready(result)
}
},
Err(_) => future::ready(false),
})
.boxed())
} else {
Ok(input)
}
} else {
Ok(input)
}
}

pub async fn end_at_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(end_at) = &self.query.end_at {
if let Some(order_by) = &self.query.order_by {
let end_at = end_at.clone();
let order_by = order_by.clone();
Ok(input
.take_while(move |doc_res| match doc_res {
Ok(doc) => match &end_at {
FirestoreQueryCursor::BeforeValue(values) => {
let result = values.iter().zip(&order_by).any(
|(value, ordered_field)| {
let order_by_comp = match ordered_field.direction {
FirestoreQueryDirection::Ascending => cache::cache_filter_engine::CompareOp::LessThan,
FirestoreQueryDirection::Descending => cache::cache_filter_engine::CompareOp::GreaterThan
};
match (
firestore_doc_get_field_by_path(
doc,
&ordered_field.field_name,
),
&value.value.value_type,
) {
(Some(field_a), Some(field_b)) => {
cache::cache_filter_engine::compare_values(
order_by_comp,
field_a,
field_b,
)
}
(_, _) => false,
}
},
);
future::ready(result)
}
FirestoreQueryCursor::AfterValue(values) => {
let result = values.iter().zip(&order_by).any(
|(value, ordered_field)| {
let order_by_comp = match ordered_field.direction {
FirestoreQueryDirection::Ascending => cache::cache_filter_engine::CompareOp::LessThanOrEqual,
FirestoreQueryDirection::Descending => cache::cache_filter_engine::CompareOp::GreaterThanOrEqual
};
match (
firestore_doc_get_field_by_path(
doc,
&ordered_field.field_name,
),
&value.value.value_type,
) {
(Some(field_a), Some(field_b)) => {
cache::cache_filter_engine::compare_values(
order_by_comp,
field_a,
field_b,
)
}
(_, _) => false,
}
},
);
future::ready(result)
}
},
Err(_) => future::ready(false),
})
.boxed())
} else {
Ok(input)
}
} else {
Ok(input)
}
}

pub async fn process_query_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
let input = self.sort_stream(input).await?;
let input = self.limit_stream(input).await?;
let input = self.offset_stream(input).await?;
let input = self.start_at_stream(input).await?;
let input = self.end_at_stream(input).await?;
Ok(input)
}
}

0 comments on commit 90caa15

Please sign in to comment.