Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry mechanism on data loading when rebuilding the data. #151

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions zstor/src/actors/zstor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl Handler<Retrieve> for ZstorActor {
)
})?;

let shards = load_data(&metadata).await?;
let shards = load_data(&metadata, 1).await?;

pipeline
.send(RecoverFile {
Expand Down Expand Up @@ -335,7 +335,7 @@ impl Handler<Rebuild> for ZstorActor {
};

// load the data from the storage backends
let input = load_data(&old_metadata).await?;
let input = load_data(&old_metadata, 3).await?;
let existing_data = input.clone();

// rebuild the data (in memory only)
Expand Down Expand Up @@ -454,7 +454,8 @@ impl Handler<ReloadConfig> for ZstorActor {
}
}

async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
/// load data from the storage backends
async fn load_data(metadata: &MetaData, max_attempts: u64) -> ZstorResult<Vec<Option<Vec<u8>>>> {
// attempt to retrieve all shards
let mut shard_loads: Vec<JoinHandle<(usize, Result<(_, _), ZstorError>)>> =
Vec::with_capacity(metadata.shards().len());
Expand All @@ -468,7 +469,7 @@ async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
Ok(ok) => ok,
Err(e) => return (idx, Err(e.into())),
};
match db.get(&key).await {
match db.get_with_retry(&key, max_attempts).await {
Ok(potential_shard) => match potential_shard {
Some(shard) => (idx, Ok((shard, chksum))),
None => (
Expand Down
35 changes: 34 additions & 1 deletion zstor/src/zdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,39 @@ impl SequentialZdb {
Ok(Some(data))
}

/// get data from the zdb with a retry mechanism.
/// The retry will only happen at temporary errors,
/// currently only timeouts.
pub async fn get_with_retry(
&self,
keys: &[Key],
max_attempts: u64,
) -> ZdbResult<Option<Vec<u8>>> {
if max_attempts < 2 {
return self.get(keys).await;
}

let mut last_error = None;

for attempt in 0..max_attempts {
match self.get(keys).await {
Ok(result) => return Ok(result),
Err(e) => {
if e.internal == ErrorCause::Timeout {
last_error = Some(e);
if attempt < max_attempts - 1 {
debug!("timeout error on attempt {}, retrying", attempt + 1);
}
continue;
LeeSmet marked this conversation as resolved.
Show resolved Hide resolved
}
return Err(e);
}
}
}

Err(last_error.unwrap())
}

/// Returns the [`ZdbConnectionInfo`] object used to connect to this db.
#[inline]
pub fn connection_info(&self) -> &ZdbConnectionInfo {
Expand Down Expand Up @@ -1037,7 +1070,7 @@ impl ZdbError {
}

/// The cause of a zero db error.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
enum ErrorCause {
Redis(redis::RedisError),
Other(String),
Expand Down
Loading