Skip to content

Commit

Permalink
Support for nested collections and performance optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Oct 6, 2023
1 parent a7aa97f commit e3a89b5
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 286 deletions.
9 changes: 6 additions & 3 deletions examples/caching_memory_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
&db,
FirestoreMemoryCacheBackend::new(
FirestoreCacheConfiguration::new().add_collection_config(
TEST_COLLECTION_NAME,
FirestoreListenerTarget::new(1000),
FirestoreCacheCollectionLoadMode::PreloadNone,
&db,
FirestoreCacheCollectionConfiguration::new(
TEST_COLLECTION_NAME,
FirestoreListenerTarget::new(1000),
FirestoreCacheCollectionLoadMode::PreloadNone,
),
),
)?,
FirestoreMemListenStateStorage::new(),
Expand Down
9 changes: 6 additions & 3 deletions examples/caching_persistent_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
&db,
FirestorePersistentCacheBackend::new(
FirestoreCacheConfiguration::new().add_collection_config(
TEST_COLLECTION_NAME,
FirestoreListenerTarget::new(1000),
FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty,
&db,
FirestoreCacheCollectionConfiguration::new(
TEST_COLLECTION_NAME,
FirestoreListenerTarget::new(1000),
FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty,
),
),
)?,
FirestoreTempFilesListenStateStorage::new(),
Expand Down
125 changes: 55 additions & 70 deletions src/cache/backends/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub type FirestoreMemCacheOptions = CacheBuilder<String, FirestoreDocument, Fire
pub struct FirestoreMemoryCacheBackend {
pub config: FirestoreCacheConfiguration,
collection_caches: HashMap<String, FirestoreMemCache>,
collection_targets: HashMap<FirestoreListenerTarget, String>,
}

impl FirestoreMemoryCacheBackend {
Expand All @@ -31,49 +30,43 @@ impl FirestoreMemoryCacheBackend {
let collection_caches = config
.collections
.keys()
.map(|collection| {
.map(|collection_path| {
(
collection.clone(),
collection_mem_options(collection.as_str()).build(),
collection_path.clone(),
collection_mem_options(collection_path.as_str()).build(),
)
})
.collect();

let collection_targets = config
.collections
.iter()
.map(|(collection, collection_config)| {
(
collection_config.listener_target.clone(),
collection.clone(),
)
})
.collect();
Ok(Self {
config,
collection_caches,
collection_targets,
})
}

async fn preload_collections(&self, db: &FirestoreDb) -> Result<(), FirestoreError> {
for (collection, config) in &self.config.collections {
for (collection_path, config) in &self.config.collections {
match config.collection_load_mode {
FirestoreCacheCollectionLoadMode::PreloadAllDocs
| FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty => {
if let Some(mem_cache) = self.collection_caches.get(collection.as_str()) {
debug!("Preloading {}", collection.as_str());
let stream = db
.fluent()
.list()
.from(collection.as_str())
.page_size(1000)
.stream_all_with_errors()
.await?;
if let Some(mem_cache) = self.collection_caches.get(collection_path.as_str()) {
debug!("Preloading {}", collection_path.as_str());

let params = if let Some(parent) = &config.parent {
db.fluent()
.list()
.from(&config.collection_name)
.parent(parent)
} else {
db.fluent().list().from(&config.collection_name)
};

let stream = params.page_size(1000).stream_all_with_errors().await?;

stream
.try_for_each_concurrent(2, |doc| async move {
mem_cache.insert(doc.name.clone(), doc).await;
let (_, document_id) = split_document_path(&doc.name);
mem_cache.insert(document_id.to_string(), doc).await;
Ok(())
})
.await?;
Expand All @@ -82,7 +75,7 @@ impl FirestoreMemoryCacheBackend {

info!(
"Preloading collection `{}` has been finished. Loaded: {} entries",
collection.as_str(),
collection_path.as_str(),
mem_cache.entry_count()
);
}
Expand All @@ -108,13 +101,16 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
Ok(self
.config
.collections
.iter()
.map(|(collection, collection_config)| {
.values()
.map(|collection_config| {
FirestoreListenerTargetParams::new(
collection_config.listener_target.clone(),
FirestoreTargetType::Query(FirestoreQueryParams::new(
collection.as_str().into(),
)),
FirestoreTargetType::Query(
FirestoreQueryParams::new(
collection_config.collection_name.as_str().into(),
)
.opt_parent(collection_config.parent.clone()),
),
HashMap::new(),
)
.with_resume_type(FirestoreListenerTargetResumeType::ReadTime(read_from_time))
Expand All @@ -123,8 +119,8 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
}

async fn invalidate_all(&self) -> FirestoreResult<()> {
for (collection_id, mem_cache) in &self.collection_caches {
debug!("Invalidating cache for {}", collection_id);
for (collection_path, mem_cache) in &self.collection_caches {
debug!("Invalidating cache for {}", collection_path);
mem_cache.invalidate_all();
mem_cache.run_pending_tasks().await;
}
Expand All @@ -139,35 +135,25 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
match event {
FirestoreListenEvent::DocumentChange(doc_change) => {
if let Some(doc) = doc_change.document {
if let Some(target_id) = doc_change.target_ids.first() {
if let Some(mem_cache_name) =
self.collection_targets.get(&(*target_id as u32).into())
{
if let Some(mem_cache) = self.collection_caches.get(mem_cache_name) {
trace!(
"Writing document to cache due to listener event: {:?}",
doc.name
);
mem_cache.insert(doc.name.clone(), doc).await;
}
}
let (collection_path, document_id) = split_document_path(&doc.name);
if let Some(mem_cache) = self.collection_caches.get(collection_path) {
trace!(
"Writing document to cache due to listener event: {:?}",
doc.name
);
mem_cache.insert(document_id.to_string(), doc).await;
}
}
Ok(())
}
FirestoreListenEvent::DocumentDelete(doc_deleted) => {
if let Some(target_id) = doc_deleted.removed_target_ids.first() {
if let Some(mem_cache_name) =
self.collection_targets.get(&(*target_id as u32).into())
{
if let Some(mem_cache) = self.collection_caches.get(mem_cache_name) {
trace!(
"Removing document from cache due to listener event: {:?}",
doc_deleted.document.as_str()
);
mem_cache.remove(&doc_deleted.document).await;
}
}
let (collection_path, document_id) = split_document_path(&doc_deleted.document);
if let Some(mem_cache) = self.collection_caches.get(collection_path) {
trace!(
"Removing document from cache due to listener event: {:?}",
doc_deleted.document.as_str()
);
mem_cache.remove(document_id).await;
}
Ok(())
}
Expand All @@ -180,24 +166,23 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend {
async fn get_doc_by_path(
&self,
collection_id: &str,
document_path: &str,
) -> FirestoreResult<Option<FirestoreDocument>> {
match self.collection_caches.get(collection_id) {
Some(mem_cache) => Ok(mem_cache.get(document_path).await),
let (collection_path, document_id) = split_document_path(document_path);

match self.collection_caches.get(collection_path) {
Some(mem_cache) => Ok(mem_cache.get(document_id).await),
None => Ok(None),
}
}

async fn update_doc_by_path(
&self,
collection_id: &str,
document: &FirestoreDocument,
) -> FirestoreResult<()> {
match self.collection_caches.get(collection_id) {
async fn update_doc_by_path(&self, document: &FirestoreDocument) -> FirestoreResult<()> {
let (collection_path, document_id) = split_document_path(&document.name);

match self.collection_caches.get(collection_path) {
Some(mem_cache) => {
mem_cache
.insert(document.name.clone(), document.clone())
.insert(document_id.to_string(), document.clone())
.await;
Ok(())
}
Expand All @@ -207,9 +192,9 @@ impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend {

async fn list_all_docs(
&self,
collection_id: &str,
collection_path: &str,
) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
match self.collection_caches.get(collection_id) {
match self.collection_caches.get(collection_path) {
Some(mem_cache) => Ok(Box::pin(futures::stream::iter(
mem_cache.iter().map(|(_, doc)| Ok(doc)),
))),
Expand Down
Loading

0 comments on commit e3a89b5

Please sign in to comment.