Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[storage] Automatically set Content-Length: 0 when uploading body is empty #191

Merged
merged 3 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion storage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "google-cloud-storage"
version = "0.13.0"
version = "0.13.1"
edition = "2021"
authors = ["yoshidan <[email protected]>"]
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/storage"
Expand Down
100 changes: 79 additions & 21 deletions storage/src/http/storage_client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;

use futures_util::{Stream, TryStream, TryStreamExt};
use reqwest::header::LOCATION;
use reqwest::{Body, Client, RequestBuilder};
use reqwest::header::{HeaderValue, CONTENT_LENGTH, LOCATION};
use reqwest::{Body, Client, Request, RequestBuilder};

use google_cloud_token::TokenSource;

Expand Down Expand Up @@ -1034,7 +1034,19 @@ impl StorageClient {
}
UploadType::Simple(media) => {
let builder = objects::upload::build(self.v1_upload_endpoint.as_str(), &self.http, req, media, data);
self.send(builder).await
let builder = self.with_headers(builder).await?;
let mut request = builder.build()?;
// In the case of not streamed and 0 bytes, Content-Length=0 must be explicitly specified.
if !request.headers().contains_key(CONTENT_LENGTH) {
if let Some(Some(is_empty)) = request.body().map(|b| b.as_bytes().map(|b| b.is_empty())) {
if is_empty {
request
.headers_mut()
.insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
}
}
}
self.send_request(request).await
}
}
}
Expand Down Expand Up @@ -1277,12 +1289,21 @@ impl StorageClient {
.header(reqwest::header::AUTHORIZATION, token))
}

async fn send_request<T>(&self, request: Request) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let response = self.http.execute(request).await?;
let response = check_response_status(response).await?;
Ok(response.json().await?)
}

async fn send<T>(&self, builder: RequestBuilder) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let request = self.with_headers(builder).await?;
let response = request.send().await?;
let builder = self.with_headers(builder).await?;
let response = builder.send().await?;
let response = check_response_status(response).await?;
Ok(response.json().await?)
}
Expand Down Expand Up @@ -1366,7 +1387,7 @@ mod test {

#[ctor::ctor]
fn init() {
let _ = tracing_subscriber::fmt::try_init();
let _ = tracing_subscriber::fmt().try_init();
}

async fn client() -> (StorageClient, String) {
Expand Down Expand Up @@ -1845,17 +1866,30 @@ mod test {
..Default::default()
},
vec![1, 2, 3, 4, 5, 6],
&UploadType::Simple(media),
&UploadType::Simple(media.clone()),
)
.await
.unwrap();

assert_eq!(uploaded.content_type.unwrap(), "text/plain".to_string());

let download = |range: Range| {
let media = Media::new("test1_zero");
let uploaded_empty = client
.upload_object(
&UploadObjectRequest {
bucket: bucket_name.to_string(),
..Default::default()
},
vec![],
&UploadType::Simple(media),
)
.await
.unwrap();

let download = |name: &str, range: Range| {
let client = client.clone();
let bucket_name = uploaded.bucket.clone();
let object_name = uploaded.name.clone();
let object_name = name.to_string();
async move {
client
.download_object(
Expand All @@ -1871,15 +1905,18 @@ mod test {
}
};

let downloaded = download(Range::default()).await;
let downloaded = download(&uploaded.name, Range::default()).await;
assert_eq!(downloaded, vec![1, 2, 3, 4, 5, 6]);
let downloaded = download(Range(Some(1), None)).await;
let downloaded = download(&uploaded.name, Range(Some(1), None)).await;
assert_eq!(downloaded, vec![2, 3, 4, 5, 6]);
let downloaded = download(Range(Some(1), Some(2))).await;
let downloaded = download(&uploaded.name, Range(Some(1), Some(2))).await;
assert_eq!(downloaded, vec![2, 3]);
let downloaded = download(Range(None, Some(2))).await;
let downloaded = download(&uploaded.name, Range(None, Some(2))).await;
assert_eq!(downloaded, vec![5, 6]);

let downloaded = download(&uploaded_empty.name, Range::default()).await;
assert!(downloaded.is_empty());

let _copied = client
.copy_object(&CopyObjectRequest {
destination_bucket: bucket_name.to_string(),
Expand Down Expand Up @@ -1932,11 +1969,10 @@ mod test {

// let stream= reqwest::Client::default().get("https://avatars.githubusercontent.com/u/958174?s=96&v=4").send().await.unwrap().bytes_stream();
let source = vec!["hello", " ", "world"];
let size = source.iter().map(|x| x.len() as u64).sum();
let size: u64 = source.iter().map(|x| x.len() as u64).sum();
let chunks: Vec<Result<_, ::std::io::Error>> = source.clone().into_iter().map(Ok).collect();
let stream = futures_util::stream::iter(chunks);
let mut media = Media::new(file_name);
media.content_length = Some(size);
let upload_type = UploadType::Simple(media);
let uploaded = client
.upload_streamed_object(
Expand All @@ -1951,10 +1987,29 @@ mod test {
.await
.unwrap();

let download = |range: Range| {
let file_name = format!("stream_empty_{}", time::OffsetDateTime::now_utc().unix_timestamp());
let source: Vec<&str> = vec![];
let chunks: Vec<Result<_, ::std::io::Error>> = source.clone().into_iter().map(Ok).collect();
let stream = futures_util::stream::iter(chunks);
let media = Media::new(file_name);
let upload_type = UploadType::Simple(media);
let uploaded_empty = client
.upload_streamed_object(
&UploadObjectRequest {
bucket: bucket_name.to_string(),
predefined_acl: None,
..Default::default()
},
stream,
&upload_type,
)
.await
.unwrap();

let download = |name: &str, range: Range| {
let client = client.clone();
let bucket_name = uploaded.bucket.clone();
let object_name = uploaded.name.clone();
let object_name = name.to_string();
async move {
let mut downloaded = client
.download_streamed_object(
Expand All @@ -1975,14 +2030,17 @@ mod test {
data
}
};
let downloaded = download(Range::default()).await;
let downloaded = download(&uploaded.name, Range::default()).await;
assert_eq!("hello world", String::from_utf8_lossy(downloaded.as_slice()));
let downloaded = download(Range(Some(1), None)).await;
let downloaded = download(&uploaded.name, Range(Some(1), None)).await;
assert_eq!("ello world", String::from_utf8_lossy(downloaded.as_slice()));
let downloaded = download(Range(Some(1), Some(2))).await;
let downloaded = download(&uploaded.name, Range(Some(1), Some(2))).await;
assert_eq!("el", String::from_utf8_lossy(downloaded.as_slice()));
let downloaded = download(Range(None, Some(2))).await;
let downloaded = download(&uploaded.name, Range(None, Some(2))).await;
assert_eq!("ld", String::from_utf8_lossy(downloaded.as_slice()));

let downloaded = download(&uploaded_empty.name, Range::default()).await;
assert!(downloaded.is_empty());
}

#[tokio::test]
Expand Down