diff --git a/src/cache/backends/memory_backend.rs b/src/cache/backends/memory_backend.rs index d9758ab..b04a445 100644 --- a/src/cache/backends/memory_backend.rs +++ b/src/cache/backends/memory_backend.rs @@ -65,7 +65,7 @@ impl FirestoreMemoryCacheBackend { FirestoreCacheCollectionLoadMode::PreloadAllDocs | FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty => { if let Some(mem_cache) = self.collection_caches.get(collection_path.as_str()) { - debug!("Preloading {}", collection_path.as_str()); + debug!(collection_path, "Preloading collection."); let params = if let Some(parent) = &config.parent { db.fluent() @@ -83,9 +83,9 @@ impl FirestoreMemoryCacheBackend { .map(|(index, docs)| { if index > 0 && index % 5000 == 0 { debug!( - "Preloading collection `{}`: {} entries loaded", - collection_path.as_str(), - index + collection_path = collection_path.as_str(), + entries_loaded = index, + "Collection preload in progress...", ); } docs @@ -99,9 +99,9 @@ impl FirestoreMemoryCacheBackend { mem_cache.run_pending_tasks().await; info!( - "Preloading collection `{}` has been finished. Loaded: {} entries", - collection_path.as_str(), - mem_cache.entry_count() + collection_path = collection_path.as_str(), + entry_count = mem_cache.entry_count(), + "Preloading collection has been finished.", ); } } @@ -185,7 +185,7 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend { async fn invalidate_all(&self) -> FirestoreResult<()> { for (collection_path, mem_cache) in &self.collection_caches { - debug!("Invalidating cache for {}", collection_path); + debug!(collection_path, "Invalidating cache for collection."); mem_cache.invalidate_all(); mem_cache.run_pending_tasks().await; } @@ -203,8 +203,8 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend { let (collection_path, document_id) = split_document_path(&doc.name); if let Some(mem_cache) = self.collection_caches.get(collection_path) { trace!( - "Writing document to cache due to listener event: {:?}", - doc.name + doc_name = ?doc.name, + "Writing document to cache due to listener event.", ); mem_cache.insert(document_id.to_string(), doc).await; } @@ -215,8 +215,8 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend { let (collection_path, document_id) = split_document_path(&doc_deleted.document); if let Some(mem_cache) = self.collection_caches.get(collection_path) { trace!( - "Removing document from cache due to listener event: {:?}", - doc_deleted.document.as_str() + deleted_doc = ?doc_deleted.document.as_str(), + "Removing document from cache due to listener event.", ); mem_cache.remove(document_id).await; } diff --git a/src/cache/backends/persistent_backend.rs b/src/cache/backends/persistent_backend.rs index 1d9fe59..cc79215 100644 --- a/src/cache/backends/persistent_backend.rs +++ b/src/cache/backends/persistent_backend.rs @@ -26,14 +26,14 @@ impl FirestorePersistentCacheBackend { if !db_dir.exists() { debug!( - "Creating a temp directory to store persistent cache: {}", - db_dir.display() + directory = %db_dir.display(), + "Creating a temp directory to store persistent cache.", ); std::fs::create_dir_all(&db_dir)?; } else { debug!( - "Using a temp directory to store persistent cache: {}", - db_dir.display() + directory = %db_dir.display(), + "Using a temp directory to store persistent cache.", ); } Self::with_options(config, db_dir.join("redb")) @@ -44,21 +44,15 @@ impl FirestorePersistentCacheBackend { data_file_path: PathBuf, ) -> FirestoreResult { if data_file_path.exists() { - debug!( - "Opening database for persistent cache {:?}...", - data_file_path - ); + debug!(?data_file_path, "Opening database for persistent cache...",); } else { - debug!( - "Creating database for persistent cache {:?}...", - data_file_path - ); + debug!(?data_file_path, "Creating database for persistent cache...",); } let mut db = Database::create(data_file_path)?; db.compact()?; - info!("Successfully opened database for persistent cache"); + info!("Successfully opened database for persistent cache."); Ok(Self { config, redb: db }) } @@ -88,14 +82,18 @@ impl FirestorePersistentCacheBackend { ) && existing_records > 0 { info!( - "Preloading collection `{}` has been skipped. Already loaded: {} entries", - collection_path.as_str(), - existing_records - ); + collection_path = collection_path.as_str(), + entries_loaded = existing_records, + "Preloading collection has been skipped.", + ); continue; } - debug!("Preloading {}", collection_path.as_str()); + debug!( + collection_path = collection_path.as_str(), + "Preloading collection." + ); + let params = if let Some(parent) = &config.parent { db.fluent() .select() @@ -112,9 +110,9 @@ impl FirestorePersistentCacheBackend { .map(|(index, docs)| { if index > 0 && index % 5000 == 0 { debug!( - "Preloading collection `{}`: {} entries loaded", - collection_path.as_str(), - index + collection_path = collection_path.as_str(), + entries_loaded = index, + "Collection preload in progress...", ); } docs @@ -122,7 +120,7 @@ impl FirestorePersistentCacheBackend { .ready_chunks(100) .for_each(|docs| async move { if let Err(err) = self.write_batch_docs(collection_path, docs) { - error!("Error while preloading collection: {}", err); + error!(?err, "Error while preloading collection."); } }) .await; @@ -140,17 +138,13 @@ impl FirestorePersistentCacheBackend { }; info!( - "Preloading collection `{}` has been finished. Loaded: {} entries", - collection_path.as_str(), - updated_records + collection_path = collection_path.as_str(), + updated_records, "Preloading collection has been finished.", ); } FirestoreCacheCollectionLoadMode::PreloadNone => { let tx = self.redb.begin_write()?; - debug!( - "Creating corresponding collection table `{}`", - collection_path - ); + debug!(collection_path, "Creating corresponding collection table.",); tx.open_table(td)?; tx.commit()?; } @@ -289,8 +283,8 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend { let write_txn = self.redb.begin_write()?; { debug!( - "Invalidating {} and draining the corresponding table", - collection_path + collection_path, + "Invalidating collection and draining the corresponding table.", ); let mut table = write_txn.open_table(td)?; table.drain::<&str>(..)?; @@ -310,9 +304,10 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend { FirestoreListenEvent::DocumentChange(doc_change) => { if let Some(doc) = doc_change.document { trace!( - "Writing document to cache due to listener event: {:?}", - doc.name + doc_name = ?doc.name, + "Writing document to cache due to listener event.", ); + self.write_document(&doc)?; } Ok(()) @@ -322,10 +317,12 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend { let write_txn = self.redb.begin_write()?; let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path); let mut table = write_txn.open_table(td)?; + trace!( - "Removing document from cache due to listener event: {:?}", - doc_deleted.document.as_str() + deleted_doc = ?doc_deleted.document.as_str(), + "Removing document from cache due to listener event.", ); + table.remove(document_id)?; Ok(()) } diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 855721c..8ea45e1 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -108,7 +108,7 @@ where let backend = backend.clone(); async move { if let Err(err) = backend.on_listen_event(event).await { - error!("Error occurred while updating cache: {}", err); + error!(?err, "Error occurred while updating cache."); }; Ok(()) } @@ -175,7 +175,7 @@ pub trait FirestoreCacheDocsByPathSupport { Ok((doc_id, Some(document))) }), Err(err) => { - error!("Error occurred while reading from cache: {}", err); + error!(%err, "Error occurred while reading from cache."); None } } diff --git a/src/db/aggregated_query.rs b/src/db/aggregated_query.rs index c146a22..db9e95a 100644 --- a/src/db/aggregated_query.rs +++ b/src/db/aggregated_query.rs @@ -181,7 +181,7 @@ impl FirestoreAggregatedQuerySupport for FirestoreDb { Ok(Some(doc)) => Some(doc), Ok(None) => None, Err(err) => { - error!("Error occurred while consuming query: {}", err); + error!(%err, "Error occurred while consuming query."); None } }) @@ -210,7 +210,7 @@ impl FirestoreAggregatedQuerySupport for FirestoreDb { Ok(Some(doc)) => Some(Ok(doc)), Ok(None) => None, Err(err) => { - error!("Error occurred while consuming query: {}", err); + error!(%err, "Error occurred while consuming query."); Some(Err(err)) } }) @@ -244,8 +244,8 @@ impl FirestoreAggregatedQuerySupport for FirestoreDb { Ok(obj) => Some(obj), Err(err) => { error!( - "Error occurred while consuming query document as a stream: {}", - err + %err, + "Error occurred while consuming query document as a stream.", ); None } @@ -328,9 +328,9 @@ impl FirestoreDb { ); span.in_scope(|| { debug!( - "Querying stream of documents in {:?} took {}ms", - params.query_params.collection_id, - query_duration.num_milliseconds() + collection_id = ?params.query_params.collection_id, + duration_milliseconds = query_duration.num_milliseconds(), + "Querying stream of documents in specified collection.", ); }); @@ -341,10 +341,10 @@ impl FirestoreDb { if db_err.retry_possible && retries < self.inner.options.max_retries => { warn!( - "Failed with {}. Retrying: {}/{}", - db_err, - retries + 1, - self.inner.options.max_retries + err = %db_err, + current_retry = retries + 1, + max_retries = self.inner.options.max_retries, + "Failed to run aggregation query. Retrying up to the specified number of times.", ); self.stream_aggregated_query_doc_with_retries(params, retries + 1, span) @@ -392,9 +392,9 @@ impl FirestoreDb { ); span.in_scope(|| { debug!( - "Querying documents in {:?} took {}ms", - params.query_params.collection_id, - query_duration.num_milliseconds() + collection_id = ?params.query_params.collection_id, + duration_milliseconds = query_duration.num_milliseconds(), + "Querying documents in specified collection.", ); }); @@ -405,11 +405,12 @@ impl FirestoreDb { if db_err.retry_possible && retries < self.inner.options.max_retries => { warn!( - "Failed with {}. Retrying: {}/{}", - db_err, - retries + 1, - self.inner.options.max_retries + err = %db_err, + current_retry = retries + 1, + max_retries = self.inner.options.max_retries, + "Failed to run aggregation query. Retrying up to the specified number of times.", ); + self.aggregated_query_doc_with_retries(params, retries + 1, span) .await } diff --git a/src/db/batch_streaming_writer.rs b/src/db/batch_streaming_writer.rs index 562cfca..48a6a92 100644 --- a/src/db/batch_streaming_writer.rs +++ b/src/db/batch_streaming_writer.rs @@ -40,7 +40,8 @@ pub struct FirestoreStreamingBatchWriter { impl Drop for FirestoreStreamingBatchWriter { fn drop(&mut self) { if !self.finished.load(Ordering::Relaxed) { - self.batch_span.in_scope(|| warn!("Batch was not finished")); + self.batch_span + .in_scope(|| warn!("Batch was not finished.")); } } } @@ -122,8 +123,9 @@ impl FirestoreStreamingBatchWriter { } Err(err) => { error!( - "Batch write operation {} failed: {}", - received_counter, err + %err, + received_counter, + "Batch write operation failed.", ); responses_writer.send(Err(err)).ok(); break; @@ -142,7 +144,7 @@ impl FirestoreStreamingBatchWriter { break; } Err(err) if err.code() == gcloud_sdk::tonic::Code::Cancelled => { - debug!("Batch write operation finished on: {}", received_counter); + debug!(received_counter, "Batch write operation finished."); responses_writer .send(Ok(FirestoreBatchWriteResponse::new( received_counter - 1, @@ -154,8 +156,9 @@ impl FirestoreStreamingBatchWriter { } Err(err) => { error!( - "Batch write operation {} failed: {}", - received_counter, err + %err, + received_counter, + "Batch write operation failed.", ); responses_writer.send(Err(err.into())).ok(); break; @@ -182,7 +185,10 @@ impl FirestoreStreamingBatchWriter { } } Err(err) => { - error!("Batch write operation failed: {}", err); + error!( + %err, + "Batch write operation failed.", + ); responses_writer.send(Err(err.into())).ok(); } } @@ -228,7 +234,7 @@ impl FirestoreStreamingBatchWriter { > self.received_counter.load(Ordering::Relaxed) - 1 { drop(locked); - debug!("Still waiting receiving responses for batch writes"); + debug!("Still waiting to receive responses for batch writes."); self.init_wait_reader.recv().await; } else { drop(locked); diff --git a/src/db/create.rs b/src/db/create.rs index 80a214f..3cd05ed 100644 --- a/src/db/create.rs +++ b/src/db/create.rs @@ -130,9 +130,9 @@ impl FirestoreCreateSupport for FirestoreDb { span.in_scope(|| { debug!( - "Created a new document: {}/{:?}", collection_id, - document_id.as_ref().map(|id| id.as_ref()) + document_id = document_id.as_ref().map(|id| id.as_ref()), + "Created a new document.", ); }); diff --git a/src/db/delete.rs b/src/db/delete.rs index dd97eed..aa33d42 100644 --- a/src/db/delete.rs +++ b/src/db/delete.rs @@ -84,9 +84,9 @@ impl FirestoreDeleteSupport for FirestoreDb { span.in_scope(|| { debug!( - "Deleted a document: {}/{}", collection_id, - document_id.as_ref() + document_id = document_id.as_ref(), + "Deleted a document.", ); }); diff --git a/src/db/get.rs b/src/db/get.rs index e0fbd76..1b5aab4 100644 --- a/src/db/get.rs +++ b/src/db/get.rs @@ -380,8 +380,8 @@ impl FirestoreGetByIdSupport for FirestoreDb { Ok(doc_pair) => Some(doc_pair), Err(err) => { error!( - "Error occurred while consuming batch get as a stream: {}", - err + %err, + "Error occurred while consuming batch get as a stream.", ); None } @@ -470,8 +470,8 @@ impl FirestoreGetByIdSupport for FirestoreDb { Ok(obj) => Some((doc_id, Some(obj))), Err(err) => { error!( - "Error occurred while consuming batch documents as a stream: {}", - err + %err, + "Error occurred while consuming batch documents as a stream.", ); None } @@ -612,9 +612,9 @@ impl FirestoreDb { Ok(doc_response) => { span.in_scope(|| { debug!( - "Reading document {} took {}ms", document_path, - query_duration.num_milliseconds() + duration_milliseconds = query_duration.num_milliseconds(), + "Read document.", ); }); @@ -631,10 +631,10 @@ impl FirestoreDb { { span.in_scope(|| { warn!( - "Failed with {}. Retrying: {}/{}", - db_err, - retries + 1, - self.get_options().max_retries + err = %db_err, + current_retry = retries + 1, + max_retries = self.get_options().max_retries, + "Failed to get document. Retrying up to the specified number of times.", ); }); self.get_doc_by_path(collection_id, document_path, None, retries + 1) @@ -692,7 +692,7 @@ impl FirestoreDb { match self.client().get().batch_get_documents(request).await { Ok(response) => { - span.in_scope(|| debug!("Start consuming a batch of documents by ids")); + span.in_scope(|| debug!("Start consuming a batch of documents by IDs.")); let stream = response .into_inner() .filter_map(move |r| async move { @@ -768,9 +768,9 @@ impl FirestoreDb { span.record("/firestore/cache_result", "hit"); span.in_scope(|| { debug!( - "Reading document {} from cache took {}ms", document_path, - query_duration.num_milliseconds() + duration_milliseconds = query_duration.num_milliseconds(), + "Read document from cache.", ); }); @@ -778,7 +778,7 @@ impl FirestoreDb { } else { span.record("/firestore/cache_result", "miss"); span.in_scope(|| { - debug!("Missing document {} in cache", document_path); + debug!(document_path, "Missing document in cache."); }); if let FirestoreDbSessionCacheMode::ReadCachedOnly(_) = self.session_params.cache_mode @@ -786,7 +786,7 @@ impl FirestoreDb { return Err(FirestoreError::DataNotFoundError( FirestoreDataNotFoundError::new( FirestoreErrorPublicGenericDetails::new("CACHE_MISS".to_string()), - format!("Document {} not found in cache", document_path), + format!("Document {document_path} not found in cache"), ), )); } @@ -840,7 +840,10 @@ impl FirestoreDb { { span.record("/firestore/cache_result", "hit"); span.in_scope(|| { - debug!("Reading {} documents from cache", full_doc_ids.len()); + debug!( + num_documents = full_doc_ids.len(), + "Reading documents from cache." + ); }); return Ok(FirestoreCachedValue::UseCached(Box::pin( futures::stream::iter(cached_vec) diff --git a/src/db/list.rs b/src/db/list.rs index e6a9125..0a0536d 100644 --- a/src/db/list.rs +++ b/src/db/list.rs @@ -144,7 +144,7 @@ impl FirestoreListingSupport for FirestoreDb { } } Err(err) => { - error!("Error occurred while consuming documents: {}", err); + error!(%err, "Error occurred while consuming documents."); Some((Err(err), None)) } } @@ -176,7 +176,7 @@ impl FirestoreListingSupport for FirestoreDb { future::ready(match doc_res { Ok(doc) => Some(doc), Err(err) => { - error!("Error occurred while consuming documents: {}", err); + error!(%err, "Error occurred while consuming documents."); None } }) @@ -197,8 +197,8 @@ impl FirestoreListingSupport for FirestoreDb { Ok(obj) => Some(obj), Err(err) => { error!( - "Error occurred while consuming list document as a stream: {}", - err + %err, + "Error occurred while consuming list document as a stream.", ); None } @@ -243,7 +243,7 @@ impl FirestoreListingSupport for FirestoreDb { future::ready(match col_res { Ok(col) => Some(col), Err(err) => { - error!("Error occurred while consuming collection IDs: {}", err); + error!(%err, "Error occurred while consuming collection IDs."); None } }) @@ -275,7 +275,7 @@ impl FirestoreListingSupport for FirestoreDb { } } Err(err) => { - error!("Error occurred while consuming documents: {}", err); + error!(%err, "Error occurred while consuming documents."); Some((Err(err), None)) } } @@ -371,10 +371,10 @@ impl FirestoreDb { ); span.in_scope(|| { debug!( - "Listing documents in {:?} took {}ms. Returned: {}", - params.collection_id, - listing_duration.num_milliseconds(), - result.documents.len() + collection_id = params.collection_id, + duration_milliseconds = listing_duration.num_milliseconds(), + num_documents = result.documents.len(), + "Listed documents.", ); }); @@ -385,11 +385,12 @@ impl FirestoreDb { if db_err.retry_possible && retries < self.inner.options.max_retries => { warn!( - "Listing failed with {}. Retrying: {}/{}", - db_err, - retries + 1, - self.inner.options.max_retries + err = %db_err, + current_retry = retries + 1, + max_retries = self.inner.options.max_retries, + "Failed to list documents. Retrying up to the specified number of times.", ); + self.list_doc_with_retries(params, retries + 1, span).await } _ => Err(err), @@ -454,8 +455,8 @@ impl FirestoreDb { ); span.in_scope(|| { debug!( - "Listing collections took {}ms", - listing_duration.num_milliseconds() + duration_milliseconds = listing_duration.num_milliseconds(), + "Listed collections.", ); }); @@ -466,11 +467,12 @@ impl FirestoreDb { if db_err.retry_possible && retries < self.inner.options.max_retries => { warn!( - "Listing failed with {}. Retrying: {}/{}", - db_err, - retries + 1, - self.inner.options.max_retries + err = %db_err, + current_retry = retries + 1, + max_retries = self.inner.options.max_retries, + "Failed to list collection IDs. Retrying up to the specified number of times.", ); + self.list_collection_ids_with_retries(params, retries + 1, span) .await } @@ -525,7 +527,10 @@ impl FirestoreDb { FirestoreCachedValue::UseCached(stream) => { span.record("/firestore/cache_result", "hit"); span.in_scope(|| { - debug!("Reading all {} documents from cache", params.collection_id); + debug!( + collection_id = params.collection_id, + "Reading all documents from cache." + ); }); Ok(FirestoreCachedValue::UseCached(stream)) @@ -538,8 +543,8 @@ impl FirestoreDb { ) { span.in_scope(|| { debug!( - "Cache doesn't have suitable documents for {}, but cache mode is ReadCachedOnly so returning empty stream", - params.collection_id + collection_id = params.collection_id, + "Cache doesn't have suitable documents for specified collection, but cache mode is ReadCachedOnly so returning empty stream.", ); }); Ok(FirestoreCachedValue::UseCached(Box::pin( @@ -548,8 +553,8 @@ impl FirestoreDb { } else { span.in_scope(|| { debug!( - "Cache doesn't have suitable documents for {} skipping cache and reading from Firestore", - params.collection_id + collection_id = params.collection_id, + "Cache doesn't have suitable documents for specified collection, so skipping cache and reading from Firestore.", ); }); Ok(FirestoreCachedValue::SkipCache) diff --git a/src/db/listen_changes.rs b/src/db/listen_changes.rs index 1af517c..64ac761 100644 --- a/src/db/listen_changes.rs +++ b/src/db/listen_changes.rs @@ -287,8 +287,8 @@ where F: Future> + Send + 'static, { info!( - "Starting a Firestore listener for targets: {:?}...", - &self.targets.len() + num_targets = self.targets.len(), + "Starting a Firestore listener for targets...", ); let mut initial_states: HashMap = @@ -349,7 +349,7 @@ where } if let Some(signaller) = self.shutdown_handle.take() { if let Err(err) = signaller.await { - warn!("Firestore listener exit error: {}...", err); + warn!(%err, "Firestore listener exit error!"); }; } debug!("Shutting down Firestore listener has been finished..."); @@ -374,7 +374,10 @@ where .unwrap_or_else(|| std::time::Duration::from_secs(5)); while !shutdown_flag.load(Ordering::Relaxed) { - debug!("Start listening on {} targets ... ", targets_state.len()); + debug!( + num_targets = targets_state.len(), + "Start listening on targets..." + ); match db .listen_doc_changes(targets_state.values().cloned().collect()) @@ -388,7 +391,7 @@ where Ok(mut listen_stream) => loop { tokio::select! { _ = shutdown_receiver.recv() => { - debug!("Exiting from listener on {} targets...", targets_state.len()); + debug!(num_targets = targets_state.len(), "Exiting from listener on targets..."); shutdown_receiver.close(); break; } @@ -399,7 +402,8 @@ where else { match tried { Ok(Some(event)) => { - trace!("Received a listen response event to handle: {:?}", event); + trace!(?event, "Received a listen response event to handle."); + match event.response_type { Some(listen_response::ResponseType::TargetChange(ref target_change)) if !target_change.resume_token.is_empty() => @@ -411,7 +415,7 @@ where let new_token: FirestoreListenerToken = target_change.resume_token.clone().into(); if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await { - error!("Listener token storage error occurred {:?}.", err); + error!(%err, "Listener token storage error occurred."); break; } else { @@ -420,7 +424,7 @@ where } }, Err(err) => { - error!("Listener system error - unexpected target ID: {} {:?}.", target_id_num, err); + error!(%err, target_id_num, "Listener system error - unexpected target ID."); break; } } @@ -429,7 +433,7 @@ where } Some(response_type) => { if let Err(err) = cb(response_type).await { - error!("Listener callback function error occurred {:?}.", err); + error!(%err, "Listener callback function error occurred."); break; } } @@ -461,22 +465,22 @@ where if db_err.details.contains("unexpected end of file") || db_err.details.contains("stream error received") => { - debug!("Listen EOF ({:?}). Restarting in {:?}...", err, delay); + debug!(%err, ?delay, "Listen EOF.. Restarting after the specified delay..."); tokio::time::sleep(delay).await; false } FirestoreError::DatabaseError(ref db_err) if db_err.public.code.contains("InvalidArgument") => { - error!("Listen error {:?}. Exiting...", err); + error!(%err, "Listen error. Exiting..."); true } FirestoreError::InvalidParametersError(_) => { - error!("Listen error {:?}. Exiting...", err); + error!(%err, "Listen error. Exiting..."); true } _ => { - error!("Listen error {:?}. Restarting in {:?}...", err, delay); + error!(%err, ?delay, "Listen error. Restarting after the specified delay..."); tokio::time::sleep(delay).await; false } diff --git a/src/db/listen_changes_state_storage.rs b/src/db/listen_changes_state_storage.rs index abef153..f6c843f 100644 --- a/src/db/listen_changes_state_storage.rs +++ b/src/db/listen_changes_state_storage.rs @@ -33,9 +33,10 @@ impl FirestoreTempFilesListenStateStorage { pub fn with_temp_dir>(temp_dir: P) -> Self { debug!( - "Using temp dir for listen state storage: {:?}", - temp_dir.as_ref() + directory = ?temp_dir.as_ref(), + "Using temp dir for listen state storage.", ); + Self { temp_dir: Some(temp_dir.as_ref().to_path_buf()), } diff --git a/src/db/mod.rs b/src/db/mod.rs index 651b704..9c47ac8 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -148,10 +148,10 @@ impl FirestoreDb { .unwrap_or_else(|| GOOGLE_FIREBASE_API_URL.to_string()); info!( - "Creating a new DB client: {}. API: {} Token scopes: {}", - firestore_database_path, - effective_firebase_api_url, - token_scopes.join(", ") + database_path = firestore_database_path, + api_url = effective_firebase_api_url, + token_scopes = token_scopes.join(", "), + "Creating a new database client.", ); let client = GoogleApiClient::from_function_with_token_source( diff --git a/src/db/query.rs b/src/db/query.rs index f1a04cf..5f93bd5 100644 --- a/src/db/query.rs +++ b/src/db/query.rs @@ -121,9 +121,9 @@ impl FirestoreDb { ); span.in_scope(|| { debug!( - "Querying stream of documents in {:?} took {}ms", - params.collection_id, - query_duration.num_milliseconds() + collection_id = ?params.collection_id, + duration_milliseconds = query_duration.num_milliseconds(), + "Queried stream of documents.", ); }); @@ -134,10 +134,10 @@ impl FirestoreDb { if db_err.retry_possible && retries < self.inner.options.max_retries => { warn!( - "Failed with {}. Retrying: {}/{}", - db_err, - retries + 1, - self.inner.options.max_retries + err = %db_err, + current_retry = retries + 1, + max_retries = self.inner.options.max_retries, + "Failed to stream query. Retrying up to the specified number of times.", ); self.stream_query_doc_with_retries(params, retries + 1, span) @@ -193,7 +193,7 @@ impl FirestoreDb { FirestoreCachedValue::UseCached(stream) => { span.record("/firestore/cache_result", "hit"); span.in_scope(|| { - debug!("Querying {} documents from cache", collection_id); + debug!(collection_id, "Querying documents from cache."); }); Ok(FirestoreCachedValue::UseCached(stream)) } @@ -204,9 +204,8 @@ impl FirestoreDb { FirestoreDbSessionCacheMode::ReadCachedOnly(_) ) { span.in_scope(|| { - debug!( - "Cache doesn't have suitable documents for {}, but cache mode is ReadCachedOnly so returning empty stream", - collection_id.as_str() + debug!(collection_id, + "Cache doesn't have suitable documents, but cache mode is ReadCachedOnly so returning empty stream.", ); }); Ok(FirestoreCachedValue::UseCached(Box::pin( @@ -215,8 +214,8 @@ impl FirestoreDb { } else { span.in_scope(|| { debug!( - "Querying {} documents from cache skipped", - collection_id + collection_id, + "Querying documents from cache skipped.", ); }); Ok(FirestoreCachedValue::SkipCache) @@ -248,7 +247,7 @@ impl FirestoreQuerySupport for FirestoreDb { future::ready(match doc_res { Ok(doc) => Some(doc), Err(err) => { - error!("Error occurred while consuming query: {}", err); + error!(%err, "Error occurred while consuming query."); None } }) @@ -284,7 +283,7 @@ impl FirestoreQuerySupport for FirestoreDb { Ok(Some(doc)) => Some(Ok(doc)), Ok(None) => None, Err(err) => { - error!("Error occurred while consuming query: {}", err); + error!(%err, "Error occurred while consuming query."); Some(Err(err)) } }) @@ -315,8 +314,8 @@ impl FirestoreQuerySupport for FirestoreDb { Ok(obj) => Some(obj), Err(err) => { error!( - "Error occurred while converting query document in a stream: {}", - err + %err, + "Error occurred while converting query document in a stream.", ); None } @@ -437,8 +436,8 @@ impl FirestoreQuerySupport for FirestoreDb { span.in_scope(|| { debug!( - "Running query on partitions with max parallelism: {}", - parallelism + parallelism, + "Running query on partitions with specified max parallelism.", ) }); @@ -451,7 +450,7 @@ impl FirestoreQuerySupport for FirestoreDb { if cursors.is_empty() { span.in_scope(|| { debug!( - "The server detected the query has too few results to be partitioned. Falling back to normal query" + "The server detected the query has too few results to be partitioned. Falling back to normal query." ) }); let doc_stream = self @@ -472,13 +471,18 @@ impl FirestoreQuerySupport for FirestoreDb { mpsc::unbounded_channel::>(); futures::stream::iter(cursors_pairs.windows(2)) - .map(|cursor_pair| (cursor_pair, tx.clone(), partition_params.clone(), span.clone())) + .map(|cursor_pair| { + ( + cursor_pair, + tx.clone(), + partition_params.clone(), + span.clone(), + ) + }) .for_each_concurrent( Some(parallelism), |(cursor_pair, tx, partition_params, span)| async move { - span.in_scope(|| { - debug!("Streaming partition cursor {:?}",cursor_pair) - }); + span.in_scope(|| debug!(?cursor_pair, "Streaming partition cursor.")); let mut params_with_cursors = partition_params.query_params; if let Some(first_cursor) = cursor_pair.first() { @@ -488,40 +492,45 @@ impl FirestoreQuerySupport for FirestoreDb { params_with_cursors.mopt_end_at(last_cursor.clone()); } - let partition = FirestorePartition::new().opt_start_at(params_with_cursors.start_at.clone()).opt_end_at(params_with_cursors.end_at.clone()); + let partition = FirestorePartition::new() + .opt_start_at(params_with_cursors.start_at.clone()) + .opt_end_at(params_with_cursors.end_at.clone()); match self.stream_query_doc_with_errors(params_with_cursors).await { Ok(result_stream) => { result_stream - .map(|doc_res| (doc_res, tx.clone(), span.clone(), partition.clone())) + .map(|doc_res| { + (doc_res, tx.clone(), span.clone(), partition.clone()) + }) .for_each(|(doc_res, tx, span, partition)| async move { - let message = doc_res.map(|doc| (partition.clone(), doc)); if let Err(err) = tx.send(message) { span.in_scope(|| { warn!( - "Unable to send result for partition {:?}:{:?}", - partition, - err + %err, + ?partition, + "Unable to send result for partition.", ) }) }; - }).await; - }, + }) + .await; + } Err(err) => { if let Err(err) = tx.send(Err(err)) { span.in_scope(|| { warn!( - "Unable to send result for partition cursor {:?} error {:?}", - cursor_pair, - err - ) + ?err, + ?cursor_pair, + "Unable to send result for partition cursor.", + ) }) }; } } }, - ).await; + ) + .await; Ok(Box::pin( tokio_stream::wrappers::UnboundedReceiverStream::new(rx), diff --git a/src/db/transaction.rs b/src/db/transaction.rs index f60476e..d1234ef 100644 --- a/src/db/transaction.rs +++ b/src/db/transaction.rs @@ -50,7 +50,7 @@ impl<'a> FirestoreTransaction<'a> { transaction_span.record("/firestore/transaction_id", hex_trans_id); transaction_span.in_scope(|| { - debug!("Created a new transaction. Mode: {:?}", options.mode); + debug!(mode = ?options.mode, "Created a new transaction."); }); Ok(Self { @@ -81,7 +81,7 @@ impl<'a> FirestoreTransaction<'a> { if self.writes.is_empty() { self.transaction_span.in_scope(|| { - debug!("Transaction has been committed without any writes"); + debug!("Transaction has been committed without any writes."); }); return Ok(FirestoreTransactionResponse::new(Vec::new())); @@ -110,7 +110,7 @@ impl<'a> FirestoreTransaction<'a> { } self.transaction_span.in_scope(|| { - debug!("Transaction has been committed"); + debug!("Transaction has been committed."); }); Ok(result) @@ -126,7 +126,7 @@ impl<'a> FirestoreTransaction<'a> { self.db.client().get().rollback(request).await?; self.transaction_span.in_scope(|| { - debug!("Transaction has been rollback"); + debug!("Transaction has been rolled back."); }); Ok(()) @@ -141,7 +141,7 @@ impl<'a> Drop for FirestoreTransaction<'a> { fn drop(&mut self) { if !self.finished { self.transaction_span - .in_scope(|| warn!("Transaction was neither committed nor rollback")); + .in_scope(|| warn!("Transaction was neither committed nor rolled back.")); } } } @@ -202,8 +202,8 @@ impl FirestoreDb { FirestoreError::DatabaseError(ref db_err) if db_err.retry_possible => { transaction_span.in_scope(|| { warn!( - "Transient error occurred in committing transaction: {}", - &err + %err, + "Transient error occurred while committing transaction.", ) }); // Ignore; we'll try again below @@ -215,7 +215,7 @@ impl FirestoreDb { Err(err) => match err { BackoffError::Transient { err, retry_after } => { transaction_span.in_scope(|| { - warn!("Transient error occurred in transaction function: {}. Retrying after: {:?}", &err, retry_after) + warn!(%err, delay = ?retry_after, "Transient error occurred in transaction function. Retrying after the specified delay."); }); initial_backoff_duration = retry_after; } @@ -263,7 +263,7 @@ impl FirestoreDb { match backoff_err { BackoffError::Transient { err, retry_after } => { transaction_span.in_scope(|| { - warn!("Transient error occurred in transaction function: {}. Retrying after: {:?}", &err, &retry_after) + warn!(%err, delay = ?retry_after, "Transient error occurred in transaction function. Retrying after the specified delay."); }); let firestore_err = FirestoreError::ErrorInTransaction( @@ -307,8 +307,8 @@ impl FirestoreDb { if let Err(ref err) = retry_result { transaction_span.in_scope(|| { error!( - "Unable to commit transaction: {}. Trying to roll it back", - &err + %err, + "Unable to commit transaction. Trying to roll it back.", ) }); diff --git a/src/db/update.rs b/src/db/update.rs index 06859c2..37abfa2 100644 --- a/src/db/update.rs +++ b/src/db/update.rs @@ -154,7 +154,7 @@ impl FirestoreUpdateSupport for FirestoreDb { ); span.in_scope(|| { - debug!("Updated the document: {}/{}", collection_id, document_id); + debug!(collection_id, document_id, "Updated the document."); }); Ok(update_response.into_inner()) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 50a33e3..20a9f41 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -39,7 +39,7 @@ where for<'de> T: Deserialize<'de>, DF: Fn(&T) -> String, { - info!("Populating {} collection", collection_name); + info!(collection_name, "Populating collection."); let batch_writer = db.create_simple_batch_writer().await?; let mut current_batch = batch_writer.new_batch();