Skip to content

Commit

Permalink
More query params support
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Oct 6, 2023
1 parent 8f328d9 commit e45d061
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 16 deletions.
12 changes: 7 additions & 5 deletions src/cache/backends/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,13 @@ impl FirestoreMemoryCacheBackend {
}),
);

if query_engine.query.order_by.is_some() {
Ok(query_engine.sort_stream(filtered_stream).await?)
} else {
Ok(filtered_stream)
}
let ordered_stream = query_engine.sort_stream(filtered_stream).await?;
let limited_stream = query_engine.limit_stream(ordered_stream).await?;
let offset_stream = query_engine.offset_stream(limited_stream).await?;
let start_at_stream = query_engine.start_at_stream(offset_stream).await?;
let end_at_stream = query_engine.end_at_stream(start_at_stream).await?;

Ok(end_at_stream)
}
None => Ok(Box::pin(futures::stream::empty())),
}
Expand Down
12 changes: 7 additions & 5 deletions src/cache/backends/persistent_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,13 @@ impl FirestorePersistentCacheBackend {

let filtered_stream = Box::pin(futures::stream::iter(docs));

if query_engine.query.order_by.is_some() {
Ok(query_engine.sort_stream(filtered_stream).await?)
} else {
Ok(filtered_stream)
}
let ordered_stream = query_engine.sort_stream(filtered_stream).await?;
let limited_stream = query_engine.limit_stream(ordered_stream).await?;
let offset_stream = query_engine.offset_stream(limited_stream).await?;
let start_at_stream = query_engine.start_at_stream(offset_stream).await?;
let end_at_stream = query_engine.end_at_stream(start_at_stream).await?;

Ok(end_at_stream)
}
}

Expand Down
169 changes: 163 additions & 6 deletions src/cache/cache_query_engine.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::cache::cache_filter_engine::FirestoreCacheFilterEngine;
use crate::*;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use futures::stream::StreamExt;
use futures::{future, TryStreamExt};
use std::cmp::Ordering;

#[derive(Clone)]
Expand All @@ -18,11 +19,6 @@ impl FirestoreCacheQueryEngine {

pub fn params_supported(&self) -> bool {
self.query.all_descendants.iter().all(|x| !*x)
&& self.query.start_at.is_none()
&& self.query.end_at.is_none()
&& self.query.offset.is_none()
&& self.query.limit.is_none()
&& self.query.return_only_fields.is_none()
}

pub fn matches_doc(&self, doc: &FirestoreDocument) -> bool {
Expand Down Expand Up @@ -84,4 +80,165 @@ impl FirestoreCacheQueryEngine {
Ok(input)
}
}

pub async fn limit_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(limit) = self.query.limit {
Ok(input
.scan(0_u32, move |index, doc| {
if *index < limit {
*index += 1;
future::ready(Some(doc))
} else {
future::ready(None)
}
})
.boxed())
} else {
Ok(input)
}
}

pub async fn offset_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(offset) = self.query.offset {
Ok(input.skip(offset as usize).boxed())
} else {
Ok(input)
}
}

pub async fn start_at_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(start_at) = &self.query.start_at {
if let Some(order_by) = &self.query.order_by {
let start_at = start_at.clone();
let order_by = order_by.clone();
Ok(input
.skip_while(move |doc_res| match doc_res {
Ok(doc) => match &start_at {
FirestoreQueryCursor::BeforeValue(values) => {
let result = values.iter().zip(&order_by).any(|(value, ordered_field)| {
match (
firestore_doc_get_field_by_path(
doc,
&ordered_field.field_name,
),
&value.value.value_type,
) {
(Some(field_a), Some(field_b)) => {
cache::cache_filter_engine::compare_values(
cache::cache_filter_engine::CompareOp::GreaterThanOrEqual,
field_a,
field_b,
)
}
(_, _) => false,
}
});
future::ready(result)
},
FirestoreQueryCursor::AfterValue(values) => {
let result = values.iter().zip(&order_by).any(|(value, ordered_field)| {
match (
firestore_doc_get_field_by_path(
doc,
&ordered_field.field_name,
),
&value.value.value_type,
) {
(Some(field_a), Some(field_b)) => {
cache::cache_filter_engine::compare_values(
cache::cache_filter_engine::CompareOp::LessThan,
field_a,
field_b,
)
}
(_, _) => false,
}
});
future::ready(result)
}
},
Err(_) => future::ready(false),
})
.boxed())
} else {
Ok(input)
}
} else {
Ok(input)
}
}

pub async fn end_at_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(end_at) = &self.query.end_at {
if let Some(order_by) = &self.query.order_by {
let end_at = end_at.clone();
let order_by = order_by.clone();
Ok(input
.take_while(move |doc_res| match doc_res {
Ok(doc) => match &end_at {
FirestoreQueryCursor::BeforeValue(values) => {
let result = values.iter().zip(&order_by).any(|(value, ordered_field)| {
match (
firestore_doc_get_field_by_path(
doc,
&ordered_field.field_name,
),
&value.value.value_type,
) {
(Some(field_a), Some(field_b)) => {
cache::cache_filter_engine::compare_values(
cache::cache_filter_engine::CompareOp::LessThan,
field_a,
field_b,
)
}
(_, _) => false,
}
});
future::ready(result)
},
FirestoreQueryCursor::AfterValue(values) => {
let result = values.iter().zip(&order_by).any(|(value, ordered_field)| {
match (
firestore_doc_get_field_by_path(
doc,
&ordered_field.field_name,
),
&value.value.value_type,
) {
(Some(field_a), Some(field_b)) => {
cache::cache_filter_engine::compare_values(
cache::cache_filter_engine::CompareOp::GreaterThanOrEqual,
field_a,
field_b,
)
}
(_, _) => false,
}
});
future::ready(result)
}
},
Err(_) => future::ready(false),
})
.boxed())
} else {
Ok(input)
}
} else {
Ok(input)
}
}
}

0 comments on commit e45d061

Please sign in to comment.