Skip to content

Commit

Permalink
Add randomised delay for retries on transient errors
Browse files (browse the repository at this point in the history)
  • Loading branch information
abdolence committed Jun 13, 2024
1 parent 3e44659 commit db3109d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ hex = "0.4"
backoff = { version = "0.4", features = ["tokio"] }
redb = { version = "2.1", optional = true }
moka = { version = "0.12", features = ["future"], optional = true } # Caching library
rand = "0.8"

[dev-dependencies]
cargo-husky = { version = "1.5", default-features = false, features = ["run-for-all", "prepush-hook", "run-cargo-fmt"] }
Expand Down
33 changes: 23 additions & 10 deletions src/db/aggregated_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::TryFutureExt;
use futures::TryStreamExt;
use futures::{future, StreamExt};
use gcloud_sdk::google::firestore::v1::*;
use rand::Rng;
use rsb_derive::*;
use serde::Deserialize;
use tracing::*;
Expand Down Expand Up @@ -341,12 +342,18 @@ impl FirestoreDb {
FirestoreError::DatabaseError(ref db_err)
if db_err.retry_possible && retries < self.inner.options.max_retries =>
{
let sleep_duration = tokio::time::Duration::from_millis(
rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1),
);
warn!(
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
"Failed to run aggregation query. Retrying up to the specified number of times.",
);
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
delay = sleep_duration.as_millis(),
"Failed to run aggregation query. Retrying up to the specified number of times.",
);

tokio::time::sleep(sleep_duration).await;

self.stream_aggregated_query_doc_with_retries(params, retries + 1, span)
.await
Expand Down Expand Up @@ -405,12 +412,18 @@ impl FirestoreDb {
FirestoreError::DatabaseError(ref db_err)
if db_err.retry_possible && retries < self.inner.options.max_retries =>
{
let sleep_duration = tokio::time::Duration::from_millis(
rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1),
);
warn!(
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
"Failed to run aggregation query. Retrying up to the specified number of times.",
);
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
delay = sleep_duration.as_millis(),
"Failed to run aggregation query. Retrying up to the specified number of times.",
);

tokio::time::sleep(sleep_duration).await;

self.aggregated_query_doc_with_retries(params, retries + 1, span)
.await
Expand Down
18 changes: 13 additions & 5 deletions src/db/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::TryFutureExt;
use futures::TryStreamExt;
use futures::{future, StreamExt};
use gcloud_sdk::google::firestore::v1::*;
use rand::Rng;
use serde::Deserialize;
use tracing::*;

Expand Down Expand Up @@ -630,14 +631,21 @@ impl FirestoreDb {
FirestoreError::DatabaseError(ref db_err)
if db_err.retry_possible && retries < self.get_options().max_retries =>
{
let sleep_duration = tokio::time::Duration::from_millis(
rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1),
);
span.in_scope(|| {
warn!(
err = %db_err,
current_retry = retries + 1,
max_retries = self.get_options().max_retries,
"Failed to get document. Retrying up to the specified number of times.",
);
err = %db_err,
current_retry = retries + 1,
max_retries = self.get_options().max_retries,
delay = sleep_duration.as_millis(),
"Failed to get document. Retrying up to the specified number of times.",
);
});

tokio::time::sleep(sleep_duration).await;

self.get_doc_by_path(collection_id, document_path, None, retries + 1)
.await
}
Expand Down
34 changes: 24 additions & 10 deletions src/db/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::StreamExt;
use futures::TryFutureExt;
use futures::TryStreamExt;
use gcloud_sdk::google::firestore::v1::*;
use rand::Rng;
use rsb_derive::*;
use serde::Deserialize;
use std::future;
Expand Down Expand Up @@ -352,12 +353,19 @@ impl FirestoreDb {
FirestoreError::DatabaseError(ref db_err)
if db_err.retry_possible && retries < db_inner.options.max_retries =>
{
let sleep_duration = tokio::time::Duration::from_millis(
rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1),
);

warn!(
err = %db_err,
current_retry = retries + 1,
max_retries = db_inner.options.max_retries,
"Failed to list documents. Retrying up to the specified number of times.",
);
err = %db_err,
current_retry = retries + 1,
max_retries = db_inner.options.max_retries,
delay = sleep_duration.as_millis(),
"Failed to list documents. Retrying up to the specified number of times.",
);

tokio::time::sleep(sleep_duration).await;

Self::list_doc_with_retries_inner(db_inner, list_request, retries + 1, span).await
}
Expand Down Expand Up @@ -506,12 +514,18 @@ impl FirestoreDb {
FirestoreError::DatabaseError(ref db_err)
if db_err.retry_possible && retries < self.inner.options.max_retries =>
{
let sleep_duration = tokio::time::Duration::from_millis(
rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1),
);
warn!(
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
"Failed to list collection IDs. Retrying up to the specified number of times.",
);
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
delay = sleep_duration.as_millis(),
"Failed to list collection IDs. Retrying up to the specified number of times.",
);

tokio::time::sleep(sleep_duration).await;

self.list_collection_ids_with_retries(params, retries + 1, span)
.await
Expand Down
9 changes: 8 additions & 1 deletion src/db/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::TryFutureExt;
use futures::TryStreamExt;
use futures::{future, StreamExt};
use gcloud_sdk::google::firestore::v1::*;
use rand::Rng;
use serde::Deserialize;
use tokio::sync::mpsc;
use tracing::*;
Expand Down Expand Up @@ -155,13 +156,19 @@ impl FirestoreDb {
FirestoreError::DatabaseError(ref db_err)
if db_err.retry_possible && retries < self.inner.options.max_retries =>
{
let sleep_duration = tokio::time::Duration::from_millis(
rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1),
);
warn!(
err = %db_err,
current_retry = retries + 1,
max_retries = self.inner.options.max_retries,
"Failed to stream query. Retrying up to the specified number of times.",
delay = sleep_duration.as_millis(),
"Failed to stream query. Retrying up to the specified number of times."
);

tokio::time::sleep(sleep_duration).await;

self.stream_query_doc_with_retries(params, retries + 1, span)
.await
}
Expand Down

0 comments on commit db3109d

Please sign in to comment.