Skip to content

Commit

Permalink
List docs cache internal API updated the same way as for query
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Oct 7, 2023
1 parent 90caa15 commit 8cf7ce3
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 43 deletions.
8 changes: 4 additions & 4 deletions src/cache/backends/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend {
async fn list_all_docs(
&self,
collection_path: &str,
) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>> {
match self.collection_caches.get(collection_path) {
Some(mem_cache) => Ok(Box::pin(futures::stream::iter(
mem_cache.iter().map(|(_, doc)| Ok(doc)),
Some(mem_cache) => Ok(FirestoreCachedValue::UseCached(Box::pin(
futures::stream::iter(mem_cache.iter().map(|(_, doc)| Ok(doc))),
))),
None => Ok(Box::pin(futures::stream::empty())),
None => Ok(FirestoreCachedValue::SkipCache),
}
}

Expand Down
87 changes: 52 additions & 35 deletions src/cache/backends/persistent_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,20 @@ impl FirestorePersistentCacheBackend {
fn write_document(&self, doc: &Document) -> FirestoreResult<()> {
let (collection_path, document_id) = split_document_path(&doc.name);

let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
if self.config.collections.get(collection_path).is_some() {
let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);

let write_txn = self.redb.begin_write()?;
{
let mut table = write_txn.open_table(td)?;
let doc_bytes = Self::document_to_buf(doc)?;
table.insert(document_id, doc_bytes.as_slice())?;
let write_txn = self.redb.begin_write()?;
{
let mut table = write_txn.open_table(td)?;
let doc_bytes = Self::document_to_buf(doc)?;
table.insert(document_id, doc_bytes.as_slice())?;
}
write_txn.commit()?;
Ok(())
} else {
Ok(())
}
write_txn.commit()?;
Ok(())
}

fn table_len(&self, collection_id: &str) -> FirestoreResult<u64> {
Expand Down Expand Up @@ -326,12 +330,15 @@ impl FirestoreCacheDocsByPathSupport for FirestorePersistentCacheBackend {
document_path: &str,
) -> FirestoreResult<Option<FirestoreDocument>> {
let (collection_path, document_id) = split_document_path(document_path);

let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
let read_tx = self.redb.begin_read()?;
let table = read_tx.open_table(td)?;
let value = table.get(document_id)?;
value.map(|v| Self::buf_to_document(v.value())).transpose()
if self.config.collections.get(collection_path).is_some() {
let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
let read_tx = self.redb.begin_read()?;
let table = read_tx.open_table(td)?;
let value = table.get(document_id)?;
value.map(|v| Self::buf_to_document(v.value())).transpose()
} else {
Ok(None)
}
}

async fn update_doc_by_path(&self, document: &FirestoreDocument) -> FirestoreResult<()> {
Expand All @@ -342,36 +349,46 @@ impl FirestoreCacheDocsByPathSupport for FirestorePersistentCacheBackend {
async fn list_all_docs(
&self,
collection_path: &str,
) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>> {
let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);

let read_tx = self.redb.begin_read()?;
let table = read_tx.open_table(td)?;
let iter = table.iter()?;
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>> {
if self.config.collections.get(collection_path).is_some() {
let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);

let read_tx = self.redb.begin_read()?;
let table = read_tx.open_table(td)?;
let iter = table.iter()?;

// It seems there is no way to work with streaming for redb, so this is not efficient
let mut docs: Vec<FirestoreResult<FirestoreDocument>> = Vec::new();
for record in iter {
let (_, v) = record?;
let doc = Self::buf_to_document(v.value())?;
docs.push(Ok(doc));
}

// It seems there is no way to work with streaming for redb, so this is not efficient
let mut docs: Vec<FirestoreResult<FirestoreDocument>> = Vec::new();
for record in iter {
let (_, v) = record?;
let doc = Self::buf_to_document(v.value())?;
docs.push(Ok(doc));
Ok(FirestoreCachedValue::UseCached(Box::pin(
futures::stream::iter(docs),
)))
} else {
Ok(FirestoreCachedValue::SkipCache)
}

Ok(Box::pin(futures::stream::iter(docs)))
}

async fn query_docs(
&self,
collection_path: &str,
query: &FirestoreQueryParams,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>> {
// For now only basic/simple query all supported
let simple_query_engine = FirestoreCacheQueryEngine::new(query);
if simple_query_engine.params_supported() {
Ok(FirestoreCachedValue::UseCached(
self.query_cached_docs(collection_path, simple_query_engine)
.await?,
))
if self.config.collections.get(collection_path).is_some() {
// For now only basic/simple query all supported
let simple_query_engine = FirestoreCacheQueryEngine::new(query);
if simple_query_engine.params_supported() {
Ok(FirestoreCachedValue::UseCached(
self.query_cached_docs(collection_path, simple_query_engine)
.await?,
))
} else {
Ok(FirestoreCachedValue::SkipCache)
}
} else {
Ok(FirestoreCachedValue::SkipCache)
}
Expand Down
2 changes: 1 addition & 1 deletion src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub trait FirestoreCacheDocsByPathSupport {
async fn list_all_docs(
&self,
collection_path: &str,
) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>>;
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>>;

async fn query_docs(
&self,
Expand Down
2 changes: 1 addition & 1 deletion src/db/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ impl FirestoreDb {
} else {
span.record("/firestore/cache_result", "miss");
span.in_scope(|| {
info!("Not all documents were found in cache. Reading from Firestore.")
debug!("Not all documents were found in cache. Reading from Firestore.")
});
return Ok(FirestoreCachedValue::SkipCache);
}
Expand Down
4 changes: 2 additions & 2 deletions src/db/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,8 @@ impl FirestoreDb {
)
};

let stream = cache.list_all_docs(&collection_path).await?;
return Ok(FirestoreCachedValue::UseCached(stream));
let cached_result = cache.list_all_docs(&collection_path).await?;
return Ok(cached_result);
}
Ok(FirestoreCachedValue::SkipCache)
}
Expand Down

0 comments on commit 8cf7ce3

Please sign in to comment.