From 081f480c4d2acf0855973c94e68a2b917074e193 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Sat, 7 Oct 2023 15:47:56 +0200 Subject: [PATCH] Faster preloading with query (#132) --- src/cache/backends/memory_backend.rs | 25 +++++++++++----- src/cache/backends/persistent_backend.rs | 19 +++++++++--- src/cache/configuration.rs | 37 ++++++++++++++++++++++++ src/db/list.rs | 5 ++-- 4 files changed, 72 insertions(+), 14 deletions(-) diff --git a/src/cache/backends/memory_backend.rs b/src/cache/backends/memory_backend.rs index 33b38b1..d9758ab 100644 --- a/src/cache/backends/memory_backend.rs +++ b/src/cache/backends/memory_backend.rs @@ -6,7 +6,6 @@ use futures::stream::BoxStream; use moka::future::{Cache, CacheBuilder}; use crate::cache::cache_query_engine::FirestoreCacheQueryEngine; -use futures::TryStreamExt; use futures::{future, StreamExt}; use std::collections::HashMap; use tracing::*; @@ -70,22 +69,32 @@ impl FirestoreMemoryCacheBackend { let params = if let Some(parent) = &config.parent { db.fluent() - .list() - .from(&config.collection_name) + .select() + .from(config.collection_name.as_str()) .parent(parent) } else { - db.fluent().list().from(&config.collection_name) + db.fluent().select().from(config.collection_name.as_str()) }; - let stream = params.page_size(1000).stream_all_with_errors().await?; + let stream = params.stream_query().await?; stream - .try_for_each_concurrent(2, |doc| async move { + .enumerate() + .map(|(index, docs)| { + if index > 0 && index % 5000 == 0 { + debug!( + "Preloading collection `{}`: {} entries loaded", + collection_path.as_str(), + index + ); + } + docs + }) + .for_each_concurrent(1, |doc| async move { let (_, document_id) = split_document_path(&doc.name); mem_cache.insert(document_id.to_string(), doc).await; - Ok(()) }) - .await?; + .await; mem_cache.run_pending_tasks().await; diff --git a/src/cache/backends/persistent_backend.rs b/src/cache/backends/persistent_backend.rs index b5ea9ca..1d9fe59 100644 --- a/src/cache/backends/persistent_backend.rs +++ b/src/cache/backends/persistent_backend.rs @@ -98,16 +98,27 @@ impl FirestorePersistentCacheBackend { debug!("Preloading {}", collection_path.as_str()); let params = if let Some(parent) = &config.parent { db.fluent() - .list() - .from(&config.collection_name) + .select() + .from(config.collection_name.as_str()) .parent(parent) } else { - db.fluent().list().from(&config.collection_name) + db.fluent().select().from(config.collection_name.as_str()) }; - let stream = params.page_size(1000).stream_all().await?; + let stream = params.stream_query().await?; stream + .enumerate() + .map(|(index, docs)| { + if index > 0 && index % 5000 == 0 { + debug!( + "Preloading collection `{}`: {} entries loaded", + collection_path.as_str(), + index + ); + } + docs + }) .ready_chunks(100) .for_each(|docs| async move { if let Err(err) = self.write_batch_docs(collection_path, docs) { diff --git a/src/cache/configuration.rs b/src/cache/configuration.rs index 1c8cdfa..baa634f 100644 --- a/src/cache/configuration.rs +++ b/src/cache/configuration.rs @@ -43,6 +43,7 @@ pub struct FirestoreCacheCollectionConfiguration { pub parent: Option, pub listener_target: FirestoreListenerTarget, pub collection_load_mode: FirestoreCacheCollectionLoadMode, + pub indices: Vec, } impl FirestoreCacheCollectionConfiguration { @@ -60,6 +61,7 @@ impl FirestoreCacheCollectionConfiguration { parent: None, listener_target, collection_load_mode, + indices: Vec::new(), } } @@ -73,6 +75,13 @@ impl FirestoreCacheCollectionConfiguration { ..self } } + + #[inline] + pub fn with_index(self, index: FirestoreCacheIndexConfiguration) -> Self { + let mut indices = self.indices; + indices.push(index); + Self { indices, ..self } + } } #[derive(Debug, Clone)] @@ -81,3 +90,31 @@ pub enum FirestoreCacheCollectionLoadMode { PreloadAllIfEmpty, PreloadNone, } + +#[derive(Debug, Clone)] +pub struct FirestoreCacheIndexConfiguration { + pub fields: Vec, + pub unique: bool, +} + +impl FirestoreCacheIndexConfiguration { + #[inline] + pub fn new(fields: I) -> Self + where + I: IntoIterator, + I::Item: AsRef, + { + Self { + fields: fields.into_iter().map(|s| s.as_ref().to_string()).collect(), + unique: false, + } + } + + #[inline] + pub fn unique(self, value: bool) -> Self { + Self { + unique: value, + ..self + } + } +} diff --git a/src/db/list.rs b/src/db/list.rs index eedfac1..e6a9125 100644 --- a/src/db/list.rs +++ b/src/db/list.rs @@ -371,9 +371,10 @@ impl FirestoreDb { ); span.in_scope(|| { debug!( - "Listing documents in {:?} took {}ms", + "Listing documents in {:?} took {}ms. Returned: {}", params.collection_id, - listing_duration.num_milliseconds() + listing_duration.num_milliseconds(), + result.documents.len() ); });