diff --git a/examples/query.rs b/examples/query.rs index 9d56fbf..34219ea 100644 --- a/examples/query.rs +++ b/examples/query.rs @@ -29,34 +29,38 @@ async fn main() -> Result<(), Box> { // Create an instance let db = FirestoreDb::new(&config_env_var("PROJECT_ID")?).await?; - const TEST_COLLECTION_NAME: &'static str = "test"; + const TEST_COLLECTION_NAME: &'static str = "test-query"; - println!("Populating a test collection"); - for i in 0..10 { - let my_struct = MyTestStructure { - some_id: format!("test-{}", i), - some_string: "Test".to_string(), - one_more_string: "Test2".to_string(), - some_num: 42 - i, - created_at: Utc::now(), - }; + if db + .fluent() + .select() + .by_id_in(TEST_COLLECTION_NAME) + .one("test-0") + .await? + .is_none() + { + println!("Populating a test collection"); + let batch_writer = db.create_simple_batch_writer().await?; + let mut current_batch = batch_writer.new_batch(); - // Remove if it already exist - db.fluent() - .delete() - .from(TEST_COLLECTION_NAME) - .document_id(&my_struct.some_id) - .execute() - .await?; + for i in 0..500 { + let my_struct = MyTestStructure { + some_id: format!("test-{}", i), + some_string: "Test".to_string(), + one_more_string: "Test2".to_string(), + some_num: i, + created_at: Utc::now(), + }; - // Let's insert some data - db.fluent() - .insert() - .into(TEST_COLLECTION_NAME) - .document_id(&my_struct.some_id) - .object(&my_struct) - .execute() - .await?; + // Let's insert some data + db.fluent() + .update() + .in_col(TEST_COLLECTION_NAME) + .document_id(&my_struct.some_id) + .object(&my_struct) + .add_to_batch(&mut current_batch)?; + } + current_batch.write().await?; } println!("Querying a test collection as a stream using Fluent API"); diff --git a/src/db/query.rs b/src/db/query.rs index 9453b5c..96b3599 100644 --- a/src/db/query.rs +++ b/src/db/query.rs @@ -1,7 +1,4 @@ -use crate::{ - FirestoreDb, FirestoreError, FirestorePartition, FirestorePartitionQueryParams, - FirestoreQueryCursor, FirestoreQueryParams, FirestoreResult, -}; +use crate::*; use async_trait::async_trait; use chrono::prelude::*; use futures::future::BoxFuture; @@ -21,33 +18,33 @@ pub type PeekableBoxStream<'a, T> = futures::stream::Peekable>; pub trait FirestoreQuerySupport { async fn query_doc(&self, params: FirestoreQueryParams) -> FirestoreResult>; - async fn stream_query_doc<'b>( - &self, + async fn stream_query_doc<'a>( + &'a self, params: FirestoreQueryParams, - ) -> FirestoreResult>; + ) -> FirestoreResult>; - async fn stream_query_doc_with_errors<'b>( - &self, + async fn stream_query_doc_with_errors<'a>( + &'a self, params: FirestoreQueryParams, - ) -> FirestoreResult>>; + ) -> FirestoreResult>>; async fn query_obj(&self, params: FirestoreQueryParams) -> FirestoreResult> where for<'de> T: Deserialize<'de>; - async fn stream_query_obj<'b, T>( - &self, + async fn stream_query_obj<'a, T>( + &'a self, params: FirestoreQueryParams, - ) -> FirestoreResult> + ) -> FirestoreResult> where for<'de> T: Deserialize<'de>; - async fn stream_query_obj_with_errors<'b, T>( - &self, + async fn stream_query_obj_with_errors<'a, T>( + &'a self, params: FirestoreQueryParams, - ) -> FirestoreResult>> + ) -> FirestoreResult>> where for<'de> T: Deserialize<'de>, - T: Send + 'b; + T: Send + 'a; fn stream_partition_cursors_with_errors( &self, @@ -91,13 +88,22 @@ impl FirestoreDb { })) } - fn stream_query_doc_with_retries<'a, 'b>( - &'a self, + fn stream_query_doc_with_retries( + &self, params: FirestoreQueryParams, retries: usize, - span: &'a Span, - ) -> BoxFuture<'a, FirestoreResult>>>> { + span: Span, + ) -> BoxFuture>>>> { async move { + #[cfg(feature = "caching")] + { + if let FirestoreCachedValue::UseCached(stream) = + self.query_docs_from_cache(¶ms).await? + { + return Ok(stream); + } + } + let query_request = self.create_query_request(params.clone())?; let begin_query_utc: DateTime = Utc::now(); @@ -153,87 +159,49 @@ impl FirestoreDb { .boxed() } - fn query_doc_with_retries<'a>( + #[cfg(feature = "caching")] + #[inline] + async fn query_docs_from_cache<'a>( &'a self, - params: FirestoreQueryParams, - retries: usize, - span: &'a Span, - ) -> BoxFuture<'a, FirestoreResult>> { - async move { - let collection_id = params.collection_id.to_string(); - let query_request = self.create_query_request(params.clone())?; - let begin_query_utc: DateTime = Utc::now(); - - match self - .client() - .get() - .run_query(query_request) - .map_err(|e| e.into()) - .await - { - Ok(query_response) => { - let query_stream = query_response - .into_inner() - .map_ok(|rs| rs.document) - .try_collect::>>() - .await? - .into_iter() - .flatten() - .collect(); - let end_query_utc: DateTime = Utc::now(); - let query_duration = end_query_utc.signed_duration_since(begin_query_utc); - - span.record( - "/firestore/response_time", - query_duration.num_milliseconds(), - ); - span.in_scope(|| { - debug!( - "Querying documents in {:?} took {}ms", - collection_id, - query_duration.num_milliseconds() - ); - }); - - Ok(query_stream) - } - Err(err) => match err { - FirestoreError::DatabaseError(ref db_err) - if db_err.retry_possible && retries < self.inner.options.max_retries => - { - warn!( - "Failed with {}. Retrying: {}/{}", - db_err, - retries + 1, - self.inner.options.max_retries - ); - self.query_doc_with_retries(params, retries + 1, span).await - } - _ => Err(err), - }, - } - } - .boxed() + params: &FirestoreQueryParams, + ) -> FirestoreResult< + FirestoreCachedValue>>>, + > { + Ok(FirestoreCachedValue::SkipCache) } } #[async_trait] impl FirestoreQuerySupport for FirestoreDb { async fn query_doc(&self, params: FirestoreQueryParams) -> FirestoreResult> { - let collection_str = params.collection_id.to_string(); let span = span!( Level::DEBUG, "Firestore Query", - "/firestore/collection_name" = collection_str.as_str(), + "/firestore/collection_name" = params.collection_id.to_string().as_str(), "/firestore/response_time" = field::Empty ); - self.query_doc_with_retries(params, 0, &span).await + + let doc_stream = self + .stream_query_doc_with_retries(params, 0, span) + .await? + .filter_map(|doc_res| { + future::ready(match doc_res { + Ok(Some(doc)) => Some(Ok(doc)), + Ok(None) => None, + Err(err) => { + error!("Error occurred while consuming query: {}", err); + Some(Err(err)) + } + }) + }); + + Ok(doc_stream.try_collect::>().await?) } - async fn stream_query_doc<'b>( - &self, + async fn stream_query_doc<'a>( + &'a self, params: FirestoreQueryParams, - ) -> FirestoreResult> { + ) -> FirestoreResult> { let collection_str = params.collection_id.to_string(); let span = span!( @@ -243,7 +211,7 @@ impl FirestoreQuerySupport for FirestoreDb { "/firestore/response_time" = field::Empty ); - let doc_stream = self.stream_query_doc_with_retries(params, 0, &span).await?; + let doc_stream = self.stream_query_doc_with_retries(params, 0, span).await?; Ok(Box::pin(doc_stream.filter_map(|doc_res| { future::ready(match doc_res { @@ -257,10 +225,10 @@ impl FirestoreQuerySupport for FirestoreDb { }))) } - async fn stream_query_doc_with_errors<'b>( - &self, + async fn stream_query_doc_with_errors<'a>( + &'a self, params: FirestoreQueryParams, - ) -> FirestoreResult>> { + ) -> FirestoreResult>> { let collection_str = params.collection_id.to_string(); let span = span!( @@ -270,7 +238,7 @@ impl FirestoreQuerySupport for FirestoreDb { "/firestore/response_time" = field::Empty ); - let doc_stream = self.stream_query_doc_with_retries(params, 0, &span).await?; + let doc_stream = self.stream_query_doc_with_retries(params, 0, span).await?; Ok(Box::pin(doc_stream.filter_map(|doc_res| { future::ready(match doc_res { @@ -295,10 +263,10 @@ impl FirestoreQuerySupport for FirestoreDb { .collect() } - async fn stream_query_obj<'b, T>( - &self, + async fn stream_query_obj<'a, T>( + &'a self, params: FirestoreQueryParams, - ) -> FirestoreResult> + ) -> FirestoreResult> where for<'de> T: Deserialize<'de>, { @@ -317,13 +285,13 @@ impl FirestoreQuerySupport for FirestoreDb { }))) } - async fn stream_query_obj_with_errors<'b, T>( - &self, + async fn stream_query_obj_with_errors<'a, T>( + &'a self, params: FirestoreQueryParams, - ) -> FirestoreResult>> + ) -> FirestoreResult>> where for<'de> T: Deserialize<'de>, - T: Send + 'b, + T: Send + 'a, { let doc_stream = self.stream_query_doc_with_errors(params).await?; Ok(Box::pin(doc_stream.and_then(|doc| { @@ -470,10 +438,7 @@ impl FirestoreQuerySupport for FirestoreDb { Some(parallelism), |(cursor_pair, tx, partition_params, span)| async move { span.in_scope(|| { - debug!( - "Streaming partition cursor {:?}", - cursor_pair - ) + debug!("Streaming partition cursor {:?}",cursor_pair) }); let mut params_with_cursors = partition_params.query_params; diff --git a/src/fluent_api/select_builder.rs b/src/fluent_api/select_builder.rs index e39a8f6..b633259 100644 --- a/src/fluent_api/select_builder.rs +++ b/src/fluent_api/select_builder.rs @@ -229,13 +229,13 @@ where self.db.query_doc(self.params).await } - pub async fn stream_query<'b>(self) -> FirestoreResult> { + pub async fn stream_query(self) -> FirestoreResult> { self.db.stream_query_doc(self.params).await } - pub async fn stream_query_with_errors<'b>( + pub async fn stream_query_with_errors( self, - ) -> FirestoreResult>> { + ) -> FirestoreResult>> { self.db.stream_query_doc_with_errors(self.params).await } } @@ -273,15 +273,15 @@ where self.db.query_obj(self.params).await } - pub async fn stream_query<'b>(self) -> FirestoreResult> { + pub async fn stream_query(self) -> FirestoreResult> { self.db.stream_query_obj(self.params).await } - pub async fn stream_query_with_errors<'b>( + pub async fn stream_query_with_errors( self, - ) -> FirestoreResult>> + ) -> FirestoreResult>> where - T: 'b, + T: 'a, { self.db.stream_query_obj_with_errors(self.params).await } diff --git a/src/fluent_api/tests/mockdb.rs b/src/fluent_api/tests/mockdb.rs index 337f176..21bb0b0 100644 --- a/src/fluent_api/tests/mockdb.rs +++ b/src/fluent_api/tests/mockdb.rs @@ -15,17 +15,17 @@ impl FirestoreQuerySupport for MockDatabase { unreachable!() } - async fn stream_query_doc<'b>( - &self, + async fn stream_query_doc<'a>( + &'a self, _params: FirestoreQueryParams, - ) -> FirestoreResult> { + ) -> FirestoreResult> { unreachable!() } - async fn stream_query_doc_with_errors<'b>( - &self, + async fn stream_query_doc_with_errors<'a>( + &'a self, _params: FirestoreQueryParams, - ) -> FirestoreResult>> { + ) -> FirestoreResult>> { unreachable!() } @@ -36,23 +36,23 @@ impl FirestoreQuerySupport for MockDatabase { unreachable!() } - async fn stream_query_obj<'b, T>( - &self, + async fn stream_query_obj<'a, T>( + &'a self, _params: FirestoreQueryParams, - ) -> FirestoreResult> + ) -> FirestoreResult> where for<'de> T: Deserialize<'de>, { unreachable!() } - async fn stream_query_obj_with_errors<'b, T>( - &self, + async fn stream_query_obj_with_errors<'a, T>( + &'a self, _params: FirestoreQueryParams, - ) -> FirestoreResult>> + ) -> FirestoreResult>> where for<'de> T: Deserialize<'de>, - T: Send + 'b, + T: Send + 'a, { unreachable!() }