Query caching support
abdolence committed Oct 6, 2023
1 parent 835b7d8 commit 70e8efe
Showing 8 changed files with 176 additions and 57 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -608,6 +608,9 @@ Caching works on the document level.
The cache will be used for the following operations:
- Reading documents by IDs (get and batch get);
- Listing all documents in a collection;
- Partial support for querying documents in a collection:
- Filtering;
- Ordering;

(Caching may be extended to other operations in the future).
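
For example, a cached query that combines a filter with ordering can look like the following sketch, adapted from `examples/caching_persistent_collections.rs`. The `db` handle is assumed to be a `FirestoreDb` already configured with a cache, and `TEST_COLLECTION_NAME`/`MyTestStructure` are illustrative names borrowed from that example:

```rust
let cached_stream = db
    .fluent()
    .select()
    .from(TEST_COLLECTION_NAME)
    .filter(|q| {
        q.for_all([q
            .field(path!(MyTestStructure::some_num))
            .greater_than_or_equal(250)])
    })
    .order_by([(
        path!(MyTestStructure::some_num),
        FirestoreQueryDirection::Ascending,
    )])
    .obj::<MyTestStructure>()
    .stream_query_with_errors()
    .await?;
```

Both the filter and the ordering above can be evaluated against the cached documents.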

4 changes: 4 additions & 0 deletions examples/caching_persistent_collections.rs
@@ -170,6 +170,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.greater_than_or_equal(250),
)
})
.order_by([(
path!(MyTestStructure::some_num),
FirestoreQueryDirection::Ascending,
)])
.obj::<MyTestStructure>()
.stream_query_with_errors()
.await?;
52 changes: 30 additions & 22 deletions src/cache/backends/memory_backend.rs
@@ -108,30 +108,38 @@ impl FirestoreMemoryCacheBackend {
query_engine: FirestoreCacheQueryEngine,
) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
match self.collection_caches.get(collection_path) {
Some(mem_cache) => {
    let filtered_stream = Box::pin(
        futures::stream::unfold(
            (query_engine.clone(), mem_cache.iter()),
            |(query_engine, mut iter)| async move {
                match iter.next() {
                    Some((_, doc)) => {
                        if query_engine.matches_doc(&doc) {
                            Some((Ok(Some(doc)), (query_engine, iter)))
                        } else {
                            Some((Ok(None), (query_engine, iter)))
                        }
                    }
                    None => None,
                }
            },
        )
        .filter_map(|doc_res| {
            future::ready(match doc_res {
                Ok(Some(doc)) => Some(Ok(doc)),
                Ok(None) => None,
                Err(err) => Some(Err(err)),
            })
        }),
    );

    if query_engine.query.order_by.is_some() {
        Ok(query_engine.sort_stream(filtered_stream).await?)
    } else {
        Ok(filtered_stream)
    }
}
None => Ok(Box::pin(futures::stream::empty())),
}
}
45 changes: 35 additions & 10 deletions src/cache/backends/persistent_backend.rs
@@ -3,6 +3,7 @@ use crate::*;
use async_trait::async_trait;
use futures::stream::BoxStream;

use crate::cache::cache_query_engine::FirestoreCacheQueryEngine;
use chrono::Utc;
use futures::StreamExt;
use gcloud_sdk::google::firestore::v1::Document;
@@ -200,6 +201,36 @@ impl FirestorePersistentCacheBackend {
let len = read_tx.open_table(td)?.len()?;
Ok(len)
}

async fn query_cached_docs(
&self,
collection_path: &str,
query_engine: FirestoreCacheQueryEngine,
) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);

let read_tx = self.redb.begin_read()?;
let table = read_tx.open_table(td)?;
let iter = table.iter()?;

// It seems there is no way to work with streaming for redb, so this is not efficient
let mut docs: Vec<FirestoreResult<FirestoreDocument>> = Vec::new();
for record in iter {
let (_, v) = record?;
let doc = Self::buf_to_document(v.value())?;
if query_engine.matches_doc(&doc) {
docs.push(Ok(doc));
}
}

let filtered_stream = Box::pin(futures::stream::iter(docs));

if query_engine.query.order_by.is_some() {
Ok(query_engine.sort_stream(filtered_stream).await?)
} else {
Ok(filtered_stream)
}
}
}

#[async_trait]
@@ -338,17 +369,11 @@ impl FirestoreCacheDocsByPathSupport for FirestorePersistentCacheBackend {
query: &FirestoreQueryParams,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>> {
// For now only basic/simple query all supported
let simple_query_engine = FirestoreCacheQueryEngine::new(query);
if simple_query_engine.params_supported() {
    Ok(FirestoreCachedValue::UseCached(
        self.query_cached_docs(collection_path, simple_query_engine)
            .await?,
))
} else {
Ok(FirestoreCachedValue::SkipCache)
40 changes: 19 additions & 21 deletions src/cache/cache_filter_engine.rs
@@ -11,7 +11,7 @@ impl<'a> FirestoreCacheFilterEngine<'a> {
}

pub fn matches_doc(&'a self, doc: &FirestoreDocument) -> bool {
Self::matches_doc_filter(doc, self.filter)
}

pub fn matches_doc_filter(doc: &FirestoreDocument, filter: &FirestoreQueryFilter) -> bool {
@@ -63,17 +63,21 @@ impl<'a> FirestoreCacheFilterEngine<'a> {
}
FirestoreQueryFilterUnary::IsNull(field_path) => {
    firestore_doc_get_field_by_path(doc, field_path)
        .map(|field_value| {
            matches!(
                field_value,
                gcloud_sdk::google::firestore::v1::value::ValueType::NullValue(_)
            )
        })
        .unwrap_or(true)
}
FirestoreQueryFilterUnary::IsNotNull(field_path) => {
    firestore_doc_get_field_by_path(doc, field_path)
        .map(|field_value| {
            !matches!(
                field_value,
                gcloud_sdk::google::firestore::v1::value::ValueType::NullValue(_)
            )
        })
        .unwrap_or(false)
}
@@ -240,7 +244,7 @@ impl<'a> FirestoreCacheFilterEngine<'a> {
}
}

pub(super) enum CompareOp {
Equal,
NotEqual,
LessThan,
@@ -253,7 +257,7 @@ enum CompareOp {
NotIn,
}

pub(super) fn compare_values(
op: CompareOp,
a: &gcloud_sdk::google::firestore::v1::value::ValueType,
b: &gcloud_sdk::google::firestore::v1::value::ValueType,
@@ -404,14 +408,12 @@ fn compare_values(
) => a_val
.values
.iter()
.flat_map(|v| &v.value_type)
.any(|a_val| {
b_val
.values
.iter()
.flat_map(|v| &v.value_type)
.all(|b_val| compare_values(CompareOp::Equal, a_val, b_val))
}),

@@ -422,14 +424,12 @@ fn compare_values(
) => a_val
.values
.iter()
.flat_map(|v| &v.value_type)
.any(|a_val| {
b_val
.values
.iter()
.flat_map(|v| &v.value_type)
.any(|b_val| compare_values(CompareOp::Equal, a_val, b_val))
}),

@@ -440,8 +440,7 @@ fn compare_values(
) => a_val
.values
.iter()
.flat_map(|v| &v.value_type)
.any(|a_val| compare_values(CompareOp::Equal, a_val, b_val)),

(
@@ -451,8 +450,7 @@ fn compare_values(
) => a_val
.values
.iter()
.flat_map(|v| &v.value_type)
.any(|a_val| !compare_values(CompareOp::Equal, a_val, b_val)),

// Any other combinations result in false
58 changes: 57 additions & 1 deletion src/cache/cache_query_engine.rs
@@ -1,8 +1,12 @@
use crate::cache::cache_filter_engine::FirestoreCacheFilterEngine;
use crate::*;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use std::cmp::Ordering;

#[derive(Clone)]
pub struct FirestoreCacheQueryEngine {
pub query: FirestoreQueryParams,
}

impl FirestoreCacheQueryEngine {
@@ -30,4 +34,56 @@ impl FirestoreCacheQueryEngine {
true
}
}

pub async fn sort_stream<'a, 'b>(
&'a self,
input: BoxStream<'b, FirestoreResult<FirestoreDocument>>,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<FirestoreDocument>>> {
if let Some(order_by) = &self.query.order_by {
let mut collected: Vec<FirestoreDocument> = input.try_collect().await?;

collected.sort_by(|doc_a, doc_b| {
let mut current_ordering = Ordering::Equal;
for sort_field in order_by {
match (
firestore_doc_get_field_by_path(doc_a, &sort_field.field_name),
firestore_doc_get_field_by_path(doc_b, &sort_field.field_name),
) {
(Some(field_a), Some(field_b)) => {
if cache::cache_filter_engine::compare_values(
cache::cache_filter_engine::CompareOp::Equal,
field_a,
field_b,
) {
continue;
}

if cache::cache_filter_engine::compare_values(
cache::cache_filter_engine::CompareOp::LessThan,
field_a,
field_b,
) {
current_ordering = match sort_field.direction {
FirestoreQueryDirection::Ascending => Ordering::Less,
FirestoreQueryDirection::Descending => Ordering::Greater,
}
} else {
current_ordering = match sort_field.direction {
FirestoreQueryDirection::Ascending => Ordering::Greater,
FirestoreQueryDirection::Descending => Ordering::Less,
}
}
}
(None, None) => current_ordering = Ordering::Equal,
(None, Some(_)) => current_ordering = Ordering::Equal,
(Some(_), None) => current_ordering = Ordering::Equal,
}
}
current_ordering
});
Ok(futures::stream::iter(collected.into_iter().map(Ok)).boxed())
} else {
Ok(input)
}
}
}
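
The comparison loop in `sort_stream` above is, in intent, a lexicographic ordering over the `order_by` fields. A minimal standalone sketch of that idea in plain Rust (illustrative only, independent of the crate's `FirestoreDocument` and value types):

```rust
use std::cmp::Ordering;

/// Sort rows by several keys; a later key only breaks ties left by earlier keys.
fn sort_lexicographic(rows: &mut [(i64, i64)], ascending: [bool; 2]) {
    rows.sort_by(|a, b| {
        let ka = [a.0, a.1];
        let kb = [b.0, b.1];
        for i in 0..2 {
            let mut cmp = ka[i].cmp(&kb[i]);
            if !ascending[i] {
                cmp = cmp.reverse();
            }
            if cmp != Ordering::Equal {
                return cmp; // the first non-equal key decides
            }
        }
        Ordering::Equal
    });
}

fn main() {
    let mut rows = vec![(2, 1), (1, 3), (2, 0)];
    // Ascending by the first key, descending by the second.
    sort_lexicographic(&mut rows, [true, false]);
    assert_eq!(rows, vec![(1, 3), (2, 1), (2, 0)]);
}
```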
5 changes: 2 additions & 3 deletions src/firestore_document_functions.rs
@@ -6,9 +6,8 @@ pub fn firestore_doc_get_field_by_path<'d>(
field_path: &str,
) -> Option<&'d gcloud_sdk::google::firestore::v1::value::ValueType> {
let field_path: Vec<String> = field_path
.split(".")
.into_iter()
.map(|s| s.to_string().replace("`", ""))
.split('.')
.map(|s| s.to_string().replace('`', ""))
.collect();
firestore_doc_get_field_by_path_arr(&doc.fields, &field_path)
}
