From b0132099dc2615aadfe45f16ada935cf720a9c53 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Fri, 6 Oct 2023 14:13:01 +0200 Subject: [PATCH] Query refactorings and caching query initial template --- src/cache/backends/memory_backend.rs | 17 ++++- src/cache/backends/persistent_backend.rs | 8 +++ src/cache/mod.rs | 6 ++ src/db/query.rs | 90 +++++++++++------------- 4 files changed, 71 insertions(+), 50 deletions(-) diff --git a/src/cache/backends/memory_backend.rs b/src/cache/backends/memory_backend.rs index 7e37922..2ffe659 100644 --- a/src/cache/backends/memory_backend.rs +++ b/src/cache/backends/memory_backend.rs @@ -23,10 +23,13 @@ impl FirestoreMemoryCacheBackend { Self::with_collection_options(config, |_| FirestoreMemCache::builder().max_capacity(10000)) } - pub fn with_collection_options( + pub fn with_collection_options( config: FirestoreCacheConfiguration, - collection_mem_options: fn(&str) -> FirestoreMemCacheOptions, - ) -> FirestoreResult { + collection_mem_options: FN, + ) -> FirestoreResult + where + FN: Fn(&str) -> FirestoreMemCacheOptions, + { let collection_caches = config .collections .keys() @@ -201,4 +204,12 @@ impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend { None => Ok(Box::pin(futures::stream::empty())), } } + + async fn query_docs( + &self, + collection_path: &str, + query: &FirestoreQueryParams, + ) -> FirestoreResult>>> { + todo!() + } } diff --git a/src/cache/backends/persistent_backend.rs b/src/cache/backends/persistent_backend.rs index dc5b3fe..9effcd4 100644 --- a/src/cache/backends/persistent_backend.rs +++ b/src/cache/backends/persistent_backend.rs @@ -331,6 +331,14 @@ impl FirestoreCacheDocsByPathSupport for FirestorePersistentCacheBackend { Ok(Box::pin(futures::stream::iter(docs))) } + + async fn query_docs( + &self, + collection_path: &str, + query: &FirestoreQueryParams, + ) -> FirestoreResult>>> { + todo!() + } } impl From for FirestoreError { diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 576901c..748edff 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -187,4 +187,10 @@ pub trait FirestoreCacheDocsByPathSupport { &self, collection_path: &str, ) -> FirestoreResult>>; + + async fn query_docs( + &self, + collection_path: &str, + query: &FirestoreQueryParams, + ) -> FirestoreResult>>>; } diff --git a/src/db/query.rs b/src/db/query.rs index 96b3599..82b5539 100644 --- a/src/db/query.rs +++ b/src/db/query.rs @@ -95,15 +95,6 @@ impl FirestoreDb { span: Span, ) -> BoxFuture>>>> { async move { - #[cfg(feature = "caching")] - { - if let FirestoreCachedValue::UseCached(stream) = - self.query_docs_from_cache(¶ms).await? - { - return Ok(stream); - } - } - let query_request = self.create_query_request(params.clone())?; let begin_query_utc: DateTime = Utc::now(); @@ -164,37 +155,43 @@ impl FirestoreDb { async fn query_docs_from_cache<'a>( &'a self, params: &FirestoreQueryParams, - ) -> FirestoreResult< - FirestoreCachedValue>>>, - > { - Ok(FirestoreCachedValue::SkipCache) + ) -> FirestoreResult>>> + { + match ¶ms.collection_id { + FirestoreQueryCollection::Group(_) => Ok(FirestoreCachedValue::SkipCache), + FirestoreQueryCollection::Single(collection_id) => { + if let FirestoreDbSessionCacheMode::ReadCachedOnly(ref cache) = + self.session_params.cache_mode + { + let span = span!( + Level::DEBUG, + "Firestore Query Cached", + "/firestore/collection_name" = collection_id.as_str(), + ); + + span.in_scope(|| { + debug!("Querying {} documents from cache", collection_id); + }); + + let collection_path = if let Some(parent) = params.parent.as_ref() { + format!("{}/{}", parent, collection_id) + } else { + format!("{}/{}", self.get_documents_path(), collection_id.as_str()) + }; + + cache.query_docs(&collection_path, ¶ms).await + } else { + Ok(FirestoreCachedValue::SkipCache) + } + } + } } } #[async_trait] impl FirestoreQuerySupport for FirestoreDb { async fn query_doc(&self, params: FirestoreQueryParams) -> FirestoreResult> { - let span = span!( - Level::DEBUG, - "Firestore Query", - "/firestore/collection_name" = params.collection_id.to_string().as_str(), - "/firestore/response_time" = field::Empty - ); - - let doc_stream = self - .stream_query_doc_with_retries(params, 0, span) - .await? - .filter_map(|doc_res| { - future::ready(match doc_res { - Ok(Some(doc)) => Some(Ok(doc)), - Ok(None) => None, - Err(err) => { - error!("Error occurred while consuming query: {}", err); - Some(Err(err)) - } - }) - }); - + let doc_stream = self.stream_query_doc_with_errors(params).await?; Ok(doc_stream.try_collect::>().await?) } @@ -202,21 +199,11 @@ impl FirestoreQuerySupport for FirestoreDb { &'a self, params: FirestoreQueryParams, ) -> FirestoreResult> { - let collection_str = params.collection_id.to_string(); - - let span = span!( - Level::DEBUG, - "Firestore Streaming Query", - "/firestore/collection_name" = collection_str.as_str(), - "/firestore/response_time" = field::Empty - ); - - let doc_stream = self.stream_query_doc_with_retries(params, 0, span).await?; + let doc_stream = self.stream_query_doc_with_errors(params).await?; Ok(Box::pin(doc_stream.filter_map(|doc_res| { future::ready(match doc_res { - Ok(Some(doc)) => Some(doc), - Ok(None) => None, + Ok(doc) => Some(doc), Err(err) => { error!("Error occurred while consuming query: {}", err); None @@ -229,6 +216,15 @@ impl FirestoreQuerySupport for FirestoreDb { &'a self, params: FirestoreQueryParams, ) -> FirestoreResult>> { + #[cfg(feature = "caching")] + { + if let FirestoreCachedValue::UseCached(stream) = + self.query_docs_from_cache(¶ms).await? + { + return Ok(stream); + } + } + let collection_str = params.collection_id.to_string(); let span = span!( @@ -276,7 +272,7 @@ impl FirestoreQuerySupport for FirestoreDb { Ok(obj) => Some(obj), Err(err) => { error!( - "Error occurred while consuming query document as a stream: {}", + "Error occurred while converting query document in a stream: {}", err ); None