Parallelize S3 delete objects requests
guilload committed Nov 1, 2024
1 parent 70a248b commit 2c9e64f
Showing 2 changed files with 81 additions and 65 deletions.
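Before the diff itself, a minimal self-contained sketch of the pattern this commit introduces: chunked delete requests turned into futures and driven with `buffer_unordered(10)`, plus a shared atomic flag so that a failed batch keeps later batches from being issued. All names and the simulated failure below are hypothetical; the sketch assumes the `tokio` and `futures` crates, not the real quickwit-storage code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use futures::{stream, StreamExt};

// Hypothetical stand-in for one DeleteObjects batch; the real code issues an
// aws_sdk_s3 `delete_objects` request here.
async fn delete_batch(keys: Vec<String>, fail: bool) -> Result<Vec<String>, String> {
    if fail {
        Err(format!("batch of {} key(s) failed", keys.len()))
    } else {
        Ok(keys)
    }
}

#[tokio::main]
async fn main() {
    let cancelled = Arc::new(AtomicBool::new(false));
    let batches: Vec<Vec<String>> = (0..5).map(|i| vec![format!("split-{i}.split")]).collect();

    let delete_futures = batches.into_iter().enumerate().map(|(i, batch)| {
        let cancelled = Arc::clone(&cancelled);
        async move {
            if cancelled.load(Ordering::Acquire) {
                // A previous batch failed: report this one as unattempted.
                return Err("unattempted".to_string());
            }
            let result = delete_batch(batch, i == 2).await;
            if result.is_err() {
                // Stop issuing new requests once one batch fails.
                cancelled.store(true, Ordering::Release);
            }
            result
        }
    });

    // Drive up to 10 batches concurrently, collecting outcomes as they complete.
    let outcomes: Vec<Result<Vec<String>, String>> =
        stream::iter(delete_futures).buffer_unordered(10).collect().await;
    println!("{outcomes:?}");
}
```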
2 changes: 1 addition & 1 deletion quickwit/quickwit-storage/src/error.rs
@@ -193,7 +193,7 @@ impl fmt::Display for BulkDeleteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"bulk delete error ({} success(es), {} failure(s), {} unattempted)",
"bulk delete error ({} success(es), {} failure(s), {} unattempted)",
self.successes.len(),
self.failures.len(),
self.unattempted.len()
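For reference, a self-contained sketch of how the message above counts the three buckets of a bulk delete outcome. The struct below is a simplified stand-in, not the real `BulkDeleteError`, which also carries the underlying `StorageError` and per-path `DeleteFailure` details.

```rust
use std::collections::HashMap;
use std::fmt;
use std::path::PathBuf;

// Simplified stand-in mirroring only the fields used by the Display impl above.
#[derive(Default)]
struct BulkDeleteError {
    successes: Vec<PathBuf>,
    failures: HashMap<PathBuf, String>,
    unattempted: Vec<PathBuf>,
}

impl fmt::Display for BulkDeleteError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "bulk delete error ({} success(es), {} failure(s), {} unattempted)",
            self.successes.len(),
            self.failures.len(),
            self.unattempted.len()
        )
    }
}

fn main() {
    let mut error = BulkDeleteError::default();
    error.successes.push(PathBuf::from("split-0.split"));
    error
        .failures
        .insert(PathBuf::from("split-1.split"), "access denied".to_string());
    error.unattempted.push(PathBuf::from("split-2.split"));
    // Prints: bulk delete error (1 success(es), 1 failure(s), 1 unattempted)
    println!("{error}");
}
```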
144 changes: 80 additions & 64 deletions quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
@@ -21,19 +21,20 @@ use std::collections::HashMap;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{fmt, io};

use anyhow::{anyhow, Context as AnyhhowContext};
use anyhow::anyhow;
use async_trait::async_trait;
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::builders::ObjectIdentifierBuilder;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete};
use aws_sdk_s3::Client as S3Client;
use base64::prelude::{Engine, BASE64_STANDARD};
use futures::{stream, StreamExt};
@@ -380,35 +381,35 @@ impl S3CompatibleObjectStorage {
fn build_delete_batch_requests<'a>(
&self,
delete_paths: &'a [&'a Path],
) -> anyhow::Result<Vec<(&'a [&'a Path], Delete)>> {
) -> Vec<(Vec<PathBuf>, Delete)> {
#[cfg(test)]
const MAX_NUM_KEYS: usize = 3;

#[cfg(not(test))]
const MAX_NUM_KEYS: usize = 1_000;

let path_chunks = delete_paths.chunks(MAX_NUM_KEYS);
let num_delete_requests = path_chunks.len();
let mut delete_requests: Vec<(&[&Path], Delete)> = Vec::with_capacity(num_delete_requests);

for path_chunk in path_chunks {
let object_ids: Vec<ObjectIdentifier> = path_chunk
.iter()
.map(|path| {
let key = self.key(path);
ObjectIdentifierBuilder::default()
.key(key)
.build()
.context("failed to build object identifier")
})
.collect::<anyhow::Result<_>>()?;
let delete = Delete::builder()
.set_objects(Some(object_ids))
.build()
.context("failed to build delete request")?;
delete_requests.push((path_chunk, delete));
}
Ok(delete_requests)
delete_paths
.chunks(MAX_NUM_KEYS)
.map(|path_chunk| {
let owned_path_chunk: Vec<PathBuf> =
path_chunk.iter().map(|path| path.to_path_buf()).collect();
let object_ids = path_chunk
.iter()
.map(|path| {
let key = self.key(path);
ObjectIdentifierBuilder::default()
.key(key)
.build()
.expect("`key` should be set")
})
.collect();
let delete = Delete::builder()
.set_objects(Some(object_ids))
.build()
.expect("`objects` should be set");
(owned_path_chunk, delete)
})
.collect()
}

async fn upload_part<'a>(
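A standalone sketch of the batching step above, using plain strings instead of the SDK's `ObjectIdentifier` and `Delete` types: the S3 DeleteObjects API accepts at most 1,000 keys per request, so the paths are split into chunks of that size (3 in the test configuration). The key names below are made up.

```rust
// MAX_NUM_KEYS mirrors the non-test constant above; S3's DeleteObjects API
// rejects requests with more than 1,000 keys.
const MAX_NUM_KEYS: usize = 1_000;

fn build_batches(keys: &[String]) -> Vec<Vec<String>> {
    keys.chunks(MAX_NUM_KEYS)
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    let keys: Vec<String> = (0..2_500)
        .map(|i| format!("indexes/wiki/split-{i}.split"))
        .collect();
    let batches = build_batches(&keys);
    // 2,500 keys split into batches of 1,000, 1,000, and 500.
    assert_eq!(batches.len(), 3);
    assert_eq!(batches[2].len(), 500);
}
```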
@@ -617,30 +618,21 @@ impl S3CompatibleObjectStorage {
/// Bulk delete implementation based on the DeleteObjects API, also called Multi-Object Delete
/// API: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
async fn bulk_delete_multi<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> {
let _permit = REQUEST_SEMAPHORE.acquire().await;
let cancellation_token = CancellationToken::default();
let delete_requests: Vec<(Vec<PathBuf>, Delete)> = self.build_delete_batch_requests(paths);

let delete_requests: Vec<(&[&Path], Delete)> = self
.build_delete_batch_requests(paths)
.map_err(|error: anyhow::Error| {
let unattempted = paths.iter().copied().map(Path::to_path_buf).collect();
BulkDeleteError {
error: Some(StorageErrorKind::Internal.with_error(error)),
successes: Default::default(),
failures: Default::default(),
unattempted,
}
})?;
let delete_objects_futures = delete_requests.into_iter().map(|(path_chunk, delete)| {
let bucket = self.bucket.clone();
let retry_params = self.retry_params;
let cancellation_token = cancellation_token.clone();

let mut error = None;
let mut successes = Vec::with_capacity(paths.len());
let mut failures = HashMap::new();
let mut unattempted = Vec::new();

let mut delete_requests_it = delete_requests.iter();
async move {
let _permit = REQUEST_SEMAPHORE.acquire().await;

for (path_chunk, delete) in &mut delete_requests_it {
let delete_objects_res: StorageResult<DeleteObjectsOutput> =
aws_retry(&self.retry_params, || async {
if cancellation_token.is_cancelled() {
return DeleteObjectsOutcome::Cancelled(path_chunk);
}
let delete_objects_result = aws_retry(&retry_params, || async {
crate::STORAGE_METRICS
.object_storage_bulk_delete_requests_total
.inc();
@@ -649,16 +641,29 @@ impl S3CompatibleObjectStorage {
.start_timer();
self.s3_client
.delete_objects()
.bucket(self.bucket.clone())
.bucket(bucket.clone())
.delete(delete.clone())
.send()
.await
})
.await
.map_err(Into::into);

match delete_objects_res {
Ok(delete_objects_output) => {
.await;
if delete_objects_result.is_err() {
cancellation_token.cancel();
}
DeleteObjectsOutcome::Completed(delete_objects_result, path_chunk)
}
});
let mut delete_objects_outcome_stream =
futures::stream::iter(delete_objects_futures).buffer_unordered(10);

let mut error: Option<StorageError> = None;
let mut successes: Vec<PathBuf> = Vec::with_capacity(paths.len());
let mut failures: HashMap<PathBuf, DeleteFailure> = HashMap::new();
let mut unattempted: Vec<PathBuf> = Vec::new();

while let Some(delete_objects_outcome) = delete_objects_outcome_stream.next().await {
match delete_objects_outcome {
DeleteObjectsOutcome::Completed(Ok(delete_objects_output), _) => {
if let Some(deleted_objects) = delete_objects_output.deleted {
for deleted_object in deleted_objects {
if let Some(key) = deleted_object.key {
@@ -688,23 +693,16 @@ impl S3CompatibleObjectStorage {
}
}
}
Err(delete_objects_error) => {
error = Some(delete_objects_error);
unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
break;
DeleteObjectsOutcome::Completed(Err(delete_objects_error), paths) => {
error = Some(delete_objects_error.into());
unattempted.extend(paths)
}
DeleteObjectsOutcome::Cancelled(paths) => unattempted.extend(paths),
}
}

if error.is_none() && failures.is_empty() {
return Ok(());
}

// Do we have remaining requests?
for (path_chunk, _) in delete_requests_it {
unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
}

Err(BulkDeleteError {
error,
successes,
@@ -880,6 +878,24 @@ impl Storage for S3CompatibleObjectStorage {
}
}

enum DeleteObjectsOutcome<T> {
Completed(T, Vec<PathBuf>),
Cancelled(Vec<PathBuf>),
}

#[derive(Clone, Debug, Default)]
struct CancellationToken(Arc<AtomicBool>);

impl CancellationToken {
fn cancel(&self) {
self.0.store(true, Ordering::Release);
}

fn is_cancelled(&self) -> bool {
self.0.load(Ordering::Acquire)
}
}
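
A standalone usage sketch of the same token (the definition is repeated so the snippet compiles on its own): clones share one `AtomicBool`, so cancelling any handle is visible from every clone, including those already captured by in-flight futures.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Clone, Debug, Default)]
struct CancellationToken(Arc<AtomicBool>);

impl CancellationToken {
    fn cancel(&self) {
        self.0.store(true, Ordering::Release);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

fn main() {
    let token = CancellationToken::default();
    let clone = token.clone();
    assert!(!clone.is_cancelled());
    // Cancelling through one handle is observed through the other.
    token.cancel();
    assert!(clone.is_cancelled());
}
```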

#[cfg(test)]
mod tests {

