diff --git a/examples/caching_memory_collections.rs b/examples/caching_memory_collections.rs
index 3ede991..5d57795 100644
--- a/examples/caching_memory_collections.rs
+++ b/examples/caching_memory_collections.rs
@@ -36,9 +36,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         &db,
         FirestoreMemoryCacheBackend::new(
             FirestoreCacheConfiguration::new().add_collection_config(
-                TEST_COLLECTION_NAME,
-                FirestoreListenerTarget::new(1000),
-                FirestoreCacheCollectionLoadMode::PreloadNone,
+                &db,
+                FirestoreCacheCollectionConfiguration::new(
+                    TEST_COLLECTION_NAME,
+                    FirestoreListenerTarget::new(1000),
+                    FirestoreCacheCollectionLoadMode::PreloadNone,
+                ),
             ),
         )?,
         FirestoreMemListenStateStorage::new(),
diff --git a/examples/caching_persistent_collections.rs b/examples/caching_persistent_collections.rs
index 741f340..6c53ab7 100644
--- a/examples/caching_persistent_collections.rs
+++ b/examples/caching_persistent_collections.rs
@@ -36,9 +36,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         &db,
         FirestorePersistentCacheBackend::new(
             FirestoreCacheConfiguration::new().add_collection_config(
-                TEST_COLLECTION_NAME,
-                FirestoreListenerTarget::new(1000),
-                FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty,
+                &db,
+                FirestoreCacheCollectionConfiguration::new(
+                    TEST_COLLECTION_NAME,
+                    FirestoreListenerTarget::new(1000),
+                    FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty,
+                ),
             ),
         )?,
         FirestoreTempFilesListenStateStorage::new(),
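
Both examples move to the new FirestoreCacheCollectionConfiguration value and pass `&db`, which the configuration uses to resolve absolute collection paths. The same type also supports subcollections via `with_parent` (added in src/cache/configuration.rs below); a minimal sketch under that assumption, inside an async context like the examples' main, with hypothetical collection and parent names:

    // Hypothetical: cache the `comments` subcollection under `posts/post-1`.
    // The parent must be an absolute document path, which is why
    // add_collection_config now takes `&db` to resolve paths.
    let parent = format!("{}/posts/post-1", db.get_documents_path());
    let config = FirestoreCacheConfiguration::new().add_collection_config(
        &db,
        FirestoreCacheCollectionConfiguration::new(
            "comments",
            FirestoreListenerTarget::new(1001),
            FirestoreCacheCollectionLoadMode::PreloadNone,
        )
        .with_parent(parent),
    );
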
diff --git a/src/cache/backends/memory_backend.rs b/src/cache/backends/memory_backend.rs
index 7248aff..7e37922 100644
--- a/src/cache/backends/memory_backend.rs
+++ b/src/cache/backends/memory_backend.rs
@@ -16,7 +16,6 @@ pub type FirestoreMemCacheOptions = CacheBuilder<String, FirestoreDocument, FirestoreMemCache>;
 pub struct FirestoreMemoryCacheBackend {
     pub config: FirestoreCacheConfiguration,
     collection_caches: HashMap<String, FirestoreMemCache>,
-    collection_targets: HashMap<FirestoreListenerTarget, String>,
 }
 
 impl FirestoreMemoryCacheBackend {
@@ -31,49 +30,43 @@ impl FirestoreMemoryCacheBackend {
         let collection_caches = config
             .collections
             .keys()
-            .map(|collection| {
+            .map(|collection_path| {
                 (
-                    collection.clone(),
-                    collection_mem_options(collection.as_str()).build(),
+                    collection_path.clone(),
+                    collection_mem_options(collection_path.as_str()).build(),
                 )
             })
             .collect();
-        let collection_targets = config
-            .collections
-            .iter()
-            .map(|(collection, collection_config)| {
-                (
-                    collection_config.listener_target.clone(),
-                    collection.clone(),
-                )
-            })
-            .collect();
         Ok(Self {
             config,
             collection_caches,
-            collection_targets,
         })
     }
 
     async fn preload_collections(&self, db: &FirestoreDb) -> Result<(), FirestoreError> {
-        for (collection, config) in &self.config.collections {
+        for (collection_path, config) in &self.config.collections {
             match config.collection_load_mode {
                 FirestoreCacheCollectionLoadMode::PreloadAllDocs
                 | FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty => {
-                    if let Some(mem_cache) = self.collection_caches.get(collection.as_str()) {
-                        debug!("Preloading {}", collection.as_str());
-                        let stream = db
-                            .fluent()
-                            .list()
-                            .from(collection.as_str())
-                            .page_size(1000)
-                            .stream_all_with_errors()
-                            .await?;
+                    if let Some(mem_cache) = self.collection_caches.get(collection_path.as_str()) {
+                        debug!("Preloading {}", collection_path.as_str());
+
+                        let params = if let Some(parent) = &config.parent {
+                            db.fluent()
+                                .list()
+                                .from(&config.collection_name)
+                                .parent(parent)
+                        } else {
+                            db.fluent().list().from(&config.collection_name)
+                        };
+
+                        let stream = params.page_size(1000).stream_all_with_errors().await?;
 
                         stream
                             .try_for_each_concurrent(2, |doc| async move {
-                                mem_cache.insert(doc.name.clone(), doc).await;
+                                let (_, document_id) = split_document_path(&doc.name);
+                                mem_cache.insert(document_id.to_string(), doc).await;
                                 Ok(())
                             })
                             .await?;
@@ -82,7 +75,7 @@ impl FirestoreMemoryCacheBackend {
 
                         info!(
                             "Preloading collection `{}` has been finished. Loaded: {} entries",
-                            collection.as_str(),
+                            collection_path.as_str(),
                             mem_cache.entry_count()
                         );
                     }
@@ -108,13 +101,16 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
         Ok(self
             .config
             .collections
-            .iter()
-            .map(|(collection, collection_config)| {
+            .values()
+            .map(|collection_config| {
                 FirestoreListenerTargetParams::new(
                     collection_config.listener_target.clone(),
-                    FirestoreTargetType::Query(FirestoreQueryParams::new(
-                        collection.as_str().into(),
-                    )),
+                    FirestoreTargetType::Query(
+                        FirestoreQueryParams::new(
+                            collection_config.collection_name.as_str().into(),
+                        )
+                        .opt_parent(collection_config.parent.clone()),
+                    ),
                     HashMap::new(),
                 )
                 .with_resume_type(FirestoreListenerTargetResumeType::ReadTime(read_from_time))
@@ -123,8 +119,8 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
     }
 
     async fn invalidate_all(&self) -> FirestoreResult<()> {
-        for (collection_id, mem_cache) in &self.collection_caches {
-            debug!("Invalidating cache for {}", collection_id);
+        for (collection_path, mem_cache) in &self.collection_caches {
+            debug!("Invalidating cache for {}", collection_path);
             mem_cache.invalidate_all();
             mem_cache.run_pending_tasks().await;
         }
@@ -139,35 +135,25 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
         match event {
             FirestoreListenEvent::DocumentChange(doc_change) => {
                 if let Some(doc) = doc_change.document {
-                    if let Some(target_id) = doc_change.target_ids.first() {
-                        if let Some(mem_cache_name) =
-                            self.collection_targets.get(&(*target_id as u32).into())
-                        {
-                            if let Some(mem_cache) = self.collection_caches.get(mem_cache_name) {
-                                trace!(
-                                    "Writing document to cache due to listener event: {:?}",
-                                    doc.name
-                                );
-                                mem_cache.insert(doc.name.clone(), doc).await;
-                            }
-                        }
+                    let (collection_path, document_id) = split_document_path(&doc.name);
+                    if let Some(mem_cache) = self.collection_caches.get(collection_path) {
+                        trace!(
+                            "Writing document to cache due to listener event: {:?}",
+                            doc.name
+                        );
+                        mem_cache.insert(document_id.to_string(), doc).await;
                     }
                 }
                 Ok(())
             }
             FirestoreListenEvent::DocumentDelete(doc_deleted) => {
-                if let Some(target_id) = doc_deleted.removed_target_ids.first() {
-                    if let Some(mem_cache_name) =
-                        self.collection_targets.get(&(*target_id as u32).into())
-                    {
-                        if let Some(mem_cache) = self.collection_caches.get(mem_cache_name) {
-                            trace!(
-                                "Removing document from cache due to listener event: {:?}",
-                                doc_deleted.document.as_str()
-                            );
-                            mem_cache.remove(&doc_deleted.document).await;
-                        }
-                    }
+                let (collection_path, document_id) = split_document_path(&doc_deleted.document);
+                if let Some(mem_cache) = self.collection_caches.get(collection_path) {
+                    trace!(
+                        "Removing document from cache due to listener event: {:?}",
+                        doc_deleted.document.as_str()
+                    );
+                    mem_cache.remove(document_id).await;
                 }
                 Ok(())
             }
@@ -180,24 +166,23 @@ impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend {
     async fn get_doc_by_path(
         &self,
-        collection_id: &str,
         document_path: &str,
     ) -> FirestoreResult<Option<FirestoreDocument>> {
-        match self.collection_caches.get(collection_id) {
-            Some(mem_cache) => Ok(mem_cache.get(document_path).await),
+        let (collection_path, document_id) = split_document_path(document_path);
+
+        match self.collection_caches.get(collection_path) {
+            Some(mem_cache) => Ok(mem_cache.get(document_id).await),
             None => Ok(None),
         }
     }
 
-    async fn update_doc_by_path(
-        &self,
-        collection_id: &str,
-        document: &FirestoreDocument,
-    ) -> FirestoreResult<()> {
-        match self.collection_caches.get(collection_id) {
+    async fn update_doc_by_path(&self, document: &FirestoreDocument) -> FirestoreResult<()> {
+        let (collection_path, document_id) = split_document_path(&document.name);
+
+        match self.collection_caches.get(collection_path) {
             Some(mem_cache) => {
                 mem_cache
-                    .insert(document.name.clone(), document.clone())
+                    .insert(document_id.to_string(), document.clone())
                     .await;
                 Ok(())
             }
@@ -207,9 +192,9 @@ impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend {
 
     async fn list_all_docs(
         &self,
-        collection_id: &str,
+        collection_path: &str,
     ) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
-        match self.collection_caches.get(collection_id) {
+        match self.collection_caches.get(collection_path) {
             Some(mem_cache) => Ok(Box::pin(futures::stream::iter(
                 mem_cache.iter().map(|(_, doc)| Ok(doc)),
            ))),
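
With the reverse `collection_targets` lookup gone, the memory backend routes listener events by splitting the document name itself: the prefix selects the per-collection cache, the last segment becomes the entry key. A small sketch of that routing, using the `split_document_path` helper added in src/db/mod.rs later in this diff (the document name is hypothetical):

    let name = "projects/p/databases/(default)/documents/posts/post-1/comments/c1";
    let (collection_path, document_id) = split_document_path(name);
    assert_eq!(
        collection_path,
        "projects/p/databases/(default)/documents/posts/post-1/comments"
    );
    assert_eq!(document_id, "c1");
    // The backend then does collection_caches.get(collection_path) and stores
    // the document under the short id "c1".
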
diff --git a/src/cache/backends/persistent_backend.rs b/src/cache/backends/persistent_backend.rs
index bf0c096..f33b752 100644
--- a/src/cache/backends/persistent_backend.rs
+++ b/src/cache/backends/persistent_backend.rs
@@ -14,7 +14,6 @@ use tracing::*;
 
 pub struct FirestorePersistentCacheBackend {
     pub config: FirestoreCacheConfiguration,
-    collection_targets: HashMap<FirestoreListenerTarget, String>,
     redb: Database,
 }
 
@@ -43,17 +42,6 @@ impl FirestorePersistentCacheBackend {
         config: FirestoreCacheConfiguration,
         data_file_path: PathBuf,
     ) -> FirestoreResult<Self> {
-        let collection_targets = config
-            .collections
-            .iter()
-            .map(|(collection, collection_config)| {
-                (
-                    collection_config.listener_target.clone(),
-                    collection.clone(),
-                )
-            })
-            .collect();
-
         if data_file_path.exists() {
             debug!(
                 "Opening database for persistent cache {:?}...",
@@ -71,16 +59,12 @@ impl FirestorePersistentCacheBackend {
         db.compact()?;
         info!("Successfully opened database for persistent cache");
 
-        Ok(Self {
-            config,
-            collection_targets,
-            redb: db,
-        })
+        Ok(Self { config, redb: db })
     }
 
     async fn preload_collections(&self, db: &FirestoreDb) -> Result<(), FirestoreError> {
-        for (collection, config) in &self.config.collections {
-            let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection.as_str());
+        for (collection_path, config) in &self.config.collections {
+            let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path.as_str());
 
             match config.collection_load_mode {
                 FirestoreCacheCollectionLoadMode::PreloadAllDocs
@@ -89,7 +73,7 @@ impl FirestorePersistentCacheBackend {
                     let read_tx = self.redb.begin_read()?;
                     if read_tx
                         .list_tables()?
-                        .any(|t| t.name() == collection.as_str())
+                        .any(|t| t.name() == collection_path.as_str())
                     {
                         read_tx.open_table(td)?.len()?
                     } else {
@@ -104,25 +88,28 @@ impl FirestorePersistentCacheBackend {
                    {
                        info!(
                            "Preloading collection `{}` has been skipped. Already loaded: {} entries",
-                            collection.as_str(),
+                            collection_path.as_str(),
                            existing_records
                        );
                        continue;
                    }
-                    debug!("Preloading {}", collection.as_str());
-                    let stream = db
-                        .fluent()
-                        .list()
-                        .from(collection.as_str())
-                        .page_size(1000)
-                        .stream_all()
-                        .await?;
+                    debug!("Preloading {}", collection_path.as_str());
+
+                    let params = if let Some(parent) = &config.parent {
+                        db.fluent()
+                            .list()
+                            .from(&config.collection_name)
+                            .parent(parent)
+                    } else {
+                        db.fluent().list().from(&config.collection_name)
+                    };
+
+                    let stream = params.page_size(1000).stream_all().await?;
 
                    stream
                        .ready_chunks(100)
                        .for_each(|docs| async move {
-                            if let Err(err) = self.write_batch_docs(collection, docs) {
+                            if let Err(err) = self.write_batch_docs(docs) {
                                error!("Error while preloading collection: {}", err);
                            }
                        })
@@ -142,13 +129,16 @@ impl FirestorePersistentCacheBackend {
                    info!(
                        "Preloading collection `{}` has been finished. Loaded: {} entries",
-                        collection.as_str(),
+                        collection_path.as_str(),
                        updated_records
                    );
                }
                FirestoreCacheCollectionLoadMode::PreloadNone => {
                    let tx = self.redb.begin_write()?;
-                    debug!("Creating corresponding collection table `{}`", collection);
+                    debug!(
+                        "Creating corresponding collection table `{}`",
+                        collection_path
+                    );
                    tx.open_table(td)?;
                    tx.commit()?;
                }
@@ -157,16 +147,15 @@ impl FirestorePersistentCacheBackend {
         Ok(())
     }
 
-    fn write_batch_docs(&self, collection: &str, docs: Vec<Document>) -> FirestoreResult<()> {
-        let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection);
-
+    fn write_batch_docs(&self, docs: Vec<Document>) -> FirestoreResult<()> {
         let write_txn = self.redb.begin_write()?;
         for doc in docs {
+            let (collection_path, document_id) = split_document_path(&doc.name);
+            let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
             let mut table = write_txn.open_table(td)?;
-            let doc_key = &doc.name;
             let doc_bytes = Self::document_to_buf(&doc)?;
-            table.insert(doc_key.as_str(), doc_bytes.as_slice())?;
+            table.insert(document_id, doc_bytes.as_slice())?;
         }
         write_txn.commit()?;
@@ -187,15 +176,16 @@ impl FirestorePersistentCacheBackend {
         Ok(doc)
     }
 
-    fn write_document(&self, doc: &Document, collection_id: &str) -> FirestoreResult<()> {
-        let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_id);
+    fn write_document(&self, doc: &Document) -> FirestoreResult<()> {
+        let (collection_path, document_id) = split_document_path(&doc.name);
+
+        let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
         let write_txn = self.redb.begin_write()?;
         {
             let mut table = write_txn.open_table(td)?;
-            let doc_key = &doc.name;
             let doc_bytes = Self::document_to_buf(doc)?;
-            table.insert(doc_key.as_str(), doc_bytes.as_slice())?;
+            table.insert(document_id, doc_bytes.as_slice())?;
         }
         write_txn.commit()?;
         Ok(())
@@ -224,8 +214,8 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend {
             .config
             .collections
             .iter()
-            .map(|(collection, collection_config)| {
-                let collection_table_len = self.table_len(collection).ok().unwrap_or(0);
+            .map(|(collection_path, collection_config)| {
+                let collection_table_len = self.table_len(collection_path).ok().unwrap_or(0);
                 let resume_type = if collection_table_len == 0 {
                     Some(FirestoreListenerTargetResumeType::ReadTime(read_from_time))
                 } else {
@@ -233,9 +223,12 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend {
                 };
                 FirestoreListenerTargetParams::new(
                     collection_config.listener_target.clone(),
-                    FirestoreTargetType::Query(FirestoreQueryParams::new(
-                        collection.as_str().into(),
-                    )),
+                    FirestoreTargetType::Query(
+                        FirestoreQueryParams::new(
+                            collection_config.collection_name.as_str().into(),
+                        )
+                        .opt_parent(collection_config.parent.clone()),
+                    ),
                     HashMap::new(),
                 )
                 .opt_resume_type(resume_type)
@@ -244,14 +237,14 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend {
     }
 
     async fn invalidate_all(&self) -> FirestoreResult<()> {
-        for collection_id in self.config.collections.keys() {
-            let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_id.as_str());
+        for collection_path in self.config.collections.keys() {
+            let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path.as_str());
             let write_txn = self.redb.begin_write()?;
             {
                 debug!(
                     "Invalidating {} and draining the corresponding table",
-                    collection_id
+                    collection_path
                 );
                 let mut table = write_txn.open_table(td)?;
                 table.drain::<&str>(..)?;
@@ -270,36 +263,24 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend {
         match event {
             FirestoreListenEvent::DocumentChange(doc_change) => {
                 if let Some(doc) = doc_change.document {
-                    if let Some(target_id) = doc_change.target_ids.first() {
-                        if let Some(collection_id) =
-                            self.collection_targets.get(&(*target_id as u32).into())
-                        {
-                            trace!(
-                                "Writing document to cache due to listener event: {:?}",
-                                doc.name
-                            );
-                            self.write_document(&doc, collection_id)?;
-                        }
-                    }
+                    trace!(
+                        "Writing document to cache due to listener event: {:?}",
+                        doc.name
+                    );
+                    self.write_document(&doc)?;
                 }
                 Ok(())
             }
             FirestoreListenEvent::DocumentDelete(doc_deleted) => {
-                if let Some(target_id) = doc_deleted.removed_target_ids.first() {
-                    if let Some(collection_id) =
-                        self.collection_targets.get(&(*target_id as u32).into())
-                    {
-                        let write_txn = self.redb.begin_write()?;
-                        let td: TableDefinition<&str, &[u8]> =
-                            TableDefinition::new(collection_id.as_str());
-                        let mut table = write_txn.open_table(td)?;
-                        trace!(
-                            "Removing document from cache due to listener event: {:?}",
-                            doc_deleted.document.as_str()
-                        );
-                        table.remove(doc_deleted.document.as_str())?;
-                    }
-                }
+                let (collection_path, document_id) = split_document_path(&doc_deleted.document);
+                let write_txn = self.redb.begin_write()?;
+                let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
+                let mut table = write_txn.open_table(td)?;
+                trace!(
+                    "Removing document from cache due to listener event: {:?}",
+                    doc_deleted.document.as_str()
+                );
+                table.remove(document_id)?;
                 Ok(())
             }
             _ => Ok(()),
@@ -311,30 +292,27 @@ impl FirestoreCacheDocsByPathSupport for FirestorePersistentCacheBackend {
     async fn get_doc_by_path(
         &self,
-        collection_id: &str,
         document_path: &str,
     ) -> FirestoreResult<Option<FirestoreDocument>> {
-        let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_id);
+        let (collection_path, document_id) = split_document_path(document_path);
+
+        let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
         let read_tx = self.redb.begin_read()?;
         let table = read_tx.open_table(td)?;
-        let value = table.get(document_path)?;
+        let value = table.get(document_id)?;
         value.map(|v| Self::buf_to_document(v.value())).transpose()
     }
 
-    async fn update_doc_by_path(
-        &self,
-        collection_id: &str,
-        document: &FirestoreDocument,
-    ) -> FirestoreResult<()> {
-        self.write_document(document, collection_id)?;
+    async fn update_doc_by_path(&self, document: &FirestoreDocument) -> FirestoreResult<()> {
+        self.write_document(document)?;
         Ok(())
     }
 
     async fn list_all_docs(
         &self,
-        collection_id: &str,
+        collection_path: &str,
     ) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
-        let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_id);
+        let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
         let read_tx = self.redb.begin_read()?;
         let table = read_tx.open_table(td)?;
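
The persistent backend now derives the redb table from each document's own name instead of a collection argument: one table per absolute collection path, keyed by the short document id. A condensed sketch of that write path, assuming the `redb` crate as used above (`write` is a hypothetical helper, not part of the crate's API):

    use redb::{Database, TableDefinition};

    fn write(
        db: &Database,
        collection_path: &str, // e.g. "projects/p/databases/(default)/documents/posts"
        document_id: &str,     // e.g. "post-1"
        doc_bytes: &[u8],
    ) -> Result<(), Box<dyn std::error::Error>> {
        let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
        let txn = db.begin_write()?;
        {
            // The table is created on first open within this write transaction.
            let mut table = txn.open_table(td)?;
            table.insert(document_id, doc_bytes)?;
        }
        txn.commit()?;
        Ok(())
    }
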
Already loaded: {} entries", - collection.as_str(), + collection_path.as_str(), existing_records ); continue; } - debug!("Preloading {}", collection.as_str()); - let stream = db - .fluent() - .list() - .from(collection.as_str()) - .page_size(1000) - .stream_all() - .await?; + debug!("Preloading {}", collection_path.as_str()); + let params = if let Some(parent) = &config.parent { + db.fluent() + .list() + .from(&config.collection_name) + .parent(parent) + } else { + db.fluent().list().from(&config.collection_name) + }; + + let stream = params.page_size(1000).stream_all().await?; stream .ready_chunks(100) .for_each(|docs| async move { - if let Err(err) = self.write_batch_docs(collection, docs) { + if let Err(err) = self.write_batch_docs(docs) { error!("Error while preloading collection: {}", err); } }) @@ -142,13 +129,16 @@ impl FirestorePersistentCacheBackend { info!( "Preloading collection `{}` has been finished. Loaded: {} entries", - collection.as_str(), + collection_path.as_str(), updated_records ); } FirestoreCacheCollectionLoadMode::PreloadNone => { let tx = self.redb.begin_write()?; - debug!("Creating corresponding collection table `{}`", collection); + debug!( + "Creating corresponding collection table `{}`", + collection_path + ); tx.open_table(td)?; tx.commit()?; } @@ -157,16 +147,15 @@ impl FirestorePersistentCacheBackend { Ok(()) } - fn write_batch_docs(&self, collection: &str, docs: Vec) -> FirestoreResult<()> { - let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection); - + fn write_batch_docs(&self, docs: Vec) -> FirestoreResult<()> { let write_txn = self.redb.begin_write()?; for doc in docs { + let (collection_path, document_id) = split_document_path(&doc.name); + let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path); let mut table = write_txn.open_table(td)?; - let doc_key = &doc.name; let doc_bytes = Self::document_to_buf(&doc)?; - table.insert(doc_key.as_str(), doc_bytes.as_slice())?; + table.insert(document_id, doc_bytes.as_slice())?; } write_txn.commit()?; @@ -187,15 +176,16 @@ impl FirestorePersistentCacheBackend { Ok(doc) } - fn write_document(&self, doc: &Document, collection_id: &str) -> FirestoreResult<()> { - let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_id); + fn write_document(&self, doc: &Document) -> FirestoreResult<()> { + let (collection_path, document_id) = split_document_path(&doc.name); + + let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path); let write_txn = self.redb.begin_write()?; { let mut table = write_txn.open_table(td)?; - let doc_key = &doc.name; let doc_bytes = Self::document_to_buf(doc)?; - table.insert(doc_key.as_str(), doc_bytes.as_slice())?; + table.insert(document_id, doc_bytes.as_slice())?; } write_txn.commit()?; Ok(()) @@ -224,8 +214,8 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend { .config .collections .iter() - .map(|(collection, collection_config)| { - let collection_table_len = self.table_len(collection).ok().unwrap_or(0); + .map(|(collection_path, collection_config)| { + let collection_table_len = self.table_len(collection_path).ok().unwrap_or(0); let resume_type = if collection_table_len == 0 { Some(FirestoreListenerTargetResumeType::ReadTime(read_from_time)) } else { @@ -233,9 +223,12 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend { }; FirestoreListenerTargetParams::new( collection_config.listener_target.clone(), - FirestoreTargetType::Query(FirestoreQueryParams::new( - collection.as_str().into(), - )), + 
diff --git a/src/cache/mod.rs b/src/cache/mod.rs
index e51d090..576901c 100644
--- a/src/cache/mod.rs
+++ b/src/cache/mod.rs
@@ -148,41 +148,32 @@ pub trait FirestoreCacheBackend: FirestoreCacheDocsByPathSupport {
 pub trait FirestoreCacheDocsByPathSupport {
     async fn get_doc_by_path(
         &self,
-        collection_id: &str,
         document_path: &str,
     ) -> FirestoreResult<Option<FirestoreDocument>>;
 
     async fn get_docs_by_paths<'a>(
         &'a self,
-        collection_id: &str,
         full_doc_ids: &'a [String],
     ) -> FirestoreResult<BoxStream<'a, FirestoreResult<(String, Option<FirestoreDocument>)>>>
     where
         Self: Sync,
     {
-        let collection_id = collection_id.to_string();
         Ok(Box::pin(
             futures::stream::iter(full_doc_ids.clone()).filter_map({
-                move |document_path| {
-                    let collection_id = collection_id.to_string();
-                    async move {
-                        match self
-                            .get_doc_by_path(collection_id.as_str(), document_path.as_str())
-                            .await
-                        {
-                            Ok(maybe_doc) => maybe_doc.map(|document| {
-                                let doc_id = document
-                                    .name
-                                    .split('/')
-                                    .last()
-                                    .map(|s| s.to_string())
-                                    .unwrap_or_else(|| document.name.clone());
-                                Ok((doc_id, Some(document)))
-                            }),
-                            Err(err) => {
-                                error!("Error occurred while reading from cache: {}", err);
-                                None
-                            }
+                move |document_path| async move {
+                    match self.get_doc_by_path(document_path.as_str()).await {
+                        Ok(maybe_doc) => maybe_doc.map(|document| {
+                            let doc_id = document
+                                .name
+                                .split('/')
+                                .last()
+                                .map(|s| s.to_string())
+                                .unwrap_or_else(|| document.name.clone());
+                            Ok((doc_id, Some(document)))
+                        }),
+                        Err(err) => {
+                            error!("Error occurred while reading from cache: {}", err);
+                            None
                         }
                     }
                 }
@@ -190,14 +181,10 @@ pub trait FirestoreCacheDocsByPathSupport {
         ))
     }
 
-    async fn update_doc_by_path(
-        &self,
-        collection_id: &str,
-        document: &FirestoreDocument,
-    ) -> FirestoreResult<()>;
+    async fn update_doc_by_path(&self, document: &FirestoreDocument) -> FirestoreResult<()>;
 
     async fn list_all_docs(
         &self,
-        collection_id: &str,
+        collection_path: &str,
     ) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>>;
 }
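
After this change the trait deals only in full document paths; callers stop threading a separate collection id through every call, which is what allows the closure above to drop its per-item `collection_id` clone. A hedged usage sketch against any type implementing `FirestoreCacheDocsByPathSupport` (`backend` and `db` are assumed to exist in scope):

    let full_path = format!("{}/posts/post-1", db.get_documents_path());
    if let Some(doc) = backend.get_doc_by_path(&full_path).await? {
        println!("cache hit: {}", doc.name);
    }
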
diff --git a/src/db/get.rs b/src/db/get.rs
index 8d57cbf..6a896dc 100644
--- a/src/db/get.rs
+++ b/src/db/get.rs
@@ -621,8 +621,7 @@ impl FirestoreDb {
         let doc = doc_response.into_inner();
         #[cfg(feature = "caching")]
         if _return_only_fields_empty {
-            self.offer_doc_update_to_cache(collection_id.as_str(), &doc)
-                .await?;
+            self.offer_doc_update_to_cache(&doc).await?;
         }
         Ok(doc)
     }
@@ -696,49 +695,40 @@ impl FirestoreDb {
         span.in_scope(|| debug!("Start consuming a batch of documents by ids"));
         let stream = response
             .into_inner()
-            .filter_map(move |r| {
-                #[cfg(feature = "caching")]
-                let collection_id = collection_id.clone();
-                async move {
-                    match r {
-                        Ok(doc_response) => match doc_response.result {
-                            Some(batch_get_documents_response::Result::Found(document)) => {
-                                let doc_id = document
-                                    .name
-                                    .split('/')
-                                    .last()
-                                    .map(|s| s.to_string())
-                                    .unwrap_or_else(|| document.name.clone());
-                                #[cfg(feature = "caching")]
-                                {
-                                    self.offer_doc_update_to_cache(
-                                        collection_id.as_str(),
-                                        &document,
-                                    )
-                                    .await
-                                    .ok();
-
-                                    Some(Ok((doc_id, Some(document))))
-                                }
-                                #[cfg(not(feature = "caching"))]
-                                {
-                                    Some(Ok((doc_id, Some(document))))
-                                }
-                            }
-                            Some(batch_get_documents_response::Result::Missing(
-                                full_doc_id,
-                            )) => {
-                                let doc_id = full_doc_id
-                                    .split('/')
-                                    .last()
-                                    .map(|s| s.to_string())
-                                    .unwrap_or_else(|| full_doc_id);
-                                Some(Ok((doc_id, None)))
-                            }
-                            None => None,
-                        },
-                        Err(err) => Some(Err(err.into())),
-                    }
+            .filter_map(move |r| async move {
+                match r {
+                    Ok(doc_response) => match doc_response.result {
+                        Some(batch_get_documents_response::Result::Found(document)) => {
+                            let doc_id = document
+                                .name
+                                .split('/')
+                                .last()
+                                .map(|s| s.to_string())
+                                .unwrap_or_else(|| document.name.clone());
+                            #[cfg(feature = "caching")]
+                            {
+                                self.offer_doc_update_to_cache(&document).await.ok();
+
+                                Some(Ok((doc_id, Some(document))))
+                            }
+                            #[cfg(not(feature = "caching"))]
+                            {
+                                Some(Ok((doc_id, Some(document))))
+                            }
+                        }
+                        Some(batch_get_documents_response::Result::Missing(
+                            full_doc_id,
+                        )) => {
+                            let doc_id = full_doc_id
+                                .split('/')
+                                .last()
+                                .map(|s| s.to_string())
+                                .unwrap_or_else(|| full_doc_id);
+                            Some(Ok((doc_id, None)))
+                        }
+                        None => None,
+                    },
+                    Err(err) => Some(Err(err.into())),
                 }
             })
             .boxed();
@@ -760,7 +750,7 @@ impl FirestoreDb {
     {
         let begin_query_utc: DateTime<Utc> = Utc::now();
 
-        let cache_response = cache.get_doc_by_path(collection_id, document_path).await?;
+        let cache_response = cache.get_doc_by_path(document_path).await?;
 
         let end_query_utc: DateTime<Utc> = Utc::now();
         let query_duration = end_query_utc.signed_duration_since(begin_query_utc);
@@ -826,7 +816,7 @@ impl FirestoreDb {
             );
 
             let cached_stream: BoxStream<FirestoreResult<(String, Option<FirestoreDocument>)>> =
-                cache.get_docs_by_paths(collection_id, full_doc_ids).await?;
+                cache.get_docs_by_paths(full_doc_ids).await?;
 
             let cached_vec: Vec<(String, Option<FirestoreDocument>)> =
                 cached_stream.try_collect::<Vec<_>>().await?;
@@ -860,13 +850,12 @@ impl FirestoreDb {
     #[inline]
     pub(crate) async fn offer_doc_update_to_cache(
         &self,
-        collection_id: &str,
         document: &FirestoreDocument,
     ) -> FirestoreResult<()> {
         if let FirestoreDbSessionCacheMode::ReadThroughCache(ref cache) =
             self.session_params.cache_mode
         {
-            cache.update_doc_by_path(collection_id, document).await?;
+            cache.update_doc_by_path(document).await?;
         }
         Ok(())
     }
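
The inline id extraction kept in these hunks is equivalent to taking the second component of `split_document_path`, added in src/db/mod.rs below; a quick check (the path is hypothetical):

    let full = "projects/p/databases/(default)/documents/test/test1".to_string();
    let doc_id = full
        .split('/')
        .last()
        .map(|s| s.to_string())
        .unwrap_or_else(|| full.clone());
    assert_eq!(doc_id, split_document_path(&full).1);
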
diff --git a/src/db/list.rs b/src/db/list.rs
index 3651482..14ff3f6 100644
--- a/src/db/list.rs
+++ b/src/db/list.rs
@@ -117,10 +117,8 @@ impl FirestoreListingSupport for FirestoreDb {
     ) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
         #[cfg(feature = "caching")]
         {
-            let collection_id = params.collection_id.clone();
-            if let FirestoreCachedValue::UseCached(stream) = self
-                .list_docs_from_cache(collection_id, &params.return_only_fields)
-                .await?
+            if let FirestoreCachedValue::UseCached(stream) =
+                self.list_docs_from_cache(&params).await?
             {
                 return Ok(stream);
             }
@@ -486,8 +484,7 @@ impl FirestoreDb {
     #[inline]
     pub async fn list_docs_from_cache<'a>(
         &'a self,
-        collection_id: String,
-        _return_only_fields: &Option<Vec<String>>,
+        params: &FirestoreListDocParams,
     ) -> FirestoreResult<FirestoreCachedValue<BoxStream<'a, FirestoreResult<FirestoreDocument>>>>
     {
         if let FirestoreDbSessionCacheMode::ReadCachedOnly(ref cache) =
@@ -496,14 +493,24 @@ impl FirestoreDb {
             self.session_params.cache_mode
         {
             let span = span!(
                 Level::DEBUG,
                 "Firestore List Cached",
-                "/firestore/collection_name" = collection_id,
+                "/firestore/collection_name" = params.collection_id,
             );
             span.in_scope(|| {
-                debug!("Reading all {} documents from cache", collection_id);
+                debug!("Reading all {} documents from cache", params.collection_id);
             });
 
-            let stream = cache.list_all_docs(collection_id.as_str()).await?;
+            let collection_path = if let Some(parent) = params.parent.as_ref() {
+                format!("{}/{}", parent, params.collection_id.as_str())
+            } else {
+                format!(
+                    "{}/{}",
+                    self.get_documents_path(),
+                    params.collection_id.as_str()
+                )
+            };
+
+            let stream = cache.list_all_docs(&collection_path).await?;
             return Ok(FirestoreCachedValue::UseCached(stream));
         }
         Ok(FirestoreCachedValue::SkipCache)
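
The cached listing now resolves the absolute collection path from the listing parameters, preferring an explicit parent and falling back to the database documents root. The same logic as a standalone sketch (a hypothetical helper, not crate API):

    fn resolve_collection_path(
        documents_root: &str,
        parent: Option<&str>,
        collection_id: &str,
    ) -> String {
        match parent {
            Some(parent) => format!("{}/{}", parent, collection_id),
            None => format!("{}/{}", documents_root, collection_id),
        }
    }
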
diff --git a/src/db/mod.rs b/src/db/mod.rs
index d9fac6a..44a2469 100644
--- a/src/db/mod.rs
+++ b/src/db/mod.rs
@@ -229,16 +229,16 @@ impl FirestoreDb {
     #[inline]
     pub fn parent_path<S>(
         &self,
-        parent_collection_name: &str,
-        parent_document_id: S,
+        collection_name: &str,
+        document_id: S,
     ) -> FirestoreResult<ParentPathBuilder>
     where
         S: AsRef<str>,
     {
         Ok(ParentPathBuilder::new(safe_document_path(
             self.inner.doc_path.as_str(),
-            parent_collection_name,
-            parent_document_id.as_ref(),
+            collection_name,
+            document_id.as_ref(),
         )?))
     }
 
@@ -356,6 +356,16 @@ where
     }
 }
 
+pub(crate) fn split_document_path(path: &str) -> (&str, &str) {
+    // Split the full document path into (parent collection path, document id).
+    let split_pos = path.rfind('/').map(|pos| pos + 1).unwrap_or(0);
+    if split_pos == 0 {
+        ("", path)
+    } else {
+        (&path[0..split_pos - 1], &path[split_pos..])
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -410,4 +420,15 @@ mod tests {
             "http://invalid:localhost:8080"
         );
     }
+
+    #[test]
+    fn test_split_document_path() {
+        assert_eq!(
+            split_document_path("projects/test-project/databases/(default)/documents/test/test1"),
+            (
+                "projects/test-project/databases/(default)/documents/test",
+                "test1"
+            )
+        );
+    }
 }
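
`split_document_path` treats everything after the last '/' as the document id, and a path with no '/' at all has an empty parent part. Two quick checks in addition to the unit test above (sketch):

    assert_eq!(split_document_path("standalone"), ("", "standalone"));
    assert_eq!(split_document_path("a/b"), ("a", "b"));
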
diff --git a/tests/caching_memory_test.rs b/tests/caching_memory_test.rs
index 65e5eb2..8842212 100644
--- a/tests/caching_memory_test.rs
+++ b/tests/caching_memory_test.rs
@@ -49,14 +49,20 @@ async fn precondition_tests() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {