Skip to content

Commit

Permalink
Query refactorings and caching query initial template
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Oct 6, 2023
1 parent 280228a commit b013209
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 50 deletions.
17 changes: 14 additions & 3 deletions src/cache/backends/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ impl FirestoreMemoryCacheBackend {
Self::with_collection_options(config, |_| FirestoreMemCache::builder().max_capacity(10000))
}

pub fn with_collection_options(
pub fn with_collection_options<FN>(
config: FirestoreCacheConfiguration,
collection_mem_options: fn(&str) -> FirestoreMemCacheOptions,
) -> FirestoreResult<Self> {
collection_mem_options: FN,
) -> FirestoreResult<Self>
where
FN: Fn(&str) -> FirestoreMemCacheOptions,
{
let collection_caches = config
.collections
.keys()
Expand Down Expand Up @@ -201,4 +204,12 @@ impl FirestoreCacheDocsByPathSupport for FirestoreMemoryCacheBackend {
None => Ok(Box::pin(futures::stream::empty())),
}
}

async fn query_docs(
&self,
collection_path: &str,
query: &FirestoreQueryParams,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>> {
todo!()
}
}
8 changes: 8 additions & 0 deletions src/cache/backends/persistent_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,14 @@ impl FirestoreCacheDocsByPathSupport for FirestorePersistentCacheBackend {

Ok(Box::pin(futures::stream::iter(docs)))
}

async fn query_docs(
&self,
collection_path: &str,
query: &FirestoreQueryParams,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>> {
todo!()
}
}

impl From<redb::Error> for FirestoreError {
Expand Down
6 changes: 6 additions & 0 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,10 @@ pub trait FirestoreCacheDocsByPathSupport {
&self,
collection_path: &str,
) -> FirestoreResult<BoxStream<FirestoreResult<FirestoreDocument>>>;

async fn query_docs(
&self,
collection_path: &str,
query: &FirestoreQueryParams,
) -> FirestoreResult<FirestoreCachedValue<BoxStream<FirestoreResult<FirestoreDocument>>>>;
}
90 changes: 43 additions & 47 deletions src/db/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,6 @@ impl FirestoreDb {
span: Span,
) -> BoxFuture<FirestoreResult<BoxStream<FirestoreResult<Option<Document>>>>> {
async move {
#[cfg(feature = "caching")]
{
if let FirestoreCachedValue::UseCached(stream) =
self.query_docs_from_cache(&params).await?
{
return Ok(stream);
}
}

let query_request = self.create_query_request(params.clone())?;
let begin_query_utc: DateTime<Utc> = Utc::now();

Expand Down Expand Up @@ -164,59 +155,55 @@ impl FirestoreDb {
async fn query_docs_from_cache<'a>(
&'a self,
params: &FirestoreQueryParams,
) -> FirestoreResult<
FirestoreCachedValue<BoxStream<'a, FirestoreResult<Option<FirestoreDocument>>>>,
> {
Ok(FirestoreCachedValue::SkipCache)
) -> FirestoreResult<FirestoreCachedValue<BoxStream<'a, FirestoreResult<FirestoreDocument>>>>
{
match &params.collection_id {
FirestoreQueryCollection::Group(_) => Ok(FirestoreCachedValue::SkipCache),
FirestoreQueryCollection::Single(collection_id) => {
if let FirestoreDbSessionCacheMode::ReadCachedOnly(ref cache) =
self.session_params.cache_mode
{
let span = span!(
Level::DEBUG,
"Firestore Query Cached",
"/firestore/collection_name" = collection_id.as_str(),
);

span.in_scope(|| {
debug!("Querying {} documents from cache", collection_id);
});

let collection_path = if let Some(parent) = params.parent.as_ref() {
format!("{}/{}", parent, collection_id)
} else {
format!("{}/{}", self.get_documents_path(), collection_id.as_str())
};

cache.query_docs(&collection_path, &params).await
} else {
Ok(FirestoreCachedValue::SkipCache)
}
}
}
}
}

#[async_trait]
impl FirestoreQuerySupport for FirestoreDb {
async fn query_doc(&self, params: FirestoreQueryParams) -> FirestoreResult<Vec<Document>> {
let span = span!(
Level::DEBUG,
"Firestore Query",
"/firestore/collection_name" = params.collection_id.to_string().as_str(),
"/firestore/response_time" = field::Empty
);

let doc_stream = self
.stream_query_doc_with_retries(params, 0, span)
.await?
.filter_map(|doc_res| {
future::ready(match doc_res {
Ok(Some(doc)) => Some(Ok(doc)),
Ok(None) => None,
Err(err) => {
error!("Error occurred while consuming query: {}", err);
Some(Err(err))
}
})
});

let doc_stream = self.stream_query_doc_with_errors(params).await?;
Ok(doc_stream.try_collect::<Vec<Document>>().await?)
}

async fn stream_query_doc<'a>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'a, Document>> {
let collection_str = params.collection_id.to_string();

let span = span!(
Level::DEBUG,
"Firestore Streaming Query",
"/firestore/collection_name" = collection_str.as_str(),
"/firestore/response_time" = field::Empty
);

let doc_stream = self.stream_query_doc_with_retries(params, 0, span).await?;
let doc_stream = self.stream_query_doc_with_errors(params).await?;

Ok(Box::pin(doc_stream.filter_map(|doc_res| {
future::ready(match doc_res {
Ok(Some(doc)) => Some(doc),
Ok(None) => None,
Ok(doc) => Some(doc),
Err(err) => {
error!("Error occurred while consuming query: {}", err);
None
Expand All @@ -229,6 +216,15 @@ impl FirestoreQuerySupport for FirestoreDb {
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'a, FirestoreResult<Document>>> {
#[cfg(feature = "caching")]
{
if let FirestoreCachedValue::UseCached(stream) =
self.query_docs_from_cache(&params).await?
{
return Ok(stream);
}
}

let collection_str = params.collection_id.to_string();

let span = span!(
Expand Down Expand Up @@ -276,7 +272,7 @@ impl FirestoreQuerySupport for FirestoreDb {
Ok(obj) => Some(obj),
Err(err) => {
error!(
"Error occurred while consuming query document as a stream: {}",
"Error occurred while converting query document in a stream: {}",
err
);
None
Expand Down

0 comments on commit b013209

Please sign in to comment.