Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: Add support for requester pays buckets #6768

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ pub struct AmazonS3Builder {
encryption_bucket_key_enabled: Option<ConfigValue<bool>>,
/// base64-encoded 256-bit customer encryption key for SSE-C.
encryption_customer_key_base64: Option<String>,
/// When set to true, charge requester for bucket operations
request_payer: ConfigValue<bool>,
}

/// Configuration keys for [`AmazonS3Builder`]
Expand Down Expand Up @@ -330,6 +332,13 @@ pub enum AmazonS3ConfigKey {
/// - `s3_express`
S3Express,

/// Enable Support for S3 Requester Pays
///
/// Supported keys:
/// - `aws_request_payer`
/// - `request_payer`
RequestPayer,

/// Client options
Client(ClientConfigKey),

Expand Down Expand Up @@ -358,6 +367,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
Self::DisableTagging => "aws_disable_tagging",
Self::RequestPayer => "aws_request_payer",
Self::Client(opt) => opt.as_ref(),
Self::Encryption(opt) => opt.as_ref(),
}
Expand Down Expand Up @@ -389,6 +399,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
"aws_server_side_encryption" => Ok(Self::Encryption(
Expand Down Expand Up @@ -510,6 +521,9 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::ConditionalPut => {
self.conditional_put = Some(ConfigValue::Deferred(value.into()))
}
AmazonS3ConfigKey::RequestPayer => {
self.request_payer = ConfigValue::Deferred(value.into())
}
AmazonS3ConfigKey::Encryption(key) => match key {
S3EncryptionConfigKey::ServerSideEncryption => {
self.encryption_type = Some(ConfigValue::Deferred(value.into()))
Expand Down Expand Up @@ -567,6 +581,7 @@ impl AmazonS3Builder {
self.conditional_put.as_ref().map(ToString::to_string)
}
AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
AmazonS3ConfigKey::RequestPayer => Some(self.request_payer.to_string()),
AmazonS3ConfigKey::Encryption(key) => match key {
S3EncryptionConfigKey::ServerSideEncryption => {
self.encryption_type.as_ref().map(ToString::to_string)
Expand Down Expand Up @@ -845,6 +860,14 @@ impl AmazonS3Builder {
self
}

/// Set whether to charge requester for bucket operations.
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html>
pub fn with_request_payer(mut self, enabled: bool) -> Self {
self.request_payer = ConfigValue::Parsed(enabled);
self
}

/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
Expand Down Expand Up @@ -996,6 +1019,7 @@ impl AmazonS3Builder {
copy_if_not_exists,
conditional_put: put_precondition,
encryption_headers,
request_payer: self.request_payer.get()?,
};

let client = Arc::new(S3Client::new(config)?);
Expand Down
4 changes: 3 additions & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ pub(crate) struct S3Config {
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: Option<S3ConditionalPut>,
pub request_payer: bool,
pub(super) encryption_headers: S3EncryptionHeaders,
}

Expand Down Expand Up @@ -249,7 +250,8 @@ impl<'a> SessionCredential<'a> {
fn authorizer(&self) -> Option<AwsAuthorizer<'_>> {
let mut authorizer =
AwsAuthorizer::new(self.credential.as_deref()?, "s3", &self.config.region)
.with_sign_payload(self.config.sign_payload);
.with_sign_payload(self.config.sign_payload)
.with_request_payer(self.config.request_payer);

if self.session_token {
let token = HeaderName::from_static("x-amz-s3session-token");
Expand Down
114 changes: 114 additions & 0 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ pub struct AwsAuthorizer<'a> {
region: &'a str,
token_header: Option<HeaderName>,
sign_payload: bool,
request_payer: bool,
}

static DATE_HEADER: HeaderName = HeaderName::from_static("x-amz-date");
static HASH_HEADER: HeaderName = HeaderName::from_static("x-amz-content-sha256");
static TOKEN_HEADER: HeaderName = HeaderName::from_static("x-amz-security-token");
static REQUEST_PAYER_HEADER: HeaderName = HeaderName::from_static("x-amz-request-payer");
static REQUEST_PAYER_HEADER_VALUE: HeaderValue = HeaderValue::from_static("requester");
const ALGORITHM: &str = "AWS4-HMAC-SHA256";

impl<'a> AwsAuthorizer<'a> {
Expand All @@ -118,6 +121,7 @@ impl<'a> AwsAuthorizer<'a> {
date: None,
sign_payload: true,
token_header: None,
request_payer: false,
}
}

Expand All @@ -134,6 +138,14 @@ impl<'a> AwsAuthorizer<'a> {
self
}

/// Set whether to include requester pays headers
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html>
pub fn with_request_payer(mut self, request_payer: bool) -> Self {
self.request_payer = request_payer;
self
}

/// Authorize `request` with an optional pre-calculated SHA256 digest by attaching
/// the relevant [AWS SigV4] headers
///
Expand Down Expand Up @@ -180,6 +192,15 @@ impl<'a> AwsAuthorizer<'a> {
let header_digest = HeaderValue::from_str(&digest).unwrap();
request.headers_mut().insert(&HASH_HEADER, header_digest);

if self.request_payer {
// For DELETE, GET, HEAD, POST, and PUT requests, include x-amz-request-payer :
// requester in the header
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
request
.headers_mut()
.insert(&REQUEST_PAYER_HEADER, REQUEST_PAYER_HEADER_VALUE.clone());
}

let (signed_headers, canonical_headers) = canonicalize_headers(request.headers());

let scope = self.scope(date);
Expand Down Expand Up @@ -226,6 +247,13 @@ impl<'a> AwsAuthorizer<'a> {
.append_pair("X-Amz-Expires", &expires_in.as_secs().to_string())
.append_pair("X-Amz-SignedHeaders", "host");

if self.request_payer {
// For signed URLs, include x-amz-request-payer=requester in the request
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
url.query_pairs_mut()
.append_pair("x-amz-request-payer", "requester");
}

// For S3, you must include the X-Amz-Security-Token query parameter in the URL if
// using credentials sourced from the STS service.
if let Some(ref token) = self.credential.token {
Expand Down Expand Up @@ -763,12 +791,53 @@ mod tests {
region: "us-east-1",
sign_payload: true,
token_header: None,
request_payer: false,
};

signer.authorize(&mut request, None);
assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=a3c787a7ed37f7fdfbfd2d7056a3d7c9d85e6d52a2bfbec73793c0be6e7862d4")
}

#[test]
fn test_sign_with_signed_payload_request_payer() {
let client = Client::new();

// Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html
let credential = AwsCredential {
key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
token: None,
};

// method = 'GET'
// service = 'ec2'
// host = 'ec2.amazonaws.com'
// region = 'us-east-1'
// endpoint = 'https://ec2.amazonaws.com'
// request_parameters = ''
let date = DateTime::parse_from_rfc3339("2022-08-06T18:01:34Z")
.unwrap()
.with_timezone(&Utc);

let mut request = client
.request(Method::GET, "https://ec2.amazon.com/")
.build()
.unwrap();

let signer = AwsAuthorizer {
date: Some(date),
credential: &credential,
service: "ec2",
region: "us-east-1",
sign_payload: true,
token_header: None,
request_payer: true,
};

signer.authorize(&mut request, None);
assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-request-payer, Signature=7030625a9e9b57ed2a40e63d749f4a4b7714b6e15004cab026152f870dd8565d")
}

#[test]
fn test_sign_with_unsigned_payload() {
let client = Client::new();
Expand Down Expand Up @@ -802,6 +871,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: false,
request_payer: false,
};

authorizer.authorize(&mut request, None);
Expand All @@ -828,6 +898,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: false,
request_payer: false,
};

let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
Expand All @@ -848,6 +919,48 @@ mod tests {
);
}

#[test]
fn signed_get_url_request_payer() {
// Values from https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
let credential = AwsCredential {
key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
token: None,
};

let date = DateTime::parse_from_rfc3339("2013-05-24T00:00:00Z")
.unwrap()
.with_timezone(&Utc);

let authorizer = AwsAuthorizer {
date: Some(date),
credential: &credential,
service: "s3",
region: "us-east-1",
token_header: None,
sign_payload: false,
request_payer: true,
};

let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400));

assert_eq!(
url,
Url::parse(
"https://examplebucket.s3.amazonaws.com/test.txt?\
X-Amz-Algorithm=AWS4-HMAC-SHA256&\
X-Amz-Credential=AKIAIOSFODNN7EXAMPLE%2F20130524%2Fus-east-1%2Fs3%2Faws4_request&\
X-Amz-Date=20130524T000000Z&\
X-Amz-Expires=86400&\
X-Amz-SignedHeaders=host&\
x-amz-request-payer=requester&\
X-Amz-Signature=9ad7c781cc30121f199b47d35ed3528473e4375b63c5d91cd87c927803e4e00a"
)
.unwrap()
);
}

#[test]
fn test_sign_port() {
let client = Client::new();
Expand Down Expand Up @@ -880,6 +993,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: true,
request_payer: false,
};

authorizer.authorize(&mut request, None);
Expand Down
3 changes: 2 additions & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ impl Signer for AmazonS3 {
/// ```
async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
let credential = self.credentials().get_credential().await?;
let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region);
let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region)
.with_request_payer(self.client.config.request_payer);

let path_url = self.path_url(path);
let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
Expand Down
Loading