From efd5c7c61eb9ea62a5f7599f1de7b13b70ab2295 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 22 Apr 2024 19:19:37 -0300 Subject: [PATCH 01/14] Implement bulk_delete_request for Azure --- object_store/Cargo.toml | 4 +- object_store/src/azure/client.rs | 280 ++++++++++++++++++++++++++++++- object_store/src/azure/mod.rs | 22 ++- 3 files changed, 303 insertions(+), 3 deletions(-) diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 86d1392ebf61..5298dd8ad907 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -55,13 +55,15 @@ ring = { version = "0.17", default-features = false, features = ["std"], optiona rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true } tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] } md-5 = { version = "0.10.6", default-features = false, optional = true } +uuid = { version = "1.7.0", default-features = false, features = ["v4"], optional = true } +httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.29.0", features = ["fs"] } [features] cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] -azure = ["cloud"] +azure = ["cloud", "uuid", "httparse"] gcp = ["cloud", "rustls-pemfile"] aws = ["cloud", "md-5"] http = ["cloud"] diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index e78f8db7a8c8..213aec51871d 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -37,7 +37,7 @@ use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use hyper::http::HeaderName; use reqwest::{ - header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH}, + header::{HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}, Client as ReqwestClient, Method, RequestBuilder, Response, }; use serde::{Deserialize, Serialize}; @@ -47,6 +47,7 @@ use std::sync::Arc; use std::time::Duration; use url::Url; + const VERSION_HEADER: &str = "x-ms-version-id"; const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-ms-meta-"; static MS_CACHE_CONTROL: HeaderName = HeaderName::from_static("x-ms-blob-cache-control"); @@ -79,6 +80,43 @@ pub(crate) enum Error { path: String, }, + #[snafu(display("Error performing bulk delete request: {}", source))] + BulkDeleteRequest { + source: crate::client::retry::Error + }, + + #[snafu(display("Error receiving bulk delete request body: {}", source))] + BulkDeleteRequestBody { + source: reqwest::Error + }, + + #[snafu(display( + "Bulk delete request failed due to invalid input: {} (code: {})", + reason, + code + ))] + BulkDeleteRequestInvalidInput { + code: String, + reason: String, + }, + + #[snafu(display("Got invalid bulk delete response: {}", reason))] + InvalidBulkDeleteResponse { + reason: String + }, + + #[snafu(display( + "Bulk delete request failed for key {}: {} (code: {})", + path, + reason, + code + ))] + DeleteFailed { + path: String, + code: String, + reason: String, + }, + #[snafu(display("Error performing list request: {}", source))] ListRequest { source: crate::client::retry::Error }, @@ -247,6 +285,148 @@ impl<'a> PutRequest<'a> { } } +#[inline] +fn extend(dst: &mut Vec, data: &[u8]) { + dst.extend_from_slice(data); +} + +// Write header names as title case. The header name is assumed to be ASCII. +fn title_case(dst: &mut Vec, name: &[u8]) { + dst.reserve(name.len()); + + // Ensure first character is uppercased + let mut prev = b'-'; + for &(mut c) in name { + if prev == b'-' { + c.make_ascii_uppercase(); + } + dst.push(c); + prev = c; + } +} + +fn write_headers(headers: &HeaderMap, dst: &mut Vec) { + for (name, value) in headers { + if name == "content-id" { + extend(dst, b"Content-ID"); + } else { + title_case(dst, name.as_str().as_bytes()); + } + extend(dst, b": "); + extend(dst, value.as_bytes()); + extend(dst, b"\r\n"); + } +} + +fn serialize_part_request(dst: &mut Vec, boundary: &str, idx: usize, request: reqwest::Request, relative_url: String) { + // Encode start marker for part + extend(dst, b"--"); + extend(dst, boundary.as_bytes()); + extend(dst, b"\r\n"); + + // Encode part headers + let mut part_headers = HeaderMap::new(); + part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); + part_headers.insert("Content-Transfer-Encoding", "binary".parse().unwrap()); + part_headers.insert("Content-ID", HeaderValue::from(idx)); + write_headers(&part_headers, dst); + extend(dst, b"\r\n"); + + // Encode the subrequest request-line + extend(dst, b"DELETE "); + extend(dst, format!("/{} ", relative_url).as_bytes()); + extend(dst, b"HTTP/1.1"); + extend(dst, b"\r\n"); + + // Encode subrequest headers + write_headers(request.headers(), dst); + extend(dst, b"\r\n"); + extend(dst, b"\r\n"); +} + +fn parse_response_part(remaining: &mut &[u8], results: &mut Vec>, paths: &[Path]) -> Result<()> { + + let invalid_response = |msg: &str| { + Error::InvalidBulkDeleteResponse { + reason: msg.to_string() + } + }; + + // Parse part headers and retrieve part id + let mut headers = [httparse::EMPTY_HEADER; 4]; + let id = match httparse::parse_headers(remaining, &mut headers) { + Ok(httparse::Status::Complete((pos, headers))) => { + *remaining = &remaining[pos..]; + headers + .iter() + .find(|h| h.name.to_lowercase() == "content-id") + .and_then(|h| std::str::from_utf8(h.value).ok()) + .and_then(|v| v.parse::().ok()) + } + _ => { + return Err(invalid_response("unable to parse parse headers").into()) + } + }; + + // Parse part response headers + let mut headers = [httparse::EMPTY_HEADER; 10]; + let mut part_response = httparse::Response::new(&mut headers); + let content_length = match part_response.parse(remaining) { + Ok(httparse::Status::Complete(pos)) => { + *remaining = &remaining[pos..]; + part_response + .headers + .iter() + .find(|h| h.name.to_lowercase() == "content-length") + .and_then(|h| std::str::from_utf8(h.value).ok()) + .and_then(|v| v.parse::().ok()) + .unwrap_or_default() + } + _ => { + return Err(invalid_response("unable to parse response").into()) + } + }; + + let part_body = &remaining[..content_length]; + + // Set slice past part body + *remaining = &remaining[content_length..]; + + if !part_body.is_empty() { + // Skips CRLF after body if non-empty + *remaining = remaining + .strip_prefix(b"\r\n") + .ok_or_else(|| invalid_response("missing part separator"))?; + } + + match (id, part_response.code) { + (Some(_id), Some(code)) if (200..300).contains(&code) => {} + (Some(id), Some(code)) => { + results[id] = Err(Error::DeleteFailed { + path: paths[id].as_ref().to_string(), + code: code.to_string(), + reason: part_response.reason + .unwrap_or_default() + .to_string(), + }.into()); + } + (None, Some(code)) => { + return Err(Error::BulkDeleteRequestInvalidInput { + code: code.to_string(), + reason: part_response.reason + .unwrap_or_default() + .to_string(), + }.into()) + } + _ => { + return Err(invalid_response("missing part response status code").into()) + } + } + + Ok(()) +} + + #[derive(Debug)] pub(crate) struct AzureClient { config: AzureConfig, @@ -380,6 +560,104 @@ impl AzureClient { Ok(()) } + pub async fn bulk_delete_request(&self, paths: Vec) -> Result>> { + if paths.is_empty() { + return Ok(Vec::new()); + } + + let credential = self.get_credential().await?; + + let boundary = format!("batch_{}", uuid::Uuid::new_v4()); + + let mut body_bytes = Vec::with_capacity(paths.len() * 256); + + for (idx, path) in paths.iter().enumerate() { + let url = self.config.path_url(path); + + // Build subrequest with proper authorization + let request = self.client.request(Method::DELETE, url) + .header(CONTENT_LENGTH, HeaderValue::from(0)) + .with_azure_authorization(&credential, &self.config.account) + .build().unwrap(); + + // Url for part requests must be relative and without base + let relative_url = self.config.service.make_relative(request.url()).unwrap(); + + serialize_part_request(&mut body_bytes, &boundary, idx, request, relative_url) + } + + // Encode end marker + extend(&mut body_bytes, b"--"); + extend(&mut body_bytes, boundary.as_bytes()); + extend(&mut body_bytes, b"--"); + extend(&mut body_bytes, b"\r\n"); + + // Send multipart request + let url = self.config.path_url(&Path::from("/")); + let batch_response = self.client.request(Method::POST, url) + .query(&[("restype", "container"), ("comp", "batch")]) + .header(CONTENT_TYPE, HeaderValue::from_str(format!("multipart/mixed; boundary={}", boundary).as_str()).unwrap()) + .header(CONTENT_LENGTH, HeaderValue::from(body_bytes.len())) + .body(body_bytes) + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(BulkDeleteRequestSnafu {})?; + + let invalid_response = |msg: &str| { + Error::InvalidBulkDeleteResponse { + reason: msg.to_string() + } + }; + + let content_type = batch_response + .headers() + .get(CONTENT_TYPE) + .ok_or_else(|| invalid_response("missing Content-Type"))?; + + let boundary = content_type + .as_ref() + .strip_prefix(b"multipart/mixed; boundary=") + .ok_or_else(|| invalid_response("invalid Content-Type value"))? + .to_vec(); + + let start_marker = [ + b"--".as_slice(), + boundary.as_slice(), + b"\r\n" + ].concat(); + + let end_marker = [ + b"--".as_slice(), + boundary.as_slice(), + b"--" + ].concat(); + + let response_body = batch_response + .bytes() + .await + .context(BulkDeleteRequestBodySnafu {})?; + + let mut results: Vec> = paths.iter().cloned().map(Ok).collect(); + + let mut remaining = response_body.as_ref(); + + loop { + // Check for part marker + remaining = remaining + .strip_prefix(start_marker.as_slice()) + .ok_or_else(|| invalid_response("missing start marker for part"))?; + + parse_response_part(&mut remaining, &mut results, &paths)?; + + if remaining == end_marker { + break; + } + } + + Ok(results) + } + /// Make an Azure Copy request pub(crate) async fn copy_request(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> { let credential = self.get_credential().await?; diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index f89a184f9523..177bffb653ae 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -30,7 +30,7 @@ use crate::{ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; -use futures::stream::BoxStream; +use futures::stream::{BoxStream, StreamExt, TryStreamExt}; use reqwest::Method; use std::fmt::Debug; use std::sync::Arc; @@ -119,6 +119,26 @@ impl ObjectStore for MicrosoftAzure { self.client.delete_request(location, &()).await } + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, Result>, + ) -> BoxStream<'a, Result> { + locations + .try_chunks(256) + .map(move |locations| async { + // Early return the error. We ignore the paths that have already been + // collected into the chunk. + let locations = locations.map_err(|e| e.1)?; + self.client + .bulk_delete_request(locations) + .await + .map(futures::stream::iter) + }) + .buffered(20) + .try_flatten() + .boxed() + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { self.client.list(prefix) } From 075cc21c34395b0719261445306ab96dcc175b9b Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Tue, 23 Apr 2024 12:56:26 -0300 Subject: [PATCH 02/14] Fix lint and add Azurite bug workaround --- object_store/src/azure/client.rs | 110 +++++++++++++++---------------- 1 file changed, 53 insertions(+), 57 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 213aec51871d..17ffe35753f0 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -47,7 +47,6 @@ use std::sync::Arc; use std::time::Duration; use url::Url; - const VERSION_HEADER: &str = "x-ms-version-id"; const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-ms-meta-"; static MS_CACHE_CONTROL: HeaderName = HeaderName::from_static("x-ms-blob-cache-control"); @@ -81,29 +80,20 @@ pub(crate) enum Error { }, #[snafu(display("Error performing bulk delete request: {}", source))] - BulkDeleteRequest { - source: crate::client::retry::Error - }, + BulkDeleteRequest { source: crate::client::retry::Error }, #[snafu(display("Error receiving bulk delete request body: {}", source))] - BulkDeleteRequestBody { - source: reqwest::Error - }, + BulkDeleteRequestBody { source: reqwest::Error }, #[snafu(display( "Bulk delete request failed due to invalid input: {} (code: {})", reason, code ))] - BulkDeleteRequestInvalidInput { - code: String, - reason: String, - }, + BulkDeleteRequestInvalidInput { code: String, reason: String }, #[snafu(display("Got invalid bulk delete response: {}", reason))] - InvalidBulkDeleteResponse { - reason: String - }, + InvalidBulkDeleteResponse { reason: String }, #[snafu(display( "Bulk delete request failed for key {}: {} (code: {})", @@ -318,7 +308,13 @@ fn write_headers(headers: &HeaderMap, dst: &mut Vec) { } } -fn serialize_part_request(dst: &mut Vec, boundary: &str, idx: usize, request: reqwest::Request, relative_url: String) { +fn serialize_part_request( + dst: &mut Vec, + boundary: &str, + idx: usize, + request: reqwest::Request, + relative_url: String, +) { // Encode start marker for part extend(dst, b"--"); extend(dst, boundary.as_bytes()); @@ -344,12 +340,13 @@ fn serialize_part_request(dst: &mut Vec, boundary: &str, idx: usize, request extend(dst, b"\r\n"); } -fn parse_response_part(remaining: &mut &[u8], results: &mut Vec>, paths: &[Path]) -> Result<()> { - - let invalid_response = |msg: &str| { - Error::InvalidBulkDeleteResponse { - reason: msg.to_string() - } +fn parse_response_part( + remaining: &mut &[u8], + results: &mut [Result], + paths: &[Path], +) -> Result<()> { + let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { + reason: msg.to_string(), }; // Parse part headers and retrieve part id @@ -363,9 +360,7 @@ fn parse_response_part(remaining: &mut &[u8], results: &mut Vec>, p .and_then(|h| std::str::from_utf8(h.value).ok()) .and_then(|v| v.parse::().ok()) } - _ => { - return Err(invalid_response("unable to parse parse headers").into()) - } + _ => return Err(invalid_response("unable to parse parse headers").into()), }; // Parse part response headers @@ -382,9 +377,7 @@ fn parse_response_part(remaining: &mut &[u8], results: &mut Vec>, p .and_then(|v| v.parse::().ok()) .unwrap_or_default() } - _ => { - return Err(invalid_response("unable to parse response").into()) - } + _ => return Err(invalid_response("unable to parse response").into()), }; let part_body = &remaining[..content_length]; @@ -405,28 +398,23 @@ fn parse_response_part(remaining: &mut &[u8], results: &mut Vec>, p results[id] = Err(Error::DeleteFailed { path: paths[id].as_ref().to_string(), code: code.to_string(), - reason: part_response.reason - .unwrap_or_default() - .to_string(), - }.into()); + reason: part_response.reason.unwrap_or_default().to_string(), + } + .into()); } (None, Some(code)) => { return Err(Error::BulkDeleteRequestInvalidInput { code: code.to_string(), - reason: part_response.reason - .unwrap_or_default() - .to_string(), - }.into()) - } - _ => { - return Err(invalid_response("missing part response status code").into()) + reason: part_response.reason.unwrap_or_default().to_string(), + } + .into()) } + _ => return Err(invalid_response("missing part response status code").into()), } Ok(()) } - #[derive(Debug)] pub(crate) struct AzureClient { config: AzureConfig, @@ -575,10 +563,13 @@ impl AzureClient { let url = self.config.path_url(path); // Build subrequest with proper authorization - let request = self.client.request(Method::DELETE, url) + let request = self + .client + .request(Method::DELETE, url) .header(CONTENT_LENGTH, HeaderValue::from(0)) .with_azure_authorization(&credential, &self.config.account) - .build().unwrap(); + .build() + .unwrap(); // Url for part requests must be relative and without base let relative_url = self.config.service.make_relative(request.url()).unwrap(); @@ -594,9 +585,15 @@ impl AzureClient { // Send multipart request let url = self.config.path_url(&Path::from("/")); - let batch_response = self.client.request(Method::POST, url) + let batch_response = self + .client + .request(Method::POST, url) .query(&[("restype", "container"), ("comp", "batch")]) - .header(CONTENT_TYPE, HeaderValue::from_str(format!("multipart/mixed; boundary={}", boundary).as_str()).unwrap()) + .header( + CONTENT_TYPE, + HeaderValue::from_str(format!("multipart/mixed; boundary={}", boundary).as_str()) + .unwrap(), + ) .header(CONTENT_LENGTH, HeaderValue::from(body_bytes.len())) .body(body_bytes) .with_azure_authorization(&credential, &self.config.account) @@ -604,10 +601,8 @@ impl AzureClient { .await .context(BulkDeleteRequestSnafu {})?; - let invalid_response = |msg: &str| { - Error::InvalidBulkDeleteResponse { - reason: msg.to_string() - } + let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { + reason: msg.to_string(), }; let content_type = batch_response @@ -621,17 +616,9 @@ impl AzureClient { .ok_or_else(|| invalid_response("invalid Content-Type value"))? .to_vec(); - let start_marker = [ - b"--".as_slice(), - boundary.as_slice(), - b"\r\n" - ].concat(); + let start_marker = [b"--".as_slice(), boundary.as_slice(), b"\r\n"].concat(); - let end_marker = [ - b"--".as_slice(), - boundary.as_slice(), - b"--" - ].concat(); + let end_marker = [b"--".as_slice(), boundary.as_slice(), b"--"].concat(); let response_body = batch_response .bytes() @@ -650,6 +637,15 @@ impl AzureClient { parse_response_part(&mut remaining, &mut results, &paths)?; + // Workaround for Azurite bug where it does not set content-length but still sends some + // body. This code skips to the next part or the end. + if let Some(pos) = remaining + .windows(start_marker.len()) + .position(|s| s == start_marker.as_slice() || s == end_marker.as_slice()) + { + remaining = &remaining[pos..]; + } + if remaining == end_marker { break; } From 5f0331b80ca11bb998bf12d6aba2335bce6ce328 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Tue, 23 Apr 2024 13:10:25 -0300 Subject: [PATCH 03/14] Special 404 error case --- object_store/src/azure/client.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 17ffe35753f0..bd51a08bd441 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -394,6 +394,18 @@ fn parse_response_part( match (id, part_response.code) { (Some(_id), Some(code)) if (200..300).contains(&code) => {} + (Some(id), Some(404)) => { + results[id] = Err(crate::Error::NotFound { + path: paths[id].as_ref().to_string(), + source: Error::DeleteFailed { + path: paths[id].as_ref().to_string(), + code: 404.to_string(), + reason: part_response.reason.unwrap_or_default().to_string(), + } + .into(), + } + .into()); + } (Some(id), Some(code)) => { results[id] = Err(Error::DeleteFailed { path: paths[id].as_ref().to_string(), From a384fc1635492308238e68179ed4ed8ebfd0ccb2 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Tue, 23 Apr 2024 13:40:01 -0300 Subject: [PATCH 04/14] Clippy fix --- object_store/src/azure/client.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index bd51a08bd441..c48d349a0eb6 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -403,8 +403,7 @@ fn parse_response_part( reason: part_response.reason.unwrap_or_default().to_string(), } .into(), - } - .into()); + }); } (Some(id), Some(code)) => { results[id] = Err(Error::DeleteFailed { From 2787ad2b5d013a77289cc43ab1bd3c36f5ccd715 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 29 Jul 2024 13:34:08 -0300 Subject: [PATCH 05/14] Make number of expected headers more conservative and better document invariants --- object_store/src/azure/client.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index c48d349a0eb6..21eb684d0d59 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -281,6 +281,7 @@ fn extend(dst: &mut Vec, data: &[u8]) { } // Write header names as title case. The header name is assumed to be ASCII. +// We need it because Azure is not always treating headers as case insensitive. fn title_case(dst: &mut Vec, name: &[u8]) { dst.reserve(name.len()); @@ -297,6 +298,8 @@ fn title_case(dst: &mut Vec, name: &[u8]) { fn write_headers(headers: &HeaderMap, dst: &mut Vec) { for (name, value) in headers { + // We need special case handling here otherwise Azure returns 400 + // due to `Content-Id` instead of `Content-ID` if name == "content-id" { extend(dst, b"Content-ID"); } else { @@ -308,6 +311,7 @@ fn write_headers(headers: &HeaderMap, dst: &mut Vec) { } } +/// https://docs.oasis-open.org/odata/odata/v4.0/errata02/os/complete/part1-protocol/odata-v4.0-errata02-os-part1-protocol-complete.html#_Toc406398359 fn serialize_part_request( dst: &mut Vec, boundary: &str, @@ -322,8 +326,9 @@ fn serialize_part_request( // Encode part headers let mut part_headers = HeaderMap::new(); - part_headers.insert(CONTENT_TYPE, "application/http".parse().unwrap()); + part_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/http")); part_headers.insert("Content-Transfer-Encoding", "binary".parse().unwrap()); + // Azure returns 400 if we send `Content-Id` instead of `Content-ID` part_headers.insert("Content-ID", HeaderValue::from(idx)); write_headers(&part_headers, dst); extend(dst, b"\r\n"); @@ -349,8 +354,11 @@ fn parse_response_part( reason: msg.to_string(), }; - // Parse part headers and retrieve part id - let mut headers = [httparse::EMPTY_HEADER; 4]; + // Parse part headers and retrieve part id. + // Documentation only mentions two possible headers for the start of a part + // we leave space for 8 just in case: + // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#response-body + let mut headers = [httparse::EMPTY_HEADER; 8]; let id = match httparse::parse_headers(remaining, &mut headers) { Ok(httparse::Status::Complete((pos, headers))) => { *remaining = &remaining[pos..]; @@ -364,7 +372,11 @@ fn parse_response_part( }; // Parse part response headers - let mut headers = [httparse::EMPTY_HEADER; 10]; + // Documentation mentions 5 headers and states that other standard HTTP headers + // may be provided, in order to not incurr in more complexity to support an arbitrary + // amount of headers we chose a conservative amount and error otherwise + // https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob?tabs=microsoft-entra-id#response-headers + let mut headers = [httparse::EMPTY_HEADER; 48]; let mut part_response = httparse::Response::new(&mut headers); let content_length = match part_response.parse(remaining) { Ok(httparse::Status::Complete(pos)) => { @@ -566,9 +578,10 @@ impl AzureClient { let credential = self.get_credential().await?; + // https://www.ietf.org/rfc/rfc2046 let boundary = format!("batch_{}", uuid::Uuid::new_v4()); - let mut body_bytes = Vec::with_capacity(paths.len() * 256); + let mut body_bytes = Vec::with_capacity(paths.len() * 2048); for (idx, path) in paths.iter().enumerate() { let url = self.config.path_url(path); @@ -578,6 +591,9 @@ impl AzureClient { .client .request(Method::DELETE, url) .header(CONTENT_LENGTH, HeaderValue::from(0)) + // Each subrequest must be authorized individually [1] and we use + // the CredentialExt for this. + // [1]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body .with_azure_authorization(&credential, &self.config.account) .build() .unwrap(); From 85c14b1000227caf1174f8a8b1c00b584f8dd053 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 29 Jul 2024 17:25:44 -0300 Subject: [PATCH 06/14] Use multer for multipart parsing --- object_store/Cargo.toml | 1 + object_store/src/azure/client.rs | 226 ++++++++++++++----------------- 2 files changed, 102 insertions(+), 125 deletions(-) diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 5298dd8ad907..c86df894836a 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -57,6 +57,7 @@ tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-ut md-5 = { version = "0.10.6", default-features = false, optional = true } uuid = { version = "1.7.0", default-features = false, features = ["v4"], optional = true } httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } +multer = "3.1.0" [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.29.0", features = ["fs"] } diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 21eb684d0d59..bdccec6b8031 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -83,7 +83,7 @@ pub(crate) enum Error { BulkDeleteRequest { source: crate::client::retry::Error }, #[snafu(display("Error receiving bulk delete request body: {}", source))] - BulkDeleteRequestBody { source: reqwest::Error }, + BulkDeleteRequestBody { source: multer::Error }, #[snafu(display( "Bulk delete request failed due to invalid input: {} (code: {})", @@ -312,7 +312,7 @@ fn write_headers(headers: &HeaderMap, dst: &mut Vec) { } /// https://docs.oasis-open.org/odata/odata/v4.0/errata02/os/complete/part1-protocol/odata-v4.0-errata02-os-part1-protocol-complete.html#_Toc406398359 -fn serialize_part_request( +fn serialize_part_delete_request( dst: &mut Vec, boundary: &str, idx: usize, @@ -345,97 +345,121 @@ fn serialize_part_request( extend(dst, b"\r\n"); } -fn parse_response_part( - remaining: &mut &[u8], - results: &mut [Result], - paths: &[Path], -) -> Result<()> { +fn parse_multipart_response_boundary(response: &Response) -> Result { let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { reason: msg.to_string(), }; - // Parse part headers and retrieve part id. - // Documentation only mentions two possible headers for the start of a part - // we leave space for 8 just in case: - // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#response-body - let mut headers = [httparse::EMPTY_HEADER; 8]; - let id = match httparse::parse_headers(remaining, &mut headers) { - Ok(httparse::Status::Complete((pos, headers))) => { - *remaining = &remaining[pos..]; - headers - .iter() - .find(|h| h.name.to_lowercase() == "content-id") - .and_then(|h| std::str::from_utf8(h.value).ok()) - .and_then(|v| v.parse::().ok()) - } - _ => return Err(invalid_response("unable to parse parse headers").into()), - }; + let content_type = response + .headers() + .get(CONTENT_TYPE) + .ok_or_else(|| invalid_response("missing Content-Type"))?; - // Parse part response headers - // Documentation mentions 5 headers and states that other standard HTTP headers - // may be provided, in order to not incurr in more complexity to support an arbitrary - // amount of headers we chose a conservative amount and error otherwise - // https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob?tabs=microsoft-entra-id#response-headers - let mut headers = [httparse::EMPTY_HEADER; 48]; - let mut part_response = httparse::Response::new(&mut headers); - let content_length = match part_response.parse(remaining) { - Ok(httparse::Status::Complete(pos)) => { - *remaining = &remaining[pos..]; - part_response - .headers - .iter() - .find(|h| h.name.to_lowercase() == "content-length") - .and_then(|h| std::str::from_utf8(h.value).ok()) - .and_then(|v| v.parse::().ok()) - .unwrap_or_default() - } - _ => return Err(invalid_response("unable to parse response").into()), + let boundary = content_type + .as_ref() + .strip_prefix(b"multipart/mixed; boundary=") + .ok_or_else(|| invalid_response("invalid Content-Type value"))? + .to_vec(); + + let boundary = String::from_utf8(boundary) + .map_err(|_| invalid_response("invalid multipart boundary"))?; + + Ok(boundary) +} + +async fn parse_blob_batch_delete_response(batch_response: Response, paths: &[Path]) -> Result>> { + let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { + reason: msg.to_string(), }; - let part_body = &remaining[..content_length]; + let boundary = parse_multipart_response_boundary(&batch_response)?; - // Set slice past part body - *remaining = &remaining[content_length..]; + let stream = batch_response.bytes_stream(); - if !part_body.is_empty() { - // Skips CRLF after body if non-empty - *remaining = remaining - .strip_prefix(b"\r\n") - .ok_or_else(|| invalid_response("missing part separator"))?; - } + let mut multipart = multer::Multipart::new(stream, boundary); + + let mut results: Vec> = paths.iter().cloned().map(Ok).collect(); + + while let Some(mut part) = multipart + .next_field() + .await + .context(BulkDeleteRequestBodySnafu {})? { - match (id, part_response.code) { - (Some(_id), Some(code)) if (200..300).contains(&code) => {} - (Some(id), Some(404)) => { - results[id] = Err(crate::Error::NotFound { - path: paths[id].as_ref().to_string(), - source: Error::DeleteFailed { + let id = part.headers() + .get("content-id") + .and_then(|v| std::str::from_utf8(v.as_bytes()).ok()) + .and_then(|v| v.parse::().ok()); + + let mut raw_part_response = Vec::with_capacity(2048); + while let Some(bytes) = part.chunk().await + .context(BulkDeleteRequestBodySnafu {})? { + raw_part_response.extend_from_slice(&bytes); + } + + // We add this extra CRLF because multer will unconditionally skip it even for requests with + // an empty body. + raw_part_response.extend_from_slice(b"\r\n"); + + println!("{:?}", String::from_utf8_lossy(&raw_part_response)); + // Parse part response headers + // Documentation mentions 5 headers and states that other standard HTTP headers + // may be provided, in order to not incurr in more complexity to support an arbitrary + // amount of headers we chose a conservative amount and error otherwise + // https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob?tabs=microsoft-entra-id#response-headers + let mut headers = [httparse::EMPTY_HEADER; 48]; + let mut part_response = httparse::Response::new(&mut headers); + let (body, content_length) = match part_response.parse(&raw_part_response) { + Ok(httparse::Status::Complete(pos)) => { + ( + &raw_part_response[pos..], + part_response + .headers + .iter() + .find(|h| h.name.to_lowercase() == "content-length") + .and_then(|h| std::str::from_utf8(h.value).ok()) + .and_then(|v| v.parse::().ok()) + .unwrap_or_default() + ) + } + e => { + println!("{:?}", e); + return Err(invalid_response("unable to parse response").into()) + }, + }; + + match (id, part_response.code) { + (Some(_id), Some(code)) if (200..300).contains(&code) => {} + (Some(id), Some(404)) => { + results[id] = Err(crate::Error::NotFound { path: paths[id].as_ref().to_string(), - code: 404.to_string(), + source: Error::DeleteFailed { + path: paths[id].as_ref().to_string(), + code: 404.to_string(), + reason: part_response.reason.unwrap_or_default().to_string(), + } + .into(), + }); + } + (Some(id), Some(code)) => { + results[id] = Err(Error::DeleteFailed { + path: paths[id].as_ref().to_string(), + code: code.to_string(), reason: part_response.reason.unwrap_or_default().to_string(), } - .into(), - }); - } - (Some(id), Some(code)) => { - results[id] = Err(Error::DeleteFailed { - path: paths[id].as_ref().to_string(), - code: code.to_string(), - reason: part_response.reason.unwrap_or_default().to_string(), + .into()); } - .into()); - } - (None, Some(code)) => { - return Err(Error::BulkDeleteRequestInvalidInput { - code: code.to_string(), - reason: part_response.reason.unwrap_or_default().to_string(), + (None, Some(code)) => { + return Err(Error::BulkDeleteRequestInvalidInput { + code: code.to_string(), + reason: part_response.reason.unwrap_or_default().to_string(), + } + .into()) } - .into()) + _ => return Err(invalid_response("missing part response status code").into()), } - _ => return Err(invalid_response("missing part response status code").into()), } - Ok(()) + Ok(results) } #[derive(Debug)] @@ -601,7 +625,7 @@ impl AzureClient { // Url for part requests must be relative and without base let relative_url = self.config.service.make_relative(request.url()).unwrap(); - serialize_part_request(&mut body_bytes, &boundary, idx, request, relative_url) + serialize_part_delete_request(&mut body_bytes, &boundary, idx, request, relative_url) } // Encode end marker @@ -628,55 +652,7 @@ impl AzureClient { .await .context(BulkDeleteRequestSnafu {})?; - let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { - reason: msg.to_string(), - }; - - let content_type = batch_response - .headers() - .get(CONTENT_TYPE) - .ok_or_else(|| invalid_response("missing Content-Type"))?; - - let boundary = content_type - .as_ref() - .strip_prefix(b"multipart/mixed; boundary=") - .ok_or_else(|| invalid_response("invalid Content-Type value"))? - .to_vec(); - - let start_marker = [b"--".as_slice(), boundary.as_slice(), b"\r\n"].concat(); - - let end_marker = [b"--".as_slice(), boundary.as_slice(), b"--"].concat(); - - let response_body = batch_response - .bytes() - .await - .context(BulkDeleteRequestBodySnafu {})?; - - let mut results: Vec> = paths.iter().cloned().map(Ok).collect(); - - let mut remaining = response_body.as_ref(); - - loop { - // Check for part marker - remaining = remaining - .strip_prefix(start_marker.as_slice()) - .ok_or_else(|| invalid_response("missing start marker for part"))?; - - parse_response_part(&mut remaining, &mut results, &paths)?; - - // Workaround for Azurite bug where it does not set content-length but still sends some - // body. This code skips to the next part or the end. - if let Some(pos) = remaining - .windows(start_marker.len()) - .position(|s| s == start_marker.as_slice() || s == end_marker.as_slice()) - { - remaining = &remaining[pos..]; - } - - if remaining == end_marker { - break; - } - } + let results = parse_blob_batch_delete_response(batch_response, &paths).await?; Ok(results) } From 098ab45a34c82dcde3568a5f65ee4cdac92dcf54 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 29 Jul 2024 17:40:43 -0300 Subject: [PATCH 07/14] Fix clippy --- object_store/src/azure/client.rs | 38 ++++++++++++-------------------- 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index bdccec6b8031..302a02691f0c 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -361,13 +361,16 @@ fn parse_multipart_response_boundary(response: &Response) -> Result { .ok_or_else(|| invalid_response("invalid Content-Type value"))? .to_vec(); - let boundary = String::from_utf8(boundary) - .map_err(|_| invalid_response("invalid multipart boundary"))?; + let boundary = + String::from_utf8(boundary).map_err(|_| invalid_response("invalid multipart boundary"))?; Ok(boundary) } -async fn parse_blob_batch_delete_response(batch_response: Response, paths: &[Path]) -> Result>> { +async fn parse_blob_batch_delete_response( + batch_response: Response, + paths: &[Path], +) -> Result>> { let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { reason: msg.to_string(), }; @@ -383,16 +386,17 @@ async fn parse_blob_batch_delete_response(batch_response: Response, paths: &[Pat while let Some(mut part) = multipart .next_field() .await - .context(BulkDeleteRequestBodySnafu {})? { + .context(BulkDeleteRequestBodySnafu {})? + { - let id = part.headers() + let id = part + .headers() .get("content-id") .and_then(|v| std::str::from_utf8(v.as_bytes()).ok()) .and_then(|v| v.parse::().ok()); let mut raw_part_response = Vec::with_capacity(2048); - while let Some(bytes) = part.chunk().await - .context(BulkDeleteRequestBodySnafu {})? { + while let Some(bytes) = part.chunk().await.context(BulkDeleteRequestBodySnafu {})? { raw_part_response.extend_from_slice(&bytes); } @@ -408,23 +412,9 @@ async fn parse_blob_batch_delete_response(batch_response: Response, paths: &[Pat // https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob?tabs=microsoft-entra-id#response-headers let mut headers = [httparse::EMPTY_HEADER; 48]; let mut part_response = httparse::Response::new(&mut headers); - let (body, content_length) = match part_response.parse(&raw_part_response) { - Ok(httparse::Status::Complete(pos)) => { - ( - &raw_part_response[pos..], - part_response - .headers - .iter() - .find(|h| h.name.to_lowercase() == "content-length") - .and_then(|h| std::str::from_utf8(h.value).ok()) - .and_then(|v| v.parse::().ok()) - .unwrap_or_default() - ) - } - e => { - println!("{:?}", e); - return Err(invalid_response("unable to parse response").into()) - }, + match part_response.parse(&raw_part_response) { + Ok(httparse::Status::Complete(pos)) => {} + e => return Err(invalid_response("unable to parse response").into()), }; match (id, part_response.code) { From 2012b5bd7656b798a73cf117e9e5ff526d7a61e6 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 29 Jul 2024 17:44:15 -0300 Subject: [PATCH 08/14] Fix clippy #2 --- object_store/src/azure/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 302a02691f0c..97fe3522620d 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -413,8 +413,8 @@ async fn parse_blob_batch_delete_response( let mut headers = [httparse::EMPTY_HEADER; 48]; let mut part_response = httparse::Response::new(&mut headers); match part_response.parse(&raw_part_response) { - Ok(httparse::Status::Complete(pos)) => {} - e => return Err(invalid_response("unable to parse response").into()), + Ok(httparse::Status::Complete(_)) => {} + _ => return Err(invalid_response("unable to parse response").into()), }; match (id, part_response.code) { From 452bbb98ae95326d8dc389de43e97c05b3e321a5 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 29 Jul 2024 18:46:50 -0300 Subject: [PATCH 09/14] Reuse part response buffer --- object_store/src/azure/client.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 97fe3522620d..f1918dd5807d 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -311,7 +311,7 @@ fn write_headers(headers: &HeaderMap, dst: &mut Vec) { } } -/// https://docs.oasis-open.org/odata/odata/v4.0/errata02/os/complete/part1-protocol/odata-v4.0-errata02-os-part1-protocol-complete.html#_Toc406398359 +// https://docs.oasis-open.org/odata/odata/v4.0/errata02/os/complete/part1-protocol/odata-v4.0-errata02-os-part1-protocol-complete.html#_Toc406398359 fn serialize_part_delete_request( dst: &mut Vec, boundary: &str, @@ -383,28 +383,29 @@ async fn parse_blob_batch_delete_response( let mut results: Vec> = paths.iter().cloned().map(Ok).collect(); + let mut part_response_buffer = Vec::with_capacity(2048); + while let Some(mut part) = multipart .next_field() .await .context(BulkDeleteRequestBodySnafu {})? { - let id = part .headers() .get("content-id") .and_then(|v| std::str::from_utf8(v.as_bytes()).ok()) .and_then(|v| v.parse::().ok()); - let mut raw_part_response = Vec::with_capacity(2048); + part_response_buffer.clear(); + while let Some(bytes) = part.chunk().await.context(BulkDeleteRequestBodySnafu {})? { - raw_part_response.extend_from_slice(&bytes); + part_response_buffer.extend_from_slice(&bytes); } // We add this extra CRLF because multer will unconditionally skip it even for requests with // an empty body. - raw_part_response.extend_from_slice(b"\r\n"); + part_response_buffer.extend_from_slice(b"\r\n"); - println!("{:?}", String::from_utf8_lossy(&raw_part_response)); // Parse part response headers // Documentation mentions 5 headers and states that other standard HTTP headers // may be provided, in order to not incurr in more complexity to support an arbitrary @@ -412,7 +413,7 @@ async fn parse_blob_batch_delete_response( // https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob?tabs=microsoft-entra-id#response-headers let mut headers = [httparse::EMPTY_HEADER; 48]; let mut part_response = httparse::Response::new(&mut headers); - match part_response.parse(&raw_part_response) { + match part_response.parse(&part_response_buffer) { Ok(httparse::Status::Complete(_)) => {} _ => return Err(invalid_response("unable to parse response").into()), }; From cd29eed43c1cb0c6cab1bd80a1cc6556e80031db Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 29 Jul 2024 19:06:46 -0300 Subject: [PATCH 10/14] Make multer conditional to azure feature --- object_store/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index c86df894836a..4564539280e3 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -57,14 +57,14 @@ tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-ut md-5 = { version = "0.10.6", default-features = false, optional = true } uuid = { version = "1.7.0", default-features = false, features = ["v4"], optional = true } httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } -multer = "3.1.0" +multer = { version = "3.1.0", default-features = false, optional = true } [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.29.0", features = ["fs"] } [features] cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] -azure = ["cloud", "uuid", "httparse"] +azure = ["cloud", "uuid", "httparse", "multer"] gcp = ["cloud", "rustls-pemfile"] aws = ["cloud", "md-5"] http = ["cloud"] From 3b44eef319cc1e55aea24b0f6b29616aaf385619 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Tue, 30 Jul 2024 10:00:31 -0300 Subject: [PATCH 11/14] One more HeaderValue::from_static --- object_store/src/azure/client.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index f1918dd5807d..7e719eb1c719 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -327,7 +327,10 @@ fn serialize_part_delete_request( // Encode part headers let mut part_headers = HeaderMap::new(); part_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/http")); - part_headers.insert("Content-Transfer-Encoding", "binary".parse().unwrap()); + part_headers.insert( + "Content-Transfer-Encoding", + HeaderValue::from_static("binary"), + ); // Azure returns 400 if we send `Content-Id` instead of `Content-ID` part_headers.insert("Content-ID", HeaderValue::from(idx)); write_headers(&part_headers, dst); From 412c3b23297b3594ec249518f33969a193c72bd9 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Tue, 5 Nov 2024 13:49:04 -0300 Subject: [PATCH 12/14] Add tests for bulk delete request building and response parsing --- object_store/Cargo.toml | 2 + object_store/src/azure/client.rs | 188 +++++++++++++++++++++++++++++-- 2 files changed, 178 insertions(+), 12 deletions(-) diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 4564539280e3..d6d66fc66e98 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -78,6 +78,8 @@ hyper-util = "0.1" http-body-util = "0.1" rand = "0.8" tempfile = "3.1.0" +regex = "1.11.1" +http = "1.1.0" [[test]] name = "get_range_file" diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 7e719eb1c719..ca8af934573f 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -589,16 +589,12 @@ impl AzureClient { Ok(()) } - pub async fn bulk_delete_request(&self, paths: Vec) -> Result>> { - if paths.is_empty() { - return Ok(Vec::new()); - } - - let credential = self.get_credential().await?; - - // https://www.ietf.org/rfc/rfc2046 - let boundary = format!("batch_{}", uuid::Uuid::new_v4()); - + fn build_bulk_delete_body( + &self, + boundary: &str, + paths: &[Path], + credential: &Option>, + ) -> Vec { let mut body_bytes = Vec::with_capacity(paths.len() * 2048); for (idx, path) in paths.iter().enumerate() { @@ -612,14 +608,14 @@ impl AzureClient { // Each subrequest must be authorized individually [1] and we use // the CredentialExt for this. // [1]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body - .with_azure_authorization(&credential, &self.config.account) + .with_azure_authorization(credential, &self.config.account) .build() .unwrap(); // Url for part requests must be relative and without base let relative_url = self.config.service.make_relative(request.url()).unwrap(); - serialize_part_delete_request(&mut body_bytes, &boundary, idx, request, relative_url) + serialize_part_delete_request(&mut body_bytes, boundary, idx, request, relative_url) } // Encode end marker @@ -627,6 +623,20 @@ impl AzureClient { extend(&mut body_bytes, boundary.as_bytes()); extend(&mut body_bytes, b"--"); extend(&mut body_bytes, b"\r\n"); + body_bytes + } + + pub(crate) async fn bulk_delete_request(&self, paths: Vec) -> Result>> { + if paths.is_empty() { + return Ok(Vec::new()); + } + + let credential = self.get_credential().await?; + + // https://www.ietf.org/rfc/rfc2046 + let boundary = format!("batch_{}", uuid::Uuid::new_v4()); + + let body_bytes = self.build_bulk_delete_body(&boundary, &paths, &credential); // Send multipart request let url = self.config.path_url(&Path::from("/")); @@ -1085,8 +1095,10 @@ pub(crate) struct UserDelegationKey { #[cfg(test)] mod tests { use bytes::Bytes; + use regex::bytes::Regex; use super::*; + use crate::StaticCredentialProvider; #[test] fn deserde_azure() { @@ -1276,4 +1288,156 @@ mod tests { let _delegated_key_response_internal: UserDelegationKey = quick_xml::de::from_str(S).unwrap(); } + + #[tokio::test] + async fn test_build_bulk_delete_body() { + let credential_provider = Arc::new(StaticCredentialProvider::new( + AzureCredential::BearerToken("static-token".to_string()), + )); + + let config = AzureConfig { + account: "testaccount".to_string(), + container: "testcontainer".to_string(), + credentials: credential_provider, + service: "http://example.com".try_into().unwrap(), + retry_config: Default::default(), + is_emulator: false, + skip_signature: false, + disable_tagging: false, + client_options: Default::default(), + }; + + let client = AzureClient::new(config).unwrap(); + + let credential = client.get_credential().await.unwrap(); + let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; + + let boundary = "batch_statictestboundary".to_string(); + + let body_bytes = client.build_bulk_delete_body(&boundary, paths, &credential); + + // Replace Date header value with a static date + let re = Regex::new("Date:[^\r]+").unwrap(); + let body_bytes = re + .replace_all(&body_bytes, b"Date: Tue, 05 Nov 2024 15:01:15 GMT") + .to_vec(); + + let expected_body = b"--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 0\r +\r +DELETE /testcontainer/a HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 1\r +\r +DELETE /testcontainer/b HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary\r +Content-Type: application/http\r +Content-Transfer-Encoding: binary\r +Content-ID: 2\r +\r +DELETE /testcontainer/c HTTP/1.1\r +Content-Length: 0\r +Date: Tue, 05 Nov 2024 15:01:15 GMT\r +X-Ms-Version: 2023-11-03\r +Authorization: Bearer static-token\r +\r +\r +--batch_statictestboundary--\r\n" + .to_vec(); + + assert_eq!(expected_body, body_bytes); + } + + #[tokio::test] + async fn test_parse_blob_batch_delete_response() { + let response_body = b"--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 0\r +\r +HTTP/1.1 202 Accepted\r +x-ms-delete-type-permanent: true\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f\r +x-ms-version: 2018-11-09\r +\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 1\r +\r +HTTP/1.1 202 Accepted\r +x-ms-delete-type-permanent: true\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2851\r +x-ms-version: 2018-11-09\r +\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r +Content-Type: application/http\r +Content-ID: 2\r +\r +HTTP/1.1 404 The specified blob does not exist.\r +x-ms-error-code: BlobNotFound\r +x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852\r +x-ms-version: 2018-11-09\r +Content-Length: 216\r +Content-Type: application/xml\r +\r + +BlobNotFoundThe specified blob does not exist. +RequestId:778fdc83-801e-0000-62ff-0334671e2852 +Time:2018-06-14T16:46:54.6040685Z\r +--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r\n"; + + let response: reqwest::Response = http::Response::builder() + .status(202) + .header("Transfer-Encoding", "chunked") + .header( + "Content-Type", + "multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed", + ) + .header("x-ms-request-id", "778fdc83-801e-0000-62ff-033467000000") + .header("x-ms-version", "2018-11-09") + .body(Bytes::from(response_body.as_slice())) + .unwrap() + .into(); + + let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; + + let results = parse_blob_batch_delete_response(response, paths) + .await + .unwrap(); + + assert!(results[0].is_ok()); + assert_eq!(&paths[0], results[0].as_ref().unwrap()); + + assert!(results[1].is_ok()); + assert_eq!(&paths[1], results[1].as_ref().unwrap()); + + assert!(results[2].is_err()); + let err = results[2].as_ref().unwrap_err(); + let crate::Error::NotFound { source, .. } = err else { + unreachable!("must be not found") + }; + let Some(Error::DeleteFailed { path, code, reason }) = source.downcast_ref::() + else { + unreachable!("must be client error") + }; + + assert_eq!(paths[2].as_ref(), path); + assert_eq!("404", code); + assert_eq!("The specified blob does not exist.", reason); + } } From 87ad3f146d0c7d112a3ffacea9081d10f95c6e87 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Fri, 22 Nov 2024 18:33:53 -0300 Subject: [PATCH 13/14] Switch back to manual parsing to avoid multer dependency, other PR suggestions --- object_store/Cargo.toml | 4 +- object_store/src/azure/client.rs | 121 ++++++++++++++++++++++--------- 2 files changed, 87 insertions(+), 38 deletions(-) diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index d6d66fc66e98..536874fc9eef 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -55,16 +55,14 @@ ring = { version = "0.17", default-features = false, features = ["std"], optiona rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true } tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] } md-5 = { version = "0.10.6", default-features = false, optional = true } -uuid = { version = "1.7.0", default-features = false, features = ["v4"], optional = true } httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } -multer = { version = "3.1.0", default-features = false, optional = true } [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.29.0", features = ["fs"] } [features] cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] -azure = ["cloud", "uuid", "httparse", "multer"] +azure = ["cloud", "httparse"] gcp = ["cloud", "rustls-pemfile"] aws = ["cloud", "md-5"] http = ["cloud"] diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index ca8af934573f..3b48bdbec650 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -31,7 +31,7 @@ use crate::{ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, }; use async_trait::async_trait; -use base64::prelude::BASE64_STANDARD; +use base64::prelude::{BASE64_STANDARD, BASE64_STANDARD_NO_PAD}; use base64::Engine; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; @@ -83,7 +83,7 @@ pub(crate) enum Error { BulkDeleteRequest { source: crate::client::retry::Error }, #[snafu(display("Error receiving bulk delete request body: {}", source))] - BulkDeleteRequestBody { source: multer::Error }, + BulkDeleteRequestBody { source: reqwest::Error }, #[snafu(display( "Bulk delete request failed due to invalid input: {} (code: {})", @@ -370,44 +370,86 @@ fn parse_multipart_response_boundary(response: &Response) -> Result { Ok(boundary) } -async fn parse_blob_batch_delete_response( - batch_response: Response, - paths: &[Path], -) -> Result>> { - let invalid_response = |msg: &str| Error::InvalidBulkDeleteResponse { +fn invalid_response(msg: &str) -> Error { + Error::InvalidBulkDeleteResponse { reason: msg.to_string(), - }; - - let boundary = parse_multipart_response_boundary(&batch_response)?; + } +} - let stream = batch_response.bytes_stream(); +#[derive(Debug)] +struct MultipartField { + headers: HeaderMap, + content: Bytes +} - let mut multipart = multer::Multipart::new(stream, boundary); +fn parse_multipart_body_fields(body: Bytes, boundary: &[u8]) -> Result> { + let start_marker = [b"--", boundary, b"\r\n"].concat(); + let next_marker = &start_marker[..start_marker.len() - 2]; + let end_marker = [b"--", boundary, b"--\r\n"].concat(); + + // There should be at most 256 responses per batch + let mut fields = Vec::with_capacity(256); + let mut remaining: &[u8] = body.as_ref(); + loop { + remaining = remaining + .strip_prefix(start_marker.as_slice()) + .ok_or_else(|| { + invalid_response("missing start marker for field") + })?; - let mut results: Vec> = paths.iter().cloned().map(Ok).collect(); + // The documentation only mentions two headers for fields, we leave some extra margin + let mut scratch = [httparse::EMPTY_HEADER; 10]; + let mut headers = HeaderMap::new(); + match httparse::parse_headers(remaining, &mut scratch) { + Ok(httparse::Status::Complete((pos, headers_slice))) => { + remaining = &remaining[pos..]; + for header in headers_slice { + headers.insert( + HeaderName::from_bytes(header.name.as_bytes()).expect("valid"), + HeaderValue::from_bytes(header.value).expect("valid") + ); + } + } + _ => { + return Err(invalid_response("unable to parse field headers").into()) + } + }; - let mut part_response_buffer = Vec::with_capacity(2048); + let next_pos = remaining + .windows(next_marker.len()) + .position(|window| window == next_marker) + .ok_or_else(|| invalid_response("early EOF while seeking to next boundary"))?; - while let Some(mut part) = multipart - .next_field() - .await - .context(BulkDeleteRequestBodySnafu {})? - { - let id = part - .headers() - .get("content-id") - .and_then(|v| std::str::from_utf8(v.as_bytes()).ok()) - .and_then(|v| v.parse::().ok()); + fields.push(MultipartField { + headers, + content: body.slice_ref(&remaining[..next_pos]) + }); - part_response_buffer.clear(); + remaining = &remaining[next_pos..]; - while let Some(bytes) = part.chunk().await.context(BulkDeleteRequestBodySnafu {})? { - part_response_buffer.extend_from_slice(&bytes); + if remaining == end_marker + || remaining == &end_marker[..end_marker.len() - 2] // Missing final CRLF + { + break; } + } + Ok(fields) +} + +async fn parse_blob_batch_delete_body( + batch_body: Bytes, + boundary: String, + paths: &[Path], +) -> Result>> { - // We add this extra CRLF because multer will unconditionally skip it even for requests with - // an empty body. - part_response_buffer.extend_from_slice(b"\r\n"); + let mut results: Vec> = paths.iter().cloned().map(Ok).collect(); + + for field in parse_multipart_body_fields(batch_body, boundary.as_bytes())? { + let id = field + .headers + .get("content-id") + .and_then(|v| std::str::from_utf8(v.as_bytes()).ok()) + .and_then(|v| v.parse::().ok()); // Parse part response headers // Documentation mentions 5 headers and states that other standard HTTP headers @@ -416,7 +458,7 @@ async fn parse_blob_batch_delete_response( // https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob?tabs=microsoft-entra-id#response-headers let mut headers = [httparse::EMPTY_HEADER; 48]; let mut part_response = httparse::Response::new(&mut headers); - match part_response.parse(&part_response_buffer) { + match part_response.parse(&field.content) { Ok(httparse::Status::Complete(_)) => {} _ => return Err(invalid_response("unable to parse response").into()), }; @@ -634,7 +676,8 @@ impl AzureClient { let credential = self.get_credential().await?; // https://www.ietf.org/rfc/rfc2046 - let boundary = format!("batch_{}", uuid::Uuid::new_v4()); + let random_bytes = rand::random::<[u8; 16]>(); // 128 bits + let boundary = format!("batch_{}", BASE64_STANDARD_NO_PAD.encode(&random_bytes)); let body_bytes = self.build_bulk_delete_body(&boundary, &paths, &credential); @@ -656,7 +699,12 @@ impl AzureClient { .await .context(BulkDeleteRequestSnafu {})?; - let results = parse_blob_batch_delete_response(batch_response, &paths).await?; + let boundary = parse_multipart_response_boundary(&batch_response)?; + + let batch_body = batch_response.bytes().await + .context(BulkDeleteRequestBodySnafu {})?; + + let results = parse_blob_batch_delete_body(batch_body, boundary, &paths).await?; Ok(results) } @@ -1365,7 +1413,7 @@ Authorization: Bearer static-token\r } #[tokio::test] - async fn test_parse_blob_batch_delete_response() { + async fn test_parse_blob_batch_delete_body() { let response_body = b"--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r Content-Type: application/http\r Content-ID: 0\r @@ -1414,9 +1462,12 @@ Time:2018-06-14T16:46:54.6040685Z\r .unwrap() .into(); + let boundary = parse_multipart_response_boundary(&response).unwrap(); + let body = response.bytes().await.unwrap(); + let paths = &[Path::from("a"), Path::from("b"), Path::from("c")]; - let results = parse_blob_batch_delete_response(response, paths) + let results = parse_blob_batch_delete_body(body, boundary, paths) .await .unwrap(); From 68d023f420380d9b473703bb2b7a436b5c52a996 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Fri, 22 Nov 2024 18:37:26 -0300 Subject: [PATCH 14/14] Fixes lint --- object_store/src/azure/client.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 3b48bdbec650..76dedd71aa50 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -379,7 +379,7 @@ fn invalid_response(msg: &str) -> Error { #[derive(Debug)] struct MultipartField { headers: HeaderMap, - content: Bytes + content: Bytes, } fn parse_multipart_body_fields(body: Bytes, boundary: &[u8]) -> Result> { @@ -393,9 +393,7 @@ fn parse_multipart_body_fields(body: Bytes, boundary: &[u8]) -> Result Result { - return Err(invalid_response("unable to parse field headers").into()) - } + _ => return Err(invalid_response("unable to parse field headers").into()), }; let next_pos = remaining @@ -422,14 +418,13 @@ fn parse_multipart_body_fields(body: Bytes, boundary: &[u8]) -> Result Result>> { - let mut results: Vec> = paths.iter().cloned().map(Ok).collect(); for field in parse_multipart_body_fields(batch_body, boundary.as_bytes())? { @@ -677,7 +671,7 @@ impl AzureClient { // https://www.ietf.org/rfc/rfc2046 let random_bytes = rand::random::<[u8; 16]>(); // 128 bits - let boundary = format!("batch_{}", BASE64_STANDARD_NO_PAD.encode(&random_bytes)); + let boundary = format!("batch_{}", BASE64_STANDARD_NO_PAD.encode(random_bytes)); let body_bytes = self.build_bulk_delete_body(&boundary, &paths, &credential); @@ -701,7 +695,9 @@ impl AzureClient { let boundary = parse_multipart_response_boundary(&batch_response)?; - let batch_body = batch_response.bytes().await + let batch_body = batch_response + .bytes() + .await .context(BulkDeleteRequestBodySnafu {})?; let results = parse_blob_batch_delete_body(batch_body, boundary, &paths).await?;