diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 4564539280e3..d6d66fc66e98 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -78,6 +78,8 @@ hyper-util = "0.1" http-body-util = "0.1" rand = "0.8" tempfile = "3.1.0" +regex = "1.11.1" +http = "1.1.0" [[test]] name = "get_range_file" diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 7e719eb1c719..4c4455a84d1d 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -589,16 +589,12 @@ impl AzureClient { Ok(()) } - pub async fn bulk_delete_request(&self, paths: Vec) -> Result>> { - if paths.is_empty() { - return Ok(Vec::new()); - } - - let credential = self.get_credential().await?; - - // https://www.ietf.org/rfc/rfc2046 - let boundary = format!("batch_{}", uuid::Uuid::new_v4()); - + fn build_bulk_delete_body( + &self, + boundary: &str, + paths: &[Path], + credential: &Option>, + ) -> Vec { let mut body_bytes = Vec::with_capacity(paths.len() * 2048); for (idx, path) in paths.iter().enumerate() { @@ -612,14 +608,14 @@ impl AzureClient { // Each subrequest must be authorized individually [1] and we use // the CredentialExt for this. // [1]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body - .with_azure_authorization(&credential, &self.config.account) + .with_azure_authorization(credential, &self.config.account) .build() .unwrap(); // Url for part requests must be relative and without base let relative_url = self.config.service.make_relative(request.url()).unwrap(); - serialize_part_delete_request(&mut body_bytes, &boundary, idx, request, relative_url) + serialize_part_delete_request(&mut body_bytes, boundary, idx, request, relative_url) } // Encode end marker @@ -627,6 +623,20 @@ impl AzureClient { extend(&mut body_bytes, boundary.as_bytes()); extend(&mut body_bytes, b"--"); extend(&mut body_bytes, b"\r\n"); + body_bytes + } + + pub async fn bulk_delete_request(&self, paths: Vec) -> Result>> { + if paths.is_empty() { + return Ok(Vec::new()); + } + + let credential = self.get_credential().await?; + + // https://www.ietf.org/rfc/rfc2046 + let boundary = format!("batch_{}", uuid::Uuid::new_v4()); + + let body_bytes = self.build_bulk_delete_body(&boundary, &paths, &credential); // Send multipart request let url = self.config.path_url(&Path::from("/")); @@ -1085,8 +1095,10 @@ pub(crate) struct UserDelegationKey { #[cfg(test)] mod tests { use bytes::Bytes; + use regex::bytes::Regex; use super::*; + use crate::StaticCredentialProvider; #[test] fn deserde_azure() { @@ -1276,4 +1288,156 @@ mod tests { let _delegated_key_response_internal: UserDelegationKey = quick_xml::de::from_str(S).unwrap(); } + + #[tokio::test] + async fn test_build_bulk_delete_body() { + let credential_provider = Arc::new(StaticCredentialProvider::new( + AzureCredential::BearerToken("static-token".to_string()), + )); + + let config = AzureConfig { + account: "testaccount".to_string(), + container: "testcontainer".to_string(), + credentials: credential_provider, + service: "http://example.com".try_into().unwrap(), + retry_config: Default::default(), + is_emulator: false, + skip_signature: false, + disable_tagging: false, + client_options: Default::default(), + }; + + let client = AzureClient::new(config).unwrap(); + + let credential = client.get_credential().await.unwrap(); + let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; + + let boundary = "batch_statictestboundary".to_string(); + + let body_bytes = client.build_bulk_delete_body(&boundary, paths, &credential); + + // Replace Date header value with a static date + let re = Regex::new("Date:[^\r]+").unwrap(); + let body_bytes = re + .replace_all(&body_bytes, b"Date: Tue, 05 Nov 2024 15:01:15 GMT") + .to_vec(); + + let expected_body = b"--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 0\r +\r +DELETE /testcontainer/a HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 1\r +\r +DELETE /testcontainer/b HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 2\r +\r +DELETE /testcontainer/c HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary--\r\n" + .to_vec(); + + assert_eq!(expected_body, body_bytes); + } + + #[tokio::test] + async fn test_parse_blob_batch_delete_response() { + let response_body = b"--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 0\r +\r +HTTP/1.1 202 Accepted\r +x-ms-delete-type-permanent: true\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f\r +x-ms-version: 2018-11-09\r +\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 1\r +\r +HTTP/1.1 202 Accepted\r +x-ms-delete-type-permanent: true\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2851\r +x-ms-version: 2018-11-09\r +\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 2\r +\r +HTTP/1.1 404 The specified blob does not exist.\r +x-ms-error-code: BlobNotFound\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852\r +x-ms-version: 2018-11-09\r +Content-Length: 216\r +Content-Type: application/xml\r +\r + +BlobNotFoundThe specified blob does not exist. +RequestId:778fdc83-801e-0000-62ff-0334671e2852 +Time:2018-06-14T16:46:54.6040685Z\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r\n"; + + let response: reqwest::Response = http::Response::builder() + .status(202) + .header("Transfer-Encoding", "chunked") + .header( + "Content-Type", + "multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed", + ) + .header("x-ms-request-id", "778fdc83-801e-0000-62ff-033467000000") + .header("x-ms-version", "2018-11-09") + .body(Bytes::from(response_body.as_slice())) + .unwrap() + .into(); + + let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; + + let results = parse_blob_batch_delete_response(response, paths) + .await + .unwrap(); + + assert!(results[0].is_ok()); + assert_eq!(&paths[0], results[0].as_ref().unwrap()); + + assert!(results[1].is_ok()); + assert_eq!(&paths[1], results[1].as_ref().unwrap()); + + assert!(results[2].is_err()); + let err = results[2].as_ref().unwrap_err(); + let crate::Error::NotFound { source, .. } = err else { + unreachable!("must be not found") + }; + let Some(Error::DeleteFailed { path, code, reason }) = source.downcast_ref::() + else { + unreachable!("must be client error") + }; + + assert_eq!(paths[2].as_ref(), path); + assert_eq!("404", code); + assert_eq!("The specified blob does not exist.", reason); + } }