Skip to content

Commit

Permalink
Add tests for bulk delete request building and response parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
andrebsguedes committed Nov 5, 2024
1 parent 3b44eef commit 412c3b2
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 12 deletions.
2 changes: 2 additions & 0 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ hyper-util = "0.1"
http-body-util = "0.1"
rand = "0.8"
tempfile = "3.1.0"
regex = "1.11.1"
http = "1.1.0"

[[test]]
name = "get_range_file"
Expand Down
188 changes: 176 additions & 12 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,16 +589,12 @@ impl AzureClient {
Ok(())
}

pub async fn bulk_delete_request(&self, paths: Vec<Path>) -> Result<Vec<Result<Path>>> {
if paths.is_empty() {
return Ok(Vec::new());
}

let credential = self.get_credential().await?;

// https://www.ietf.org/rfc/rfc2046
let boundary = format!("batch_{}", uuid::Uuid::new_v4());

fn build_bulk_delete_body(
&self,
boundary: &str,
paths: &[Path],
credential: &Option<Arc<AzureCredential>>,
) -> Vec<u8> {
let mut body_bytes = Vec::with_capacity(paths.len() * 2048);

for (idx, path) in paths.iter().enumerate() {
Expand All @@ -612,21 +608,35 @@ impl AzureClient {
// Each subrequest must be authorized individually [1] and we use
// the CredentialExt for this.
// [1]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body
.with_azure_authorization(&credential, &self.config.account)
.with_azure_authorization(credential, &self.config.account)
.build()
.unwrap();

// Url for part requests must be relative and without base
let relative_url = self.config.service.make_relative(request.url()).unwrap();

serialize_part_delete_request(&mut body_bytes, &boundary, idx, request, relative_url)
serialize_part_delete_request(&mut body_bytes, boundary, idx, request, relative_url)
}

// Encode end marker
extend(&mut body_bytes, b"--");
extend(&mut body_bytes, boundary.as_bytes());
extend(&mut body_bytes, b"--");
extend(&mut body_bytes, b"\r\n");
body_bytes
}

pub(crate) async fn bulk_delete_request(&self, paths: Vec<Path>) -> Result<Vec<Result<Path>>> {
if paths.is_empty() {
return Ok(Vec::new());
}

let credential = self.get_credential().await?;

// https://www.ietf.org/rfc/rfc2046
let boundary = format!("batch_{}", uuid::Uuid::new_v4());

let body_bytes = self.build_bulk_delete_body(&boundary, &paths, &credential);

// Send multipart request
let url = self.config.path_url(&Path::from("/"));
Expand Down Expand Up @@ -1085,8 +1095,10 @@ pub(crate) struct UserDelegationKey {
#[cfg(test)]
mod tests {
use bytes::Bytes;
use regex::bytes::Regex;

use super::*;
use crate::StaticCredentialProvider;

#[test]
fn deserde_azure() {
Expand Down Expand Up @@ -1276,4 +1288,156 @@ mod tests {
let _delegated_key_response_internal: UserDelegationKey =
quick_xml::de::from_str(S).unwrap();
}

/// Checks that `build_bulk_delete_body` serializes three delete sub-requests
/// into the expected multipart body when a static bearer token and a fixed
/// boundary are used.
#[tokio::test]
async fn test_build_bulk_delete_body() {
    // Fixed boundary so the serialized body can be compared byte-for-byte.
    const BOUNDARY: &str = "batch_statictestboundary";

    let client = AzureClient::new(AzureConfig {
        account: "testaccount".to_string(),
        container: "testcontainer".to_string(),
        credentials: Arc::new(StaticCredentialProvider::new(
            AzureCredential::BearerToken("static-token".to_string()),
        )),
        service: "http://example.com".try_into().unwrap(),
        retry_config: Default::default(),
        is_emulator: false,
        skip_signature: false,
        disable_tagging: false,
        client_options: Default::default(),
    })
    .unwrap();

    let credential = client.get_credential().await.unwrap();
    let targets = [Path::from("a"), Path::from("b"), Path::from("c")];

    let raw_body = client.build_bulk_delete_body(BOUNDARY, &targets, &credential);

    // Pin every Date header to a static value so the comparison below does
    // not depend on when the test runs.
    let date_header = Regex::new("Date:[^\r]+").unwrap();
    let body_bytes = date_header
        .replace_all(&raw_body, b"Date: Tue, 05 Nov 2024 15:01:15 GMT")
        .into_owned();

    // NOTE: the literal's continuation lines must stay at column 0; any
    // leading whitespace would become part of the expected bytes.
    let expected_body = b"--batch_statictestboundary\r
Content-Type: application/http\r
Content-Transfer-Encoding: binary\r
Content-ID: 0\r
\r
DELETE /testcontainer/a HTTP/1.1\r
Content-Length: 0\r
Date: Tue, 05 Nov 2024 15:01:15 GMT\r
X-Ms-Version: 2023-11-03\r
Authorization: Bearer static-token\r
\r
\r
--batch_statictestboundary\r
Content-Type: application/http\r
Content-Transfer-Encoding: binary\r
Content-ID: 1\r
\r
DELETE /testcontainer/b HTTP/1.1\r
Content-Length: 0\r
Date: Tue, 05 Nov 2024 15:01:15 GMT\r
X-Ms-Version: 2023-11-03\r
Authorization: Bearer static-token\r
\r
\r
--batch_statictestboundary\r
Content-Type: application/http\r
Content-Transfer-Encoding: binary\r
Content-ID: 2\r
\r
DELETE /testcontainer/c HTTP/1.1\r
Content-Length: 0\r
Date: Tue, 05 Nov 2024 15:01:15 GMT\r
X-Ms-Version: 2023-11-03\r
Authorization: Bearer static-token\r
\r
\r
--batch_statictestboundary--\r\n"
        .to_vec();

    assert_eq!(expected_body, body_bytes);
}

#[tokio::test]
async fn test_parse_blob_batch_delete_response() {
let response_body = b"--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
Content-Type: application/http\r
Content-ID: 0\r
\r
HTTP/1.1 202 Accepted\r
x-ms-delete-type-permanent: true\r
x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f\r
x-ms-version: 2018-11-09\r
\r
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
Content-Type: application/http\r
Content-ID: 1\r
\r
HTTP/1.1 202 Accepted\r
x-ms-delete-type-permanent: true\r
x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2851\r
x-ms-version: 2018-11-09\r
\r
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
Content-Type: application/http\r
Content-ID: 2\r
\r
HTTP/1.1 404 The specified blob does not exist.\r
x-ms-error-code: BlobNotFound\r
x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852\r
x-ms-version: 2018-11-09\r
Content-Length: 216\r
Content-Type: application/xml\r
\r
<?xml version=\"1.0\" encoding=\"utf-8\"?>
<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.
RequestId:778fdc83-801e-0000-62ff-0334671e2852
Time:2018-06-14T16:46:54.6040685Z</Message></Error>\r
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r\n";

let response: reqwest::Response = http::Response::builder()
.status(202)
.header("Transfer-Encoding", "chunked")
.header(
"Content-Type",
"multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed",
)
.header("x-ms-request-id", "778fdc83-801e-0000-62ff-033467000000")
.header("x-ms-version", "2018-11-09")
.body(Bytes::from(response_body.as_slice()))
.unwrap()
.into();

let paths = &[Path::from("a"), Path::from("b"), Path::from("c")];

let results = parse_blob_batch_delete_response(response, paths)
.await
.unwrap();

assert!(results[0].is_ok());
assert_eq!(&paths[0], results[0].as_ref().unwrap());

assert!(results[1].is_ok());
assert_eq!(&paths[1], results[1].as_ref().unwrap());

assert!(results[2].is_err());
let err = results[2].as_ref().unwrap_err();
let crate::Error::NotFound { source, .. } = err else {
unreachable!("must be not found")
};
let Some(Error::DeleteFailed { path, code, reason }) = source.downcast_ref::<Error>()
else {
unreachable!("must be client error")
};

assert_eq!(paths[2].as_ref(), path);
assert_eq!("404", code);
assert_eq!("The specified blob does not exist.", reason);
}
}

0 comments on commit 412c3b2

Please sign in to comment.