Skip to content

Commit

Permalink
Faster preloading with query (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence authored Oct 7, 2023
1 parent 60d77ab commit 081f480
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 14 deletions.
25 changes: 17 additions & 8 deletions src/cache/backends/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use futures::stream::BoxStream;
use moka::future::{Cache, CacheBuilder};

use crate::cache::cache_query_engine::FirestoreCacheQueryEngine;
use futures::TryStreamExt;
use futures::{future, StreamExt};
use std::collections::HashMap;
use tracing::*;
Expand Down Expand Up @@ -70,22 +69,32 @@ impl FirestoreMemoryCacheBackend {

let params = if let Some(parent) = &config.parent {
db.fluent()
.list()
.from(&config.collection_name)
.select()
.from(config.collection_name.as_str())
.parent(parent)
} else {
db.fluent().list().from(&config.collection_name)
db.fluent().select().from(config.collection_name.as_str())
};

let stream = params.page_size(1000).stream_all_with_errors().await?;
let stream = params.stream_query().await?;

stream
.try_for_each_concurrent(2, |doc| async move {
.enumerate()
.map(|(index, docs)| {
if index > 0 && index % 5000 == 0 {
debug!(
"Preloading collection `{}`: {} entries loaded",
collection_path.as_str(),
index
);
}
docs
})
.for_each_concurrent(1, |doc| async move {
let (_, document_id) = split_document_path(&doc.name);
mem_cache.insert(document_id.to_string(), doc).await;
Ok(())
})
.await?;
.await;

mem_cache.run_pending_tasks().await;

Expand Down
19 changes: 15 additions & 4 deletions src/cache/backends/persistent_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,27 @@ impl FirestorePersistentCacheBackend {
debug!("Preloading {}", collection_path.as_str());
let params = if let Some(parent) = &config.parent {
db.fluent()
.list()
.from(&config.collection_name)
.select()
.from(config.collection_name.as_str())
.parent(parent)
} else {
db.fluent().list().from(&config.collection_name)
db.fluent().select().from(config.collection_name.as_str())
};

let stream = params.page_size(1000).stream_all().await?;
let stream = params.stream_query().await?;

stream
.enumerate()
.map(|(index, docs)| {
if index > 0 && index % 5000 == 0 {
debug!(
"Preloading collection `{}`: {} entries loaded",
collection_path.as_str(),
index
);
}
docs
})
.ready_chunks(100)
.for_each(|docs| async move {
if let Err(err) = self.write_batch_docs(collection_path, docs) {
Expand Down
37 changes: 37 additions & 0 deletions src/cache/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct FirestoreCacheCollectionConfiguration {
pub parent: Option<String>,
pub listener_target: FirestoreListenerTarget,
pub collection_load_mode: FirestoreCacheCollectionLoadMode,
pub indices: Vec<FirestoreCacheIndexConfiguration>,
}

impl FirestoreCacheCollectionConfiguration {
Expand All @@ -60,6 +61,7 @@ impl FirestoreCacheCollectionConfiguration {
parent: None,
listener_target,
collection_load_mode,
indices: Vec::new(),
}
}

Expand All @@ -73,6 +75,13 @@ impl FirestoreCacheCollectionConfiguration {
..self
}
}

#[inline]
pub fn with_index(self, index: FirestoreCacheIndexConfiguration) -> Self {
let mut indices = self.indices;
indices.push(index);
Self { indices, ..self }
}
}

#[derive(Debug, Clone)]
Expand All @@ -81,3 +90,31 @@ pub enum FirestoreCacheCollectionLoadMode {
PreloadAllIfEmpty,
PreloadNone,
}

#[derive(Debug, Clone)]
pub struct FirestoreCacheIndexConfiguration {
pub fields: Vec<String>,
pub unique: bool,
}

impl FirestoreCacheIndexConfiguration {
#[inline]
pub fn new<I>(fields: I) -> Self
where
I: IntoIterator,
I::Item: AsRef<str>,
{
Self {
fields: fields.into_iter().map(|s| s.as_ref().to_string()).collect(),
unique: false,
}
}

#[inline]
pub fn unique(self, value: bool) -> Self {
Self {
unique: value,
..self
}
}
}
5 changes: 3 additions & 2 deletions src/db/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,10 @@ impl FirestoreDb {
);
span.in_scope(|| {
debug!(
"Listing documents in {:?} took {}ms",
"Listing documents in {:?} took {}ms. Returned: {}",
params.collection_id,
listing_duration.num_milliseconds()
listing_duration.num_milliseconds(),
result.documents.len()
);
});

Expand Down

0 comments on commit 081f480

Please sign in to comment.