Skip to content

Commit

Permalink
Query doc refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Oct 6, 2023
1 parent 58ce73e commit 280228a
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 148 deletions.
54 changes: 29 additions & 25 deletions examples/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create an instance
let db = FirestoreDb::new(&config_env_var("PROJECT_ID")?).await?;

const TEST_COLLECTION_NAME: &'static str = "test";
const TEST_COLLECTION_NAME: &'static str = "test-query";

println!("Populating a test collection");
for i in 0..10 {
let my_struct = MyTestStructure {
some_id: format!("test-{}", i),
some_string: "Test".to_string(),
one_more_string: "Test2".to_string(),
some_num: 42 - i,
created_at: Utc::now(),
};
if db
.fluent()
.select()
.by_id_in(TEST_COLLECTION_NAME)
.one("test-0")
.await?
.is_none()
{
println!("Populating a test collection");
let batch_writer = db.create_simple_batch_writer().await?;
let mut current_batch = batch_writer.new_batch();

// Remove if it already exist
db.fluent()
.delete()
.from(TEST_COLLECTION_NAME)
.document_id(&my_struct.some_id)
.execute()
.await?;
for i in 0..500 {
let my_struct = MyTestStructure {
some_id: format!("test-{}", i),
some_string: "Test".to_string(),
one_more_string: "Test2".to_string(),
some_num: i,
created_at: Utc::now(),
};

// Let's insert some data
db.fluent()
.insert()
.into(TEST_COLLECTION_NAME)
.document_id(&my_struct.some_id)
.object(&my_struct)
.execute()
.await?;
// Let's insert some data
db.fluent()
.update()
.in_col(TEST_COLLECTION_NAME)
.document_id(&my_struct.some_id)
.object(&my_struct)
.add_to_batch(&mut current_batch)?;
}
current_batch.write().await?;
}

println!("Querying a test collection as a stream using Fluent API");
Expand Down
171 changes: 68 additions & 103 deletions src/db/query.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
FirestoreDb, FirestoreError, FirestorePartition, FirestorePartitionQueryParams,
FirestoreQueryCursor, FirestoreQueryParams, FirestoreResult,
};
use crate::*;
use async_trait::async_trait;
use chrono::prelude::*;
use futures::future::BoxFuture;
Expand All @@ -21,33 +18,33 @@ pub type PeekableBoxStream<'a, T> = futures::stream::Peekable<BoxStream<'a, T>>;
pub trait FirestoreQuerySupport {
async fn query_doc(&self, params: FirestoreQueryParams) -> FirestoreResult<Vec<Document>>;

async fn stream_query_doc<'b>(
&self,
async fn stream_query_doc<'a>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, Document>>;
) -> FirestoreResult<BoxStream<'a, Document>>;

async fn stream_query_doc_with_errors<'b>(
&self,
async fn stream_query_doc_with_errors<'a>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<Document>>>;
) -> FirestoreResult<BoxStream<'a, FirestoreResult<Document>>>;

async fn query_obj<T>(&self, params: FirestoreQueryParams) -> FirestoreResult<Vec<T>>
where
for<'de> T: Deserialize<'de>;
async fn stream_query_obj<'b, T>(
&self,
async fn stream_query_obj<'a, T>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, T>>
) -> FirestoreResult<BoxStream<'a, T>>
where
for<'de> T: Deserialize<'de>;

async fn stream_query_obj_with_errors<'b, T>(
&self,
async fn stream_query_obj_with_errors<'a, T>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<T>>>
) -> FirestoreResult<BoxStream<'a, FirestoreResult<T>>>
where
for<'de> T: Deserialize<'de>,
T: Send + 'b;
T: Send + 'a;

fn stream_partition_cursors_with_errors(
&self,
Expand Down Expand Up @@ -91,13 +88,22 @@ impl FirestoreDb {
}))
}

