From acf6f726a2a01134d5fbb14a5b1a13f48a1044bf Mon Sep 17 00:00:00 2001
From: Iwan BK
Date: Thu, 19 Dec 2024 10:53:46 +0700
Subject: [PATCH] add retry mechanism on data loading when rebuilding data

We only retry on temporary errors like timeouts.

Retry is only enabled for data rebuild, not for retrieve, because
latency is not crucial during a data rebuild.
---
 zstor/src/actors/zstor.rs | 12 ++++++++----
 zstor/src/zdb.rs          | 25 +++++++++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/zstor/src/actors/zstor.rs b/zstor/src/actors/zstor.rs
index bb5da36..2f1a17e 100644
--- a/zstor/src/actors/zstor.rs
+++ b/zstor/src/actors/zstor.rs
@@ -258,7 +258,7 @@ impl Handler for ZstorActor {
                     )
                 })?;
 
-            let shards = load_data(&metadata).await?;
+            let shards = load_data(&metadata, 1).await?;
 
             pipeline
                 .send(RecoverFile {
@@ -335,7 +335,7 @@ impl Handler for ZstorActor {
             };
 
             // load the data from the storage backends
-            let input = load_data(&old_metadata).await?;
+            let input = load_data(&old_metadata, 3).await?;
             let existing_data = input.clone();
 
             // rebuild the data (in memory only)
@@ -454,7 +454,11 @@ impl Handler for ZstorActor {
     }
 }
 
-async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
+/// Load the shards described by `metadata` from the storage backends.
+///
+/// Every shard fetch goes through `get_with_retry` and is tried at most
+/// `max_attempts` times; pass 1 to fetch without retrying.
+async fn load_data(metadata: &MetaData, max_attempts: u64) -> ZstorResult<Vec<Option<Vec<u8>>>> {
     // attempt to retrieve all shards
     let mut shard_loads: Vec<JoinHandle<(usize, Result<(_, _), ZstorError>)>> =
         Vec::with_capacity(metadata.shards().len());
@@ -468,7 +472,7 @@ async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
                 Ok(ok) => ok,
                 Err(e) => return (idx, Err(e.into())),
             };
-            match db.get(&key).await {
+            match db.get_with_retry(&key, max_attempts).await {
                 Ok(potential_shard) => match potential_shard {
                     Some(shard) => (idx, Ok((shard, chksum))),
                     None => (
diff --git a/zstor/src/zdb.rs b/zstor/src/zdb.rs
index d180a8a..6b64135 100644
--- a/zstor/src/zdb.rs
+++ b/zstor/src/zdb.rs
@@ -759,6 +759,31 @@ impl SequentialZdb {
         Ok(Some(data))
     }
 
+    /// Get data from the zdb, retrying on temporary errors.
+    ///
+    /// At most `max_attempts` calls to `get` are made; a value below 2
+    /// results in a single attempt. Only timeouts are considered temporary,
+    /// any other error is returned immediately. If every attempt times out,
+    /// the error of the last attempt is returned.
+    pub async fn get_with_retry(
+        &self,
+        keys: &[Key],
+        max_attempts: u64,
+    ) -> ZdbResult<Option<Vec<u8>>> {
+        let mut attempt = 1;
+        loop {
+            match self.get(keys).await {
+                // Only retry while there are attempts left; the final timeout
+                // falls through to the arm below and is returned to the caller.
+                Err(e) if matches!(e.internal, ErrorCause::Timeout) && attempt < max_attempts => {
+                    debug!("timeout error on attempt {}, retrying", attempt);
+                    attempt += 1;
+                }
+                result => return result,
+            }
+        }
+    }
+
     /// Returns the [`ZdbConnectionInfo`] object used to connect to this db.
     #[inline]
     pub fn connection_info(&self) -> &ZdbConnectionInfo {