From db3109d827ac8101a828e77eec0ff8922031f235 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Thu, 13 Jun 2024 20:00:18 +0200 Subject: [PATCH] Add randomised delay for retries on transient errors --- Cargo.toml | 1 + src/db/aggregated_query.rs | 33 +++++++++++++++++++++++---------- src/db/get.rs | 18 +++++++++++++----- src/db/list.rs | 34 ++++++++++++++++++++++++---------- src/db/query.rs | 9 ++++++++- 5 files changed, 69 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 23fc68d..38012b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ hex = "0.4" backoff = { version = "0.4", features = ["tokio"] } redb = { version = "2.1", optional = true } moka = { version = "0.12", features = ["future"], optional = true } # Caching library +rand = "0.8" [dev-dependencies] cargo-husky = { version = "1.5", default-features = false, features = ["run-for-all", "prepush-hook", "run-cargo-fmt"] } diff --git a/src/db/aggregated_query.rs b/src/db/aggregated_query.rs index 93c3ae7..6ce17cc 100644 --- a/src/db/aggregated_query.rs +++ b/src/db/aggregated_query.rs @@ -10,6 +10,7 @@ use futures::TryFutureExt; use futures::TryStreamExt; use futures::{future, StreamExt}; use gcloud_sdk::google::firestore::v1::*; +use rand::Rng; use rsb_derive::*; use serde::Deserialize; use tracing::*; @@ -341,12 +342,18 @@ impl FirestoreDb { FirestoreError::DatabaseError(ref db_err) if db_err.retry_possible && retries < self.inner.options.max_retries => { + let sleep_duration = tokio::time::Duration::from_millis( + rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1), + ); warn!( - err = %db_err, - current_retry = retries + 1, - max_retries = self.inner.options.max_retries, - "Failed to run aggregation query. Retrying up to the specified number of times.", - ); + err = %db_err, + current_retry = retries + 1, + max_retries = self.inner.options.max_retries, + delay = sleep_duration.as_millis(), + "Failed to run aggregation query. Retrying up to the specified number of times.", + ); + + tokio::time::sleep(sleep_duration).await; self.stream_aggregated_query_doc_with_retries(params, retries + 1, span) .await @@ -405,12 +412,18 @@ impl FirestoreDb { FirestoreError::DatabaseError(ref db_err) if db_err.retry_possible && retries < self.inner.options.max_retries => { + let sleep_duration = tokio::time::Duration::from_millis( + rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1), + ); warn!( - err = %db_err, - current_retry = retries + 1, - max_retries = self.inner.options.max_retries, - "Failed to run aggregation query. Retrying up to the specified number of times.", - ); + err = %db_err, + current_retry = retries + 1, + max_retries = self.inner.options.max_retries, + delay = sleep_duration.as_millis(), + "Failed to run aggregation query. Retrying up to the specified number of times.", + ); + + tokio::time::sleep(sleep_duration).await; self.aggregated_query_doc_with_retries(params, retries + 1, span) .await diff --git a/src/db/get.rs b/src/db/get.rs index bc2761d..11b18c7 100644 --- a/src/db/get.rs +++ b/src/db/get.rs @@ -9,6 +9,7 @@ use futures::TryFutureExt; use futures::TryStreamExt; use futures::{future, StreamExt}; use gcloud_sdk::google::firestore::v1::*; +use rand::Rng; use serde::Deserialize; use tracing::*; @@ -630,14 +631,21 @@ impl FirestoreDb { FirestoreError::DatabaseError(ref db_err) if db_err.retry_possible && retries < self.get_options().max_retries => { + let sleep_duration = tokio::time::Duration::from_millis( + rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1), + ); span.in_scope(|| { warn!( - err = %db_err, - current_retry = retries + 1, - max_retries = self.get_options().max_retries, - "Failed to get document. Retrying up to the specified number of times.", - ); + err = %db_err, + current_retry = retries + 1, + max_retries = self.get_options().max_retries, + delay = sleep_duration.as_millis(), + "Failed to get document. Retrying up to the specified number of times.", + ); }); + + tokio::time::sleep(sleep_duration).await; + self.get_doc_by_path(collection_id, document_path, None, retries + 1) .await } diff --git a/src/db/list.rs b/src/db/list.rs index 2193138..d2521f0 100644 --- a/src/db/list.rs +++ b/src/db/list.rs @@ -9,6 +9,7 @@ use futures::StreamExt; use futures::TryFutureExt; use futures::TryStreamExt; use gcloud_sdk::google::firestore::v1::*; +use rand::Rng; use rsb_derive::*; use serde::Deserialize; use std::future; @@ -352,12 +353,19 @@ impl FirestoreDb { FirestoreError::DatabaseError(ref db_err) if db_err.retry_possible && retries < db_inner.options.max_retries => { + let sleep_duration = tokio::time::Duration::from_millis( + rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1), + ); + warn!( - err = %db_err, - current_retry = retries + 1, - max_retries = db_inner.options.max_retries, - "Failed to list documents. Retrying up to the specified number of times.", - ); + err = %db_err, + current_retry = retries + 1, + max_retries = db_inner.options.max_retries, + delay = sleep_duration.as_millis(), + "Failed to list documents. Retrying up to the specified number of times.", + ); + + tokio::time::sleep(sleep_duration).await; Self::list_doc_with_retries_inner(db_inner, list_request, retries + 1, span).await } @@ -506,12 +514,18 @@ impl FirestoreDb { FirestoreError::DatabaseError(ref db_err) if db_err.retry_possible && retries < self.inner.options.max_retries => { + let sleep_duration = tokio::time::Duration::from_millis( + rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1), + ); warn!( - err = %db_err, - current_retry = retries + 1, - max_retries = self.inner.options.max_retries, - "Failed to list collection IDs. Retrying up to the specified number of times.", - ); + err = %db_err, + current_retry = retries + 1, + max_retries = self.inner.options.max_retries, + delay = sleep_duration.as_millis(), + "Failed to list collection IDs. Retrying up to the specified number of times.", + ); + + tokio::time::sleep(sleep_duration).await; self.list_collection_ids_with_retries(params, retries + 1, span) .await diff --git a/src/db/query.rs b/src/db/query.rs index c9fd50b..7755dbe 100644 --- a/src/db/query.rs +++ b/src/db/query.rs @@ -8,6 +8,7 @@ use futures::TryFutureExt; use futures::TryStreamExt; use futures::{future, StreamExt}; use gcloud_sdk::google::firestore::v1::*; +use rand::Rng; use serde::Deserialize; use tokio::sync::mpsc; use tracing::*; @@ -155,13 +156,19 @@ impl FirestoreDb { FirestoreError::DatabaseError(ref db_err) if db_err.retry_possible && retries < self.inner.options.max_retries => { + let sleep_duration = tokio::time::Duration::from_millis( + rand::thread_rng().gen_range(0..2u64.pow(retries as u32) * 1000 + 1), + ); warn!( err = %db_err, current_retry = retries + 1, max_retries = self.inner.options.max_retries, - "Failed to stream query. Retrying up to the specified number of times.", + delay = sleep_duration.as_millis(), + "Failed to stream query. Retrying up to the specified number of times." ); + tokio::time::sleep(sleep_duration).await; + self.stream_query_doc_with_retries(params, retries + 1, span) .await }