fn stream_query_doc_with_retries<'a, 'b>(
&'a self,
fn stream_query_doc_with_retries(
&self,
params: FirestoreQueryParams,
retries: usize,
span: &'a Span,
) -> BoxFuture<'a, FirestoreResult<BoxStream<'b, FirestoreResult<Option<Document>>>>> {
span: Span,
) -> BoxFuture<FirestoreResult<BoxStream<FirestoreResult<Option<Document>>>>> {
async move {
#[cfg(feature = "caching")]
{
if let FirestoreCachedValue::UseCached(stream) =
self.query_docs_from_cache(&params).await?
{
return Ok(stream);
}
}

let query_request = self.create_query_request(params.clone())?;
let begin_query_utc: DateTime<Utc> = Utc::now();

Expand Down Expand Up @@ -153,87 +159,49 @@ impl FirestoreDb {
.boxed()
}

fn query_doc_with_retries<'a>(
#[cfg(feature = "caching")]
#[inline]
async fn query_docs_from_cache<'a>(
&'a self,
params: FirestoreQueryParams,
retries: usize,
span: &'a Span,
) -> BoxFuture<'a, FirestoreResult<Vec<Document>>> {
async move {
let collection_id = params.collection_id.to_string();
let query_request = self.create_query_request(params.clone())?;
let begin_query_utc: DateTime<Utc> = Utc::now();

match self
.client()
.get()
.run_query(query_request)
.map_err(|e| e.into())
.await
{
Ok(query_response) => {
let query_stream = query_response
.into_inner()
.map_ok(|rs| rs.document)
.try_collect::<Vec<Option<Document>>>()
.await?
.into_iter()
.flatten()
.collect();
let end_query_utc: DateTime<Utc> = Utc::now();
let query_duration = end_query_utc.signed_duration_since(begin_query_utc);

span.record(
"/firestore/response_time",
query_duration.num_milliseconds(),
);
span.in_scope(|| {
debug!(
"Querying documents in {:?} took {}ms",
collection_id,
query_duration.num_milliseconds()
);
});

Ok(query_stream)
}
Err(err) => match err {
FirestoreError::DatabaseError(ref db_err)
if db_err.retry_possible && retries < self.inner.options.max_retries =>
{
warn!(
"Failed with {}. Retrying: {}/{}",
db_err,
retries + 1,
self.inner.options.max_retries
);
self.query_doc_with_retries(params, retries + 1, span).await
}
_ => Err(err),
},
}
}
.boxed()
params: &FirestoreQueryParams,
) -> FirestoreResult<
FirestoreCachedValue<BoxStream<'a, FirestoreResult<Option<FirestoreDocument>>>>,
> {
Ok(FirestoreCachedValue::SkipCache)
}
}

#[async_trait]
impl FirestoreQuerySupport for FirestoreDb {
async fn query_doc(&self, params: FirestoreQueryParams) -> FirestoreResult<Vec<Document>> {
let collection_str = params.collection_id.to_string();
let span = span!(
Level::DEBUG,
"Firestore Query",
"/firestore/collection_name" = collection_str.as_str(),
"/firestore/collection_name" = params.collection_id.to_string().as_str(),
"/firestore/response_time" = field::Empty
);
self.query_doc_with_retries(params, 0, &span).await

let doc_stream = self
.stream_query_doc_with_retries(params, 0, span)
.await?
.filter_map(|doc_res| {
future::ready(match doc_res {
Ok(Some(doc)) => Some(Ok(doc)),
Ok(None) => None,
Err(err) => {
error!("Error occurred while consuming query: {}", err);
Some(Err(err))
}
})
});

Ok(doc_stream.try_collect::<Vec<Document>>().await?)
}

async fn stream_query_doc<'b>(
&self,
async fn stream_query_doc<'a>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, Document>> {
) -> FirestoreResult<BoxStream<'a, Document>> {
let collection_str = params.collection_id.to_string();

let span = span!(
Expand All @@ -243,7 +211,7 @@ impl FirestoreQuerySupport for FirestoreDb {
"/firestore/response_time" = field::Empty
);

let doc_stream = self.stream_query_doc_with_retries(params, 0, &span).await?;
let doc_stream = self.stream_query_doc_with_retries(params, 0, span).await?;

Ok(Box::pin(doc_stream.filter_map(|doc_res| {
future::ready(match doc_res {
Expand All @@ -257,10 +225,10 @@ impl FirestoreQuerySupport for FirestoreDb {
})))
}

async fn stream_query_doc_with_errors<'b>(
&self,
async fn stream_query_doc_with_errors<'a>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<Document>>> {
) -> FirestoreResult<BoxStream<'a, FirestoreResult<Document>>> {
let collection_str = params.collection_id.to_string();

let span = span!(
Expand All @@ -270,7 +238,7 @@ impl FirestoreQuerySupport for FirestoreDb {
"/firestore/response_time" = field::Empty
);

let doc_stream = self.stream_query_doc_with_retries(params, 0, &span).await?;
let doc_stream = self.stream_query_doc_with_retries(params, 0, span).await?;

Ok(Box::pin(doc_stream.filter_map(|doc_res| {
future::ready(match doc_res {
Expand All @@ -295,10 +263,10 @@ impl FirestoreQuerySupport for FirestoreDb {
.collect()
}

async fn stream_query_obj<'b, T>(
&self,
async fn stream_query_obj<'a, T>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, T>>
) -> FirestoreResult<BoxStream<'a, T>>
where
for<'de> T: Deserialize<'de>,
{
Expand All @@ -317,13 +285,13 @@ impl FirestoreQuerySupport for FirestoreDb {
})))
}

async fn stream_query_obj_with_errors<'b, T>(
&self,
async fn stream_query_obj_with_errors<'a, T>(
&'a self,
params: FirestoreQueryParams,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<T>>>
) -> FirestoreResult<BoxStream<'a, FirestoreResult<T>>>
where
for<'de> T: Deserialize<'de>,
T: Send + 'b,
T: Send + 'a,
{
let doc_stream = self.stream_query_doc_with_errors(params).await?;
Ok(Box::pin(doc_stream.and_then(|doc| {
Expand Down Expand Up @@ -470,10 +438,7 @@ impl FirestoreQuerySupport for FirestoreDb {
Some(parallelism),
|(cursor_pair, tx, partition_params, span)| async move {
span.in_scope(|| {
debug!(
"Streaming partition cursor {:?}",
cursor_pair
)
debug!("Streaming partition cursor {:?}",cursor_pair)
});

let mut params_with_cursors = partition_params.query_params;
Expand Down
14 changes: 7 additions & 7 deletions src/fluent_api/select_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,13 @@ where
self.db.query_doc(self.params).await
}

pub async fn stream_query<'b>(self) -> FirestoreResult<BoxStream<'b, Document>> {
pub async fn stream_query(self) -> FirestoreResult<BoxStream<'a, Document>> {
self.db.stream_query_doc(self.params).await
}

pub async fn stream_query_with_errors<'b>(
pub async fn stream_query_with_errors(
self,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<Document>>> {
) -> FirestoreResult<BoxStream<'a, FirestoreResult<Document>>> {
self.db.stream_query_doc_with_errors(self.params).await
}
}
Expand Down Expand Up @@ -273,15 +273,15 @@ where
self.db.query_obj(self.params).await
}

pub async fn stream_query<'b>(self) -> FirestoreResult<BoxStream<'b, T>> {
pub async fn stream_query(self) -> FirestoreResult<BoxStream<'a, T>> {
self.db.stream_query_obj(self.params).await
}

pub async fn stream_query_with_errors<'b>(
pub async fn stream_query_with_errors(
self,
) -> FirestoreResult<BoxStream<'b, FirestoreResult<T>>>
) -> FirestoreResult<BoxStream<'a, FirestoreResult<T>>>
where
T: 'b,
T: 'a,
{
self.db.stream_query_obj_with_errors(self.params).await
}
Expand Down
Loading

0 comments on commit 280228a

Please sign in to comment.