diff --git a/zstor/src/actors/zstor.rs b/zstor/src/actors/zstor.rs index bb5da36..2f1a17e 100644 --- a/zstor/src/actors/zstor.rs +++ b/zstor/src/actors/zstor.rs @@ -258,7 +258,7 @@ impl Handler for ZstorActor { ) })?; - let shards = load_data(&metadata).await?; + let shards = load_data(&metadata, 1).await?; pipeline .send(RecoverFile { @@ -335,7 +335,7 @@ impl Handler for ZstorActor { }; // load the data from the storage backends - let input = load_data(&old_metadata).await?; + let input = load_data(&old_metadata, 3).await?; let existing_data = input.clone(); // rebuild the data (in memory only) @@ -454,7 +454,8 @@ impl Handler for ZstorActor { } } -async fn load_data(metadata: &MetaData) -> ZstorResult>>> { +/// load data from the storage backends +async fn load_data(metadata: &MetaData, max_attempts: u64) -> ZstorResult>>> { // attempt to retrieve all shards let mut shard_loads: Vec)>> = Vec::with_capacity(metadata.shards().len()); @@ -468,7 +469,7 @@ async fn load_data(metadata: &MetaData) -> ZstorResult>>> { Ok(ok) => ok, Err(e) => return (idx, Err(e.into())), }; - match db.get(&key).await { + match db.get_with_retry(&key, max_attempts).await { Ok(potential_shard) => match potential_shard { Some(shard) => (idx, Ok((shard, chksum))), None => ( diff --git a/zstor/src/zdb.rs b/zstor/src/zdb.rs index d180a8a..6b64135 100644 --- a/zstor/src/zdb.rs +++ b/zstor/src/zdb.rs @@ -759,6 +759,39 @@ impl SequentialZdb { Ok(Some(data)) } + /// get data from the zdb with a retry mechanism. + /// The retry will only happen at temporary errors, + /// currently only timeouts. + pub async fn get_with_retry( + &self, + keys: &[Key], + max_attempts: u64, + ) -> ZdbResult>> { + if max_attempts < 2 { + return self.get(keys).await; + } + + let mut last_error = None; + + for attempt in 0..max_attempts { + match self.get(keys).await { + Ok(result) => return Ok(result), + Err(e) => { + if e.internal == ErrorCause::Timeout { + last_error = Some(e); + if attempt < max_attempts - 1 { + debug!("timeout error on attempt {}, retrying", attempt + 1); + } + continue; + } + return Err(e); + } + } + } + + Err(last_error.unwrap()) + } + /// Returns the [`ZdbConnectionInfo`] object used to connect to this db. #[inline] pub fn connection_info(&self) -> &ZdbConnectionInfo { @@ -1037,7 +1070,7 @@ impl ZdbError { } /// The cause of a zero db error. -#[derive(Debug)] +#[derive(Debug, PartialEq)] enum ErrorCause { Redis(redis::RedisError), Other(String),