Use tracing fields to store variable log data (#134)
* Use tracing fields to store variable log data

This greatly assists when analyzing errors using a tool like
Google's Log Analytics.

See the root-level documentation of tracing for more explanation
on the merits of doing this:
https://docs.rs/tracing/latest/tracing/#recording-fields

* Fix build error

* Fix build errors in tests
NickCaplinger authored Oct 15, 2023
1 parent e5565ed commit 50ae7ec
Showing 16 changed files with 215 additions and 189 deletions.
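For context, a minimal sketch (not code from this repository) of the before/after pattern the commit applies throughout: tracing macros accept `field = value` pairs, or bare local variables as shorthand, ahead of a now-constant message literal.

```rust
use tracing::debug;

fn preload_progress(collection_path: &str, entries_loaded: usize) {
    // Before: values interpolated into the message string; log tooling
    // sees one opaque string and cannot filter or aggregate on the values.
    debug!(
        "Preloading collection `{}`: {} entries loaded",
        collection_path, entries_loaded
    );

    // After: values recorded as structured fields alongside a constant
    // message. The bare `collection_path` is shorthand for
    // `collection_path = collection_path`.
    debug!(
        collection_path,
        entries_loaded,
        "Collection preload in progress...",
    );
}
```

Keeping the message constant is what makes events groupable: a structured-logging backend can index `collection_path` and `entries_loaded` as their own keys instead of parsing them back out of prose.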
24 changes: 12 additions & 12 deletions src/cache/backends/memory_backend.rs
@@ -65,7 +65,7 @@ impl FirestoreMemoryCacheBackend {
FirestoreCacheCollectionLoadMode::PreloadAllDocs
| FirestoreCacheCollectionLoadMode::PreloadAllIfEmpty => {
if let Some(mem_cache) = self.collection_caches.get(collection_path.as_str()) {
debug!("Preloading {}", collection_path.as_str());
debug!(collection_path, "Preloading collection.");

let params = if let Some(parent) = &config.parent {
db.fluent()
@@ -83,9 +83,9 @@ impl FirestoreMemoryCacheBackend {
.map(|(index, docs)| {
if index > 0 && index % 5000 == 0 {
debug!(
"Preloading collection `{}`: {} entries loaded",
collection_path.as_str(),
index
collection_path = collection_path.as_str(),
entries_loaded = index,
"Collection preload in progress...",
);
}
docs
@@ -99,9 +99,9 @@ impl FirestoreMemoryCacheBackend {
mem_cache.run_pending_tasks().await;

info!(
"Preloading collection `{}` has been finished. Loaded: {} entries",
collection_path.as_str(),
mem_cache.entry_count()
collection_path = collection_path.as_str(),
entry_count = mem_cache.entry_count(),
"Preloading collection has been finished.",
);
}
}
@@ -185,7 +185,7 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {

async fn invalidate_all(&self) -> FirestoreResult<()> {
for (collection_path, mem_cache) in &self.collection_caches {
debug!("Invalidating cache for {}", collection_path);
debug!(collection_path, "Invalidating cache for collection.");
mem_cache.invalidate_all();
mem_cache.run_pending_tasks().await;
}
@@ -203,8 +203,8 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
let (collection_path, document_id) = split_document_path(&doc.name);
if let Some(mem_cache) = self.collection_caches.get(collection_path) {
trace!(
"Writing document to cache due to listener event: {:?}",
doc.name
doc_name = ?doc.name,
"Writing document to cache due to listener event.",
);
mem_cache.insert(document_id.to_string(), doc).await;
}
@@ -215,8 +215,8 @@ impl FirestoreCacheBackend for FirestoreMemoryCacheBackend {
let (collection_path, document_id) = split_document_path(&doc_deleted.document);
if let Some(mem_cache) = self.collection_caches.get(collection_path) {
trace!(
"Removing document from cache due to listener event: {:?}",
doc_deleted.document.as_str()
deleted_doc = ?doc_deleted.document.as_str(),
"Removing document from cache due to listener event.",
);
mem_cache.remove(document_id).await;
}
67 changes: 32 additions & 35 deletions src/cache/backends/persistent_backend.rs
@@ -26,14 +26,14 @@ impl FirestorePersistentCacheBackend {

if !db_dir.exists() {
debug!(
"Creating a temp directory to store persistent cache: {}",
db_dir.display()
directory = %db_dir.display(),
"Creating a temp directory to store persistent cache.",
);
std::fs::create_dir_all(&db_dir)?;
} else {
debug!(
"Using a temp directory to store persistent cache: {}",
db_dir.display()
directory = %db_dir.display(),
"Using a temp directory to store persistent cache.",
);
}
Self::with_options(config, db_dir.join("redb"))
@@ -44,21 +44,15 @@ impl FirestorePersistentCacheBackend {
data_file_path: PathBuf,
) -> FirestoreResult<Self> {
if data_file_path.exists() {
debug!(
"Opening database for persistent cache {:?}...",
data_file_path
);
debug!(?data_file_path, "Opening database for persistent cache...",);
} else {
debug!(
"Creating database for persistent cache {:?}...",
data_file_path
);
debug!(?data_file_path, "Creating database for persistent cache...",);
}

let mut db = Database::create(data_file_path)?;

db.compact()?;
info!("Successfully opened database for persistent cache");
info!("Successfully opened database for persistent cache.");

Ok(Self { config, redb: db })
}
@@ -88,14 +82,18 @@ impl FirestorePersistentCacheBackend {
) && existing_records > 0
{
info!(
"Preloading collection `{}` has been skipped. Already loaded: {} entries",
collection_path.as_str(),
existing_records
);
collection_path = collection_path.as_str(),
entries_loaded = existing_records,
"Preloading collection has been skipped.",
);
continue;
}

debug!("Preloading {}", collection_path.as_str());
debug!(
collection_path = collection_path.as_str(),
"Preloading collection."
);

let params = if let Some(parent) = &config.parent {
db.fluent()
.select()
@@ -112,17 +110,17 @@ impl FirestorePersistentCacheBackend {
.map(|(index, docs)| {
if index > 0 && index % 5000 == 0 {
debug!(
"Preloading collection `{}`: {} entries loaded",
collection_path.as_str(),
index
collection_path = collection_path.as_str(),
entries_loaded = index,
"Collection preload in progress...",
);
}
docs
})
.ready_chunks(100)
.for_each(|docs| async move {
if let Err(err) = self.write_batch_docs(collection_path, docs) {
error!("Error while preloading collection: {}", err);
error!(?err, "Error while preloading collection.");
}
})
.await;
@@ -140,17 +138,13 @@ impl FirestorePersistentCacheBackend {
};

info!(
"Preloading collection `{}` has been finished. Loaded: {} entries",
collection_path.as_str(),
updated_records
collection_path = collection_path.as_str(),
updated_records, "Preloading collection has been finished.",
);
}
FirestoreCacheCollectionLoadMode::PreloadNone => {
let tx = self.redb.begin_write()?;
debug!(
"Creating corresponding collection table `{}`",
collection_path
);
debug!(collection_path, "Creating corresponding collection table.",);
tx.open_table(td)?;
tx.commit()?;
}
@@ -289,8 +283,8 @@ impl FirestoreCacheBackend for FirestorePersistentCacheBackend {
let write_txn = self.redb.begin_write()?;
{
debug!(
"Invalidating {} and draining the corresponding table",
collection_path
collection_path,
"Invalidating collection and draining the corresponding table.",
);
let mut table = write_txn.open_table(td)?;
table.drain::<&str>(..)?;
@@ -310,9 +304,10 @@
FirestoreListenEvent::DocumentChange(doc_change) => {
if let Some(doc) = doc_change.document {
trace!(
"Writing document to cache due to listener event: {:?}",
doc.name
doc_name = ?doc.name,
"Writing document to cache due to listener event.",
);

self.write_document(&doc)?;
}
Ok(())
@@ -322,10 +317,12 @@
let write_txn = self.redb.begin_write()?;
let td: TableDefinition<&str, &[u8]> = TableDefinition::new(collection_path);
let mut table = write_txn.open_table(td)?;

trace!(
"Removing document from cache due to listener event: {:?}",
doc_deleted.document.as_str()
deleted_doc = ?doc_deleted.document.as_str(),
"Removing document from cache due to listener event.",
);

table.remove(document_id)?;
Ok(())
}
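The hunks above also use tracing's recording sigils, covered in the linked documentation: `?expr` captures a field with its `Debug` implementation, `%expr` with its `Display` implementation. A small sketch under those assumptions (the function and values here are hypothetical, not from the repository):

```rust
use std::path::Path;
use tracing::{debug, error};

fn sigil_examples(data_file_path: &Path, err: &std::io::Error) {
    // `?value`: capture via Debug. Paths implement Debug but not Display,
    // which is why the diff records `?data_file_path` directly...
    debug!(?data_file_path, "Opening database for persistent cache...");

    // ...but goes through `.display()` when it wants Display capture.
    debug!(
        directory = %data_file_path.display(),
        "Using a temp directory to store persistent cache.",
    );

    // Errors implement both traits: `%err` records the human-readable
    // message, while `?err` records the Debug form.
    error!(%err, "Error while preloading collection.");
}
```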
4 changes: 2 additions & 2 deletions src/cache/mod.rs
@@ -108,7 +108,7 @@ where
let backend = backend.clone();
async move {
if let Err(err) = backend.on_listen_event(event).await {
error!("Error occurred while updating cache: {}", err);
error!(?err, "Error occurred while updating cache.");
};
Ok(())
}
@@ -175,7 +175,7 @@ pub trait FirestoreCacheDocsByPathSupport {
Ok((doc_id, Some(document)))
}),
Err(err) => {
error!("Error occurred while reading from cache: {}", err);
error!(%err, "Error occurred while reading from cache.");
None
}
}
37 changes: 19 additions & 18 deletions src/db/aggregated_query.rs
@@ -181,7 +181,7 @@ impl FirestoreAggregatedQuerySupport for FirestoreDb {
Ok(Some(doc)) => Some(doc),
Ok(None) => None,
Err(err) => {
error!("Error occurred while consuming query: {}", err);
error!(%err, "Error occurred while consuming query.");
None
}
})
@@ -210,7 +210,7 @@ impl FirestoreAggregatedQuerySupport for FirestoreDb {
Ok(Some(doc)) => Some(Ok(doc)),
Ok(None) => None,
Err(err) => {
error!("Error occurred while consuming query: {}", err);
error!(%err, "Error occurred while consuming query.");
Some(Err(err))
}
})
@@ -244,8 +244,8 @@ impl FirestoreAggregatedQuerySupport for FirestoreDb {
Ok(obj) => Some(obj),
Err(err) => {
error!(
"Error occurred while consuming query document as a stream: {}",
err
%err,
"Error occurred while consuming query document as a stream.",
);
None
}
@@ -328,9 +328,9 @@ impl FirestoreDb {
);
span.in_scope(|| {
debug!(
"Querying stream of documents in {:?} took {}ms",
params.query_params.collection_id,
query_duration.num_milliseconds()
collection_id = ?params.query_params.collection_id,
duration_milliseconds = query_duration.num_milliseconds(),
"Querying stream of documents in specified collection.",
);
});

@@ -341,10 +341,10 @@ impl FirestoreDb {
if db_err.retry_possible && retries < self.inner.options.max_retries =>
{
warn!(
"Failed with {}. Retrying: {}/{}",
db_err,
retries + 1,
self.inner.options.max_retries
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
"Failed to run aggregation query. Retrying up to the specified number of times.",
);

self.stream_aggregated_query_doc_with_retries(params, retries + 1, span)
@@ -392,9 +392,9 @@ impl FirestoreDb {
);
span.in_scope(|| {
debug!(
"Querying documents in {:?} took {}ms",
params.query_params.collection_id,
query_duration.num_milliseconds()
collection_id = ?params.query_params.collection_id,
duration_milliseconds = query_duration.num_milliseconds(),
"Querying documents in specified collection.",
);
});

@@ -405,11 +405,12 @@ impl FirestoreDb {
if db_err.retry_possible && retries < self.inner.options.max_retries =>
{
warn!(
"Failed with {}. Retrying: {}/{}",
db_err,
retries + 1,
self.inner.options.max_retries
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
"Failed to run aggregation query. Retrying up to the specified number of times.",
);

self.aggregated_query_doc_with_retries(params, retries + 1, span)
.await
}
22 changes: 14 additions & 8 deletions src/db/batch_streaming_writer.rs
@@ -40,7 +40,8 @@ pub struct FirestoreStreamingBatchWriter {
impl Drop for FirestoreStreamingBatchWriter {
fn drop(&mut self) {
if !self.finished.load(Ordering::Relaxed) {
self.batch_span.in_scope(|| warn!("Batch was not finished"));
self.batch_span
.in_scope(|| warn!("Batch was not finished."));
}
}
}
@@ -122,8 +123,9 @@ impl FirestoreStreamingBatchWriter {
}
Err(err) => {
error!(
"Batch write operation {} failed: {}",
received_counter, err
%err,
received_counter,
"Batch write operation failed.",
);
responses_writer.send(Err(err)).ok();
break;
@@ -142,7 +144,7 @@ impl FirestoreStreamingBatchWriter {
break;
}
Err(err) if err.code() == gcloud_sdk::tonic::Code::Cancelled => {
debug!("Batch write operation finished on: {}", received_counter);
debug!(received_counter, "Batch write operation finished.");
responses_writer
.send(Ok(FirestoreBatchWriteResponse::new(
received_counter - 1,
@@ -154,8 +156,9 @@ impl FirestoreStreamingBatchWriter {
}
Err(err) => {
error!(
"Batch write operation {} failed: {}",
received_counter, err
%err,
received_counter,
"Batch write operation failed.",
);
responses_writer.send(Err(err.into())).ok();
break;
@@ -182,7 +185,10 @@ impl FirestoreStreamingBatchWriter {
}
}
Err(err) => {
error!("Batch write operation failed: {}", err);
error!(
%err,
"Batch write operation failed.",
);
responses_writer.send(Err(err.into())).ok();
}
}
@@ -228,7 +234,7 @@ impl FirestoreStreamingBatchWriter {
> self.received_counter.load(Ordering::Relaxed) - 1
{
drop(locked);
debug!("Still waiting receiving responses for batch writes");
debug!("Still waiting to receive responses for batch writes.");
self.init_wait_reader.recv().await;
} else {
drop(locked);
4 changes: 2 additions & 2 deletions src/db/create.rs
@@ -130,9 +130,9 @@ impl FirestoreCreateSupport for FirestoreDb {

span.in_scope(|| {
debug!(
"Created a new document: {}/{:?}",
collection_id,
document_id.as_ref().map(|id| id.as_ref())
document_id = document_id.as_ref().map(|id| id.as_ref()),
"Created a new document.",
);
});

(Diff for the remaining 10 changed files not shown.)
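A closing note: the recorded fields only pay off when the installed subscriber emits structured output. A hedged sketch of such a consumer, assuming the `tracing-subscriber` crate with its `json` feature enabled (none of this is part of the commit):

```rust
use tracing::info;

fn main() {
    // Emit one JSON object per event; recorded fields become separate
    // keys that a log pipeline (e.g. Log Analytics) can index and query.
    tracing_subscriber::fmt().json().init();

    let collection_path = "users";
    let entry_count = 42;
    info!(
        collection_path,
        entry_count,
        "Preloading collection has been finished.",
    );

    // Approximate output (exact shape varies by subscriber version):
    // {"timestamp":"...","level":"INFO","fields":{"message":"Preloading
    //  collection has been finished.","collection_path":"users",
    //  "entry_count":42},"target":"..."}
}
```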