From dc6ede3db1ea88f1115eb8d67d1d8ca0824816c1 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 Oct 2023 17:37:58 +0100 Subject: [PATCH 01/11] Implement DynamoDBLock (#4880) --- .github/workflows/object_store.yml | 2 + object_store/src/aws/builder.rs | 24 +- object_store/src/aws/client.rs | 99 +++---- object_store/src/aws/dynamo.rs | 416 +++++++++++++++++++++++++++ object_store/src/aws/mod.rs | 46 ++- object_store/src/aws/precondition.rs | 11 + 6 files changed, 508 insertions(+), 90 deletions(-) create mode 100644 object_store/src/aws/dynamo.rs diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 1b991e33c097..06ba203c55b0 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -112,6 +112,7 @@ jobs: AWS_SECRET_ACCESS_KEY: test AWS_ENDPOINT: http://localhost:4566 AWS_ALLOW_HTTP: true + AWS_COPY_IF_NOT_EXISTS: dynamo:test-table HTTP_URL: "http://localhost:8080" GOOGLE_BUCKET: test-bucket GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" @@ -136,6 +137,7 @@ jobs: docker run -d -p 4566:4566 localstack/localstack:2.0 docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2 aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket + aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 - name: Configure Azurite (Azure emulation) # the magical connection string is from diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs index cf9490d96eae..8798150196a8 100644 --- a/object_store/src/aws/builder.rs +++ b/object_store/src/aws/builder.rs @@ -844,27 +844,23 @@ impl AmazonS3Builder { )) as _ }; - let endpoint: String; - let bucket_endpoint: String; - // If `endpoint` is provided then its assumed to be consistent with // `virtual_hosted_style_request`. i.e. if `virtual_hosted_style_request` is true then // `endpoint` should have bucket name included. - if self.virtual_hosted_style_request.get()? { - endpoint = self - .endpoint - .unwrap_or_else(|| format!("https://{bucket}.s3.{region}.amazonaws.com")); - bucket_endpoint = endpoint.clone(); + let bucket_endpoint = if self.virtual_hosted_style_request.get()? { + self.endpoint + .clone() + .unwrap_or_else(|| format!("https://{bucket}.s3.{region}.amazonaws.com")) } else { - endpoint = self - .endpoint - .unwrap_or_else(|| format!("https://s3.{region}.amazonaws.com")); - bucket_endpoint = format!("{endpoint}/{bucket}"); - } + match &self.endpoint { + None => format!("https://s3.{region}.amazonaws.com/{bucket}"), + Some(endpoint) => format!("{endpoint}/{bucket}"), + } + }; let config = S3Config { region, - endpoint, + endpoint: self.endpoint, bucket, bucket_endpoint, credentials, diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 3e47abd4bcc5..203d03d19910 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -21,7 +21,7 @@ use crate::aws::{ AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET, }; use crate::client::get::GetClient; -use crate::client::header::HeaderConfig; +use crate::client::header::{get_etag, HeaderConfig}; use crate::client::header::{get_put_result, get_version}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; @@ -39,13 +39,14 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; +use hyper::http; use hyper::http::HeaderName; use itertools::Itertools; use percent_encoding::{utf8_percent_encode, PercentEncode}; use quick_xml::events::{self as xml_events}; use reqwest::{ header::{CONTENT_LENGTH, CONTENT_TYPE}, - Client as ReqwestClient, Method, RequestBuilder, Response, StatusCode, + Client as ReqwestClient, Method, RequestBuilder, Response, }; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; @@ -196,7 +197,7 @@ impl From for Error { #[derive(Debug)] pub struct S3Config { pub region: String, - pub endpoint: String, + pub endpoint: Option, pub bucket: String, pub bucket_endpoint: String, pub credentials: AwsCredentialProvider, @@ -215,7 +216,7 @@ impl S3Config { format!("{}/{}", self.bucket_endpoint, encode_path(path)) } - async fn get_credential(&self) -> Result>> { + pub(crate) async fn get_credential(&self) -> Result>> { Ok(match self.skip_signature { false => Some(self.credentials.get_credential().await?), true => None, @@ -223,26 +224,30 @@ impl S3Config { } } -/// A builder for a put request allowing customisation of the headers and query string -pub(crate) struct PutRequest<'a> { +/// A builder for a request allowing customisation of the headers and query string +pub(crate) struct Request<'a> { path: &'a Path, config: &'a S3Config, builder: RequestBuilder, payload_sha256: Option>, } -impl<'a> PutRequest<'a> { +impl<'a> Request<'a> { pub fn query(self, query: &T) -> Self { let builder = self.builder.query(query); Self { builder, ..self } } - pub fn header(self, k: &HeaderName, v: &str) -> Self { + pub fn header(self, k: K, v: &str) -> Self + where + HeaderName: TryFrom, + >::Error: Into, + { let builder = self.builder.header(k, v); Self { builder, ..self } } - pub async fn send(self) -> Result { + pub async fn send(self) -> Result { let credential = self.config.get_credential().await?; let response = self @@ -260,14 +265,19 @@ impl<'a> PutRequest<'a> { path: self.path.as_ref(), })?; + Ok(response) + } + + pub async fn do_put(self) -> Result { + let response = self.send().await?; Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } } #[derive(Debug)] pub(crate) struct S3Client { - config: S3Config, - client: ReqwestClient, + pub config: S3Config, + pub client: ReqwestClient, } impl S3Client { @@ -276,20 +286,15 @@ impl S3Client { Ok(Self { config, client }) } - /// Returns the config - pub fn config(&self) -> &S3Config { - &self.config - } - /// Make an S3 PUT request /// /// Returns the ETag - pub fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> PutRequest<'a> { + pub fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> Request<'a> { let url = self.config.path_url(path); let mut builder = self.client.request(Method::PUT, url); let mut payload_sha256 = None; - if let Some(checksum) = self.config().checksum { + if let Some(checksum) = self.config.checksum { let digest = checksum.digest(&bytes); builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest)); if checksum == Checksum::SHA256 { @@ -302,11 +307,11 @@ impl S3Client { false => builder.body(bytes), }; - if let Some(value) = self.config().client_options.get_content_type(path) { + if let Some(value) = self.config.client_options.get_content_type(path) { builder = builder.header(CONTENT_TYPE, value); } - PutRequest { + Request { path, builder, payload_sha256, @@ -400,7 +405,7 @@ impl S3Client { // Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to // their algorithm if the user hasn't specified one. - let checksum = self.config().checksum.unwrap_or(Checksum::SHA256); + let checksum = self.config.checksum.unwrap_or(Checksum::SHA256); let digest = checksum.digest(&body); builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest)); let payload_sha256 = if checksum == Checksum::SHA256 { @@ -451,52 +456,21 @@ impl S3Client { } /// Make an S3 Copy request - pub async fn copy_request(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> { - let credential = self.config.get_credential().await?; + pub fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) -> Request<'a> { let url = self.config.path_url(to); let source = format!("{}/{}", self.config.bucket, encode_path(from)); - let mut builder = self + let builder = self .client .request(Method::PUT, url) .header("x-amz-copy-source", source); - if !overwrite { - match &self.config.copy_if_not_exists { - Some(S3CopyIfNotExists::Header(k, v)) => { - builder = builder.header(k, v); - } - None => { - return Err(crate::Error::NotSupported { - source: "S3 does not support copy-if-not-exists".to_string().into(), - }) - } - } + Request { + builder, + path: to, + config: &self.config, + payload_sha256: None, } - - builder - .with_aws_sigv4( - credential.as_deref(), - &self.config.region, - "s3", - self.config.sign_payload, - None, - ) - .send_retry(&self.config.retry_config) - .await - .map_err(|source| match source.status() { - Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists { - source: Box::new(source), - path: to.to_string(), - }, - _ => Error::CopyRequest { - source, - path: from.to_string(), - } - .into(), - })?; - - Ok(()) } pub async fn create_multipart(&self, location: &Path) -> Result { @@ -535,15 +509,14 @@ impl S3Client { ) -> Result { let part = (part_idx + 1).to_string(); - let result = self + let response = self .put_request(path, data) .query(&[("partNumber", &part), ("uploadId", upload_id)]) .send() .await?; - Ok(PartId { - content_id: result.e_tag.unwrap(), - }) + let content_id = get_etag(response.headers()).context(MetadataSnafu)?; + Ok(PartId { content_id }) } pub async fn complete_multipart( diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs new file mode 100644 index 000000000000..cb6a7b2c225e --- /dev/null +++ b/object_store/src/aws/dynamo.rs @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A DynamoDB based lock system + +use crate::aws::client::S3Client; +use crate::aws::credential::CredentialExt; +use crate::client::get::GetClientExt; +use crate::client::retry::Error as RetryError; +use crate::client::retry::RetryExt; +use crate::path::Path; +use crate::{Error, GetOptions, Result}; +use chrono::Utc; +use reqwest::StatusCode; +use serde::ser::SerializeMap; +use serde::{Deserialize, Serialize, Serializer}; +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +/// The exception returned by DynamoDB on conflict +const CONFLICT: &str = "com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedException"; + +/// A DynamoDB-based commit protocol, used to provide conditional write support for S3 +/// +/// ## Limitations +/// +/// Only conditional operations, e.g. `copy_if_not_exists` will be synchronized, and can +/// therefore race with non-conditional operations, e.g. `put`, `copy`, or conditional +/// operations performed by writers not configured to synchronize with DynamoDB. +/// +/// Workloads making use of this mechanism **must** ensure: +/// +/// * Conditional and non-conditional operations are not performed on the same paths +/// * Conditional operations are only performed via similarly configured clients +/// +/// Additionally as the locking mechanism relies on timeouts to detect stale locks, +/// performance will be poor for systems that frequently delete and then create +/// objects at the same path, instead being optimised for systems that primarily create +/// files with paths never used before, or perform conditional updates to existing files +/// +/// ## Commit Protocol +/// +/// The DynamoDB schema is as follows: +/// +/// * A string hash key named `"key"` +/// * A numeric [TTL] attribute named `"ttl"` +/// * A numeric attribute named `"generation"` +/// * A numeric attribute named `"timeout"` +/// +/// To perform a conditional operation on an object with a given `path` and `etag` (if exists), +/// the commit protocol is as follows: +/// +/// 1. Perform HEAD request on `path` and error on precondition mismatch +/// 2. Create record in DynamoDB with key `{path}#{etag}` with the configured timeout +/// 1. On Success: Perform operation with the configured timeout +/// 2. On Conflict: +/// 1. Periodically re-perform HEAD request on `path` and error on precondition mismatch +/// 2. If `timeout * max_skew_rate` passed, replace the record incrementing the `"generation"` +/// 1. On Success: GOTO 2.1 +/// 2. On Conflict: GOTO 2.2 +/// +/// Provided no writer modifies an object with a given `path` and `etag` without first adding a +/// corresponding record to DynamoDB, we are guaranteed that only one writer will ever commit. +/// +/// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited +/// requirements of synchronizing object storage. The major changes are: +/// +/// * Uses a monotonic generation count instead of a UUID rvn, as this is: +/// * Cheaper to generate, serialize and compare +/// * Cannot collide +/// * More human readable / interpretable +/// * Relies on [TTL] to eventually clean up old locks +/// +/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit protocol, but +/// generalised to not make assumptions about the workload and not rely on first writing +/// to a temporary path. +/// +/// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html +/// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/ +/// [S3 Multi-Cluster]: https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h +#[derive(Debug, Clone)] +pub struct DynamoCommit { + table_name: String, + /// The number of seconds a lease is valid for + timeout: usize, + /// The maximum clock skew rate tolerated by the system + max_clock_skew_rate: u32, + /// The length of time a record will be retained in DynamoDB before being cleaned up + /// + /// This is purely an optimisation to avoid indefinite growth of the DynamoDB table + /// and does not impact how long clients may wait to acquire a lock + ttl: Duration, + /// The backoff duration before retesting a condition + test_interval: Duration, +} + +impl DynamoCommit { + /// Create a new [`DynamoCommit`] with a given table name + pub fn new(table_name: String) -> Self { + Self { + table_name, + timeout: 20, + max_clock_skew_rate: 3, + ttl: Duration::from_secs(60 * 60), + test_interval: Duration::from_millis(100), + } + } + + /// Returns the name of the DynamoDB table + pub fn table_name(&self) -> &str { + &self.table_name + } + + pub(crate) async fn copy_if_not_exists( + &self, + client: &S3Client, + from: &Path, + to: &Path, + ) -> Result<()> { + check_not_exists(client, to).await?; + + let mut previous_lease = None; + + loop { + let existing = previous_lease.as_ref(); + match self.try_lock(client, to.as_ref(), existing).await? { + TryLockResult::Ok(lease) => { + let fut = client.copy_request(from, to).send(); + let expiry = lease.acquire + lease.timeout; + return match tokio::time::timeout_at(expiry.into(), fut).await { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => Err(e), + Err(_) => Err(Error::Generic { + store: "DynamoDB", + source: format!( + "Failed to perform copy operation in {} seconds", + self.timeout + ) + .into(), + }), + }; + } + TryLockResult::Conflict(conflict) => { + let mut interval = tokio::time::interval(self.test_interval); + let expiry = conflict.timeout * self.max_clock_skew_rate; + loop { + interval.tick().await; + check_not_exists(client, to).await?; + if conflict.acquire.elapsed() > expiry { + previous_lease = Some(conflict); + break; + } + } + } + } + } + } + + async fn try_lock( + &self, + s3: &S3Client, + key: &str, + existing: Option<&Lease>, + ) -> Result { + let attributes; + let (next_gen, condition_expression, expression_attribute_values) = match existing { + None => (0_usize, "attribute_not_exists(#pk)", Map(&[])), + Some(existing) => { + attributes = [(":g", AttributeValue::Number(existing.generation))]; + ( + existing.generation.checked_add(1).unwrap(), + "attribute_exists(#pk) AND generation == :g", + Map(attributes.as_slice()), + ) + } + }; + + let ttl = (Utc::now() + self.ttl).timestamp(); + let items = [ + ("key", AttributeValue::String(key)), + ("generation", AttributeValue::Number(next_gen)), + ("timeout", AttributeValue::Number(self.timeout)), + ("ttl", AttributeValue::Number(ttl as _)), + ]; + let names = [("#pk", "key")]; + + let req = PutItem { + table_name: &self.table_name, + condition_expression, + expression_attribute_values, + expression_attribute_names: Map(&names), + item: Map(&items), + return_values: None, + return_values_on_condition_check_failure: Some(ReturnValues::AllOld), + }; + + let credential = s3.config.get_credential().await?; + + let acquire = Instant::now(); + let region = &s3.config.region; + + let builder = match &s3.config.endpoint { + Some(e) => s3.client.post(e), + None => { + let url = format!("https://dynamodb.{region}.amazonaws.com",); + s3.client.post(url) + } + }; + + let response = builder + .json(&req) + .header("X-Amz-Target", "DynamoDB_20120810.PutItem") + .with_aws_sigv4(credential.as_deref(), region, "dynamodb", true, None) + .send_retry(&s3.config.retry_config) + .await; + + match response { + Ok(_) => Ok(TryLockResult::Ok(Lease { + acquire, + generation: next_gen, + timeout: Duration::from_secs(self.timeout as _), + })), + Err(e) => match try_extract_lease(&e) { + Some(lease) => Ok(TryLockResult::Conflict(lease)), + None => Err(Error::Generic { + store: "DynamoDB", + source: Box::new(e), + }), + }, + } + } +} + +#[derive(Debug)] +enum TryLockResult { + /// Successfully acquired a lease + Ok(Lease), + /// An existing lease was found + Conflict(Lease), +} + +/// Returns an [`Error::AlreadyExists`] if `path` exists +async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> { + let options = GetOptions { + head: true, + ..Default::default() + }; + match client.get_opts(path, options).await { + Ok(_) => Err(Error::AlreadyExists { + path: path.to_string(), + source: "Already Exists".to_string().into(), + }), + Err(Error::NotFound { .. }) => Ok(()), + Err(e) => Err(e), + } +} + +/// If [`RetryError`] corresponds to [`CONFLICT`] extracts the pre-existing [`Lease`] +fn try_extract_lease(e: &RetryError) -> Option { + match e { + RetryError::Client { + status: StatusCode::BAD_REQUEST, + body: Some(b), + } => { + let resp: ErrorResponse<'_> = serde_json::from_str(b).ok()?; + if resp.error != CONFLICT { + return None; + } + + let generation = match resp.item.get("generation") { + Some(AttributeValue::Number(generation)) => generation, + _ => return None, + }; + + let timeout = match resp.item.get("timeout") { + Some(AttributeValue::Number(timeout)) => *timeout, + _ => return None, + }; + + Some(Lease { + acquire: Instant::now(), + generation: *generation, + timeout: Duration::from_secs(timeout as _), + }) + } + _ => None, + } +} + +/// A lock lease +#[derive(Debug, Clone)] +struct Lease { + acquire: Instant, + generation: usize, + timeout: Duration, +} + +/// A DynamoDB [PutItem] payload +/// +/// [PutItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html +#[derive(Serialize)] +#[serde(rename_all = "PascalCase")] +struct PutItem<'a> { + /// The table name + table_name: &'a str, + + /// A condition that must be satisfied in order for a conditional PutItem operation to succeed. + condition_expression: &'a str, + + /// One or more substitution tokens for attribute names in an expression + expression_attribute_names: Map<'a, &'a str, &'a str>, + + /// One or more values that can be substituted in an expression + expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>, + + /// A map of attribute name/value pairs, one for each attribute + item: Map<'a, &'a str, AttributeValue<'a>>, + + /// Use ReturnValues if you want to get the item attributes as they appeared + /// before they were updated with the PutItem request. + #[serde(skip_serializing_if = "Option::is_none")] + return_values: Option, + + /// An optional parameter that returns the item attributes for a PutItem operation + /// that failed a condition check. + #[serde(skip_serializing_if = "Option::is_none")] + return_values_on_condition_check_failure: Option, +} + +#[derive(Deserialize)] +struct ErrorResponse<'a> { + #[serde(rename = "__type")] + error: &'a str, + + #[serde(borrow, default, rename = "Item")] + item: HashMap<&'a str, AttributeValue<'a>>, +} + +#[derive(Serialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +enum ReturnValues { + AllOld, +} + +/// A collection of key value pairs +/// +/// This provides cheap, ordered serialization of maps +struct Map<'a, K, V>(&'a [(K, V)]); + +impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + if self.0.is_empty() { + return serializer.serialize_none(); + } + let mut map = serializer.serialize_map(Some(self.0.len()))?; + for (k, v) in self.0 { + map.serialize_entry(k, v)? + } + map.end() + } +} + +/// A DynamoDB [AttributeValue] +/// +/// [AttributeValue]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html +#[derive(Debug, Serialize, Deserialize)] +enum AttributeValue<'a> { + #[serde(rename = "S")] + String(&'a str), + #[serde(rename = "N", with = "number")] + Number(usize), +} + +/// Numbers are serialized as strings +mod number { + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(v: &usize, s: S) -> Result { + s.serialize_str(&v.to_string()) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result { + let v: &str = Deserialize::deserialize(d)?; + v.parse().map_err(serde::de::Error::custom) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_attribute_serde() { + let serde = serde_json::to_string(&AttributeValue::Number(23)).unwrap(); + assert_eq!(serde, "{\"N\":\"23\"}"); + let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap(); + assert!(matches!(back, AttributeValue::Number(23))); + } +} diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index cbb3cffdf494..6f4f1e35b949 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -58,11 +58,13 @@ mod builder; mod checksum; mod client; mod credential; +mod dynamo; mod precondition; mod resolve; pub use builder::{AmazonS3Builder, AmazonS3ConfigKey}; pub use checksum::Checksum; +pub use dynamo::DynamoCommit; pub use precondition::{S3ConditionalPut, S3CopyIfNotExists}; pub use resolve::resolve_bucket_region; @@ -93,19 +95,19 @@ pub struct AmazonS3 { impl std::fmt::Display for AmazonS3 { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "AmazonS3({})", self.client.config().bucket) + write!(f, "AmazonS3({})", self.client.config.bucket) } } impl AmazonS3 { /// Returns the [`AwsCredentialProvider`] used by [`AmazonS3`] pub fn credentials(&self) -> &AwsCredentialProvider { - &self.client.config().credentials + &self.client.config.credentials } /// Create a full URL to the resource specified by `path` with this instance's configuration. fn path_url(&self, path: &Path) -> String { - self.client.config().path_url(path) + self.client.config.path_url(path) } } @@ -145,7 +147,7 @@ impl Signer for AmazonS3 { /// ``` async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { let credential = self.credentials().get_credential().await?; - let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config().region); + let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region); let path_url = self.path_url(path); let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic { @@ -164,15 +166,15 @@ impl ObjectStore for AmazonS3 { async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { let mut request = self.client.put_request(location, bytes); let tags = opts.tags.encoded(); - if !tags.is_empty() && !self.client.config().disable_tagging { + if !tags.is_empty() && !self.client.config.disable_tagging { request = request.header(&TAGS_HEADER, tags); } - match (opts.mode, &self.client.config().conditional_put) { - (PutMode::Overwrite, _) => request.send().await, + match (opts.mode, &self.client.config.conditional_put) { + (PutMode::Overwrite, _) => request.do_put().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { - match request.header(&IF_NONE_MATCH, "*").send().await { + match request.header(&IF_NONE_MATCH, "*").do_put().await { // Technically If-None-Match should return NotModified but some stores, // such as R2, instead return PreconditionFailed // https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject @@ -190,7 +192,7 @@ impl ObjectStore for AmazonS3 { store: STORE, source: "ETag required for conditional put".to_string().into(), })?; - request.header(&IF_MATCH, etag.as_str()).send().await + request.header(&IF_MATCH, etag.as_str()).do_put().await } } } @@ -261,11 +263,29 @@ impl ObjectStore for AmazonS3 { } async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - self.client.copy_request(from, to, true).await + self.client.copy_request(from, to).send().await?; + Ok(()) } async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - self.client.copy_request(from, to, false).await + match &self.client.config.copy_if_not_exists { + Some(S3CopyIfNotExists::Header(k, v)) => { + let req = self.client.copy_request(from, to); + match req.header(k, v).send().await { + Err(Error::Precondition { path, source }) => { + Err(Error::AlreadyExists { path, source }) + } + Err(e) => Err(e), + Ok(_) => Ok(()), + } + } + Some(S3CopyIfNotExists::Dynamo(lock)) => { + lock.copy_if_not_exists(&self.client, from, to).await + } + None => Err(Error::NotSupported { + source: "S3 does not support copy-if-not-exists".to_string().into(), + }), + } } } @@ -335,8 +355,8 @@ mod tests { let config = AmazonS3Builder::from_env(); let integration = config.build().unwrap(); - let config = integration.client.config(); - let is_local = config.endpoint.starts_with("http://"); + let config = &integration.client.config; + let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://")); let test_not_exists = config.copy_if_not_exists.is_some(); let test_conditional_put = config.conditional_put.is_some(); diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index a50b57fe23f7..b59699c4f2cf 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::aws::dynamo::DynamoCommit; use crate::config::Parse; /// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`]. @@ -38,12 +39,21 @@ pub enum S3CopyIfNotExists { /// /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists Header(String, String), + /// The name of a DynamoDB table to use for coordination + /// + /// Encoded as `dynamodb:` ignoring whitespace + /// + /// See [`DynamoCommit`] for more information + /// + /// This will use the same region, credentials and endpoint as configured for S3 + Dynamo(DynamoCommit), } impl std::fmt::Display for S3CopyIfNotExists { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Header(k, v) => write!(f, "header: {}: {}", k, v), + Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } } @@ -56,6 +66,7 @@ impl S3CopyIfNotExists { let (k, v) = value.split_once(':')?; Some(Self::Header(k.trim().to_string(), v.trim().to_string())) } + "dynamo" => Some(Self::Dynamo(DynamoCommit::new(value.trim().to_string()))), _ => None, } } From eb6a328397c36af1df2f15e261f90e1e25404df3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 Oct 2023 18:49:06 +0100 Subject: [PATCH 02/11] Cleanup error handling --- object_store/src/aws/client.rs | 44 ++++------------------------------ 1 file changed, 5 insertions(+), 39 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 203d03d19910..3af5b322b0f5 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -58,30 +58,12 @@ const VERSION_HEADER: &str = "x-amz-version-id"; #[derive(Debug, Snafu)] #[allow(missing_docs)] pub(crate) enum Error { - #[snafu(display("Error performing get request {}: {}", path, source))] - GetRequest { - source: crate::client::retry::Error, - path: String, - }, - #[snafu(display("Error fetching get response body {}: {}", path, source))] GetResponseBody { source: reqwest::Error, path: String, }, - #[snafu(display("Error performing put request {}: {}", path, source))] - PutRequest { - source: crate::client::retry::Error, - path: String, - }, - - #[snafu(display("Error performing delete request {}: {}", path, source))] - DeleteRequest { - source: crate::client::retry::Error, - path: String, - }, - #[snafu(display("Error performing DeleteObjects request: {}", source))] DeleteObjectsRequest { source: crate::client::retry::Error }, @@ -105,12 +87,6 @@ pub(crate) enum Error { source: Box, }, - #[snafu(display("Error performing copy request {}: {}", path, source))] - CopyRequest { - source: crate::client::retry::Error, - path: String, - }, - #[snafu(display("Error performing list request: {}", source))] ListRequest { source: crate::client::retry::Error }, @@ -144,10 +120,6 @@ pub(crate) enum Error { impl From for crate::Error { fn from(err: Error) -> Self { match err { - Error::GetRequest { source, path } - | Error::DeleteRequest { source, path } - | Error::CopyRequest { source, path } - | Error::PutRequest { source, path } => source.error(STORE, path), _ => Self::Generic { store: STORE, source: Box::new(err), @@ -261,9 +233,7 @@ impl<'a> Request<'a> { ) .send_retry(&self.config.retry_config) .await - .context(PutRequestSnafu { - path: self.path.as_ref(), - })?; + .map_err(|e| e.error(STORE, self.path.to_string()))?; Ok(response) } @@ -340,9 +310,7 @@ impl S3Client { ) .send_retry(&self.config.retry_config) .await - .context(DeleteRequestSnafu { - path: path.as_ref(), - })?; + .map_err(|e| e.error(STORE, path.to_string()))?; Ok(()) } @@ -456,7 +424,7 @@ impl S3Client { } /// Make an S3 Copy request - pub fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) -> Request<'a> { + pub fn copy_request<'a>(&'a self, from: &'a Path, to: &Path) -> Request<'a> { let url = self.config.path_url(to); let source = format!("{}/{}", self.config.bucket, encode_path(from)); @@ -467,7 +435,7 @@ impl S3Client { Request { builder, - path: to, + path: from, config: &self.config, payload_sha256: None, } @@ -622,9 +590,7 @@ impl GetClient for S3Client { ) .send_retry(&self.config.retry_config) .await - .context(GetRequestSnafu { - path: path.as_ref(), - })?; + .map_err(|e| e.error(STORE, path.to_string()))?; Ok(response) } From 8b637972d3a9c75761e56660f2eacc80e3634467 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 Oct 2023 19:40:54 +0100 Subject: [PATCH 03/11] Clippy --- object_store/src/aws/client.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 3af5b322b0f5..ef79fcd14848 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -119,11 +119,9 @@ pub(crate) enum Error { impl From for crate::Error { fn from(err: Error) -> Self { - match err { - _ => Self::Generic { - store: STORE, - source: Box::new(err), - }, + Self::Generic { + store: STORE, + source: Box::new(err), } } } From 1a73e388117d42969f75c99ed9b5da2e0b7515de Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 29 Oct 2023 18:14:04 +0000 Subject: [PATCH 04/11] Localstack support --- .github/workflows/object_store.yml | 3 +- object_store/src/aws/dynamo.rs | 274 +++++++++++++++++++++------ object_store/src/aws/precondition.rs | 10 +- 3 files changed, 221 insertions(+), 66 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 06ba203c55b0..9d58a9f14814 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -112,7 +112,8 @@ jobs: AWS_SECRET_ACCESS_KEY: test AWS_ENDPOINT: http://localhost:4566 AWS_ALLOW_HTTP: true - AWS_COPY_IF_NOT_EXISTS: dynamo:test-table + # We use an extremely low timeout to speed up tests, this should not be replicated for production environments + AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:100 HTTP_URL: "http://localhost:8080" GOOGLE_BUCKET: test-bucket GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index cb6a7b2c225e..fcbc60a07ed8 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -17,22 +17,27 @@ //! A DynamoDB based lock system +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use chrono::Utc; +use reqwest::{Response, StatusCode}; +use serde::ser::SerializeMap; +use serde::{Deserialize, Serialize, Serializer}; + use crate::aws::client::S3Client; use crate::aws::credential::CredentialExt; +use crate::aws::AwsCredential; use crate::client::get::GetClientExt; use crate::client::retry::Error as RetryError; use crate::client::retry::RetryExt; use crate::path::Path; use crate::{Error, GetOptions, Result}; -use chrono::Utc; -use reqwest::StatusCode; -use serde::ser::SerializeMap; -use serde::{Deserialize, Serialize, Serializer}; -use std::collections::HashMap; -use std::time::{Duration, Instant}; /// The exception returned by DynamoDB on conflict -const CONFLICT: &str = "com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedException"; +const CONFLICT: &str = "ConditionalCheckFailedException"; + +const STORE: &str = "DynamoDB"; /// A DynamoDB-based commit protocol, used to provide conditional write support for S3 /// @@ -95,8 +100,8 @@ const CONFLICT: &str = "com.amazonaws.dynamodb.v20120810#ConditionalCheckFailedE #[derive(Debug, Clone)] pub struct DynamoCommit { table_name: String, - /// The number of seconds a lease is valid for - timeout: usize, + /// The number of milliseconds a lease is valid for + timeout: u64, /// The maximum clock skew rate tolerated by the system max_clock_skew_rate: u32, /// The length of time a record will be retained in DynamoDB before being cleaned up @@ -113,15 +118,44 @@ impl DynamoCommit { pub fn new(table_name: String) -> Self { Self { table_name, - timeout: 20, + timeout: 2_000, max_clock_skew_rate: 3, ttl: Duration::from_secs(60 * 60), test_interval: Duration::from_millis(100), } } - /// Returns the name of the DynamoDB table - pub fn table_name(&self) -> &str { + /// Overrides the lock timeout. + /// + /// A longer lock timeout reduces the probability of spurious commit failures and multi-writer + /// races, but will increase the time that writers must wait to reclaim a lock lost. The + /// default value of 2 seconds should be appropriate for must use-cases. + pub fn with_timeout(mut self, millis: u64) -> Self { + self.timeout = millis; + self + } + + /// The maximum clock skew rate tolerated by the system. + /// + /// An environment in which the clock on the fastest node ticks twice as fast as the slowest + /// node, would have a clock skew rate of 2. The default value of 3 should be appropriate + /// for most environments. + pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self { + self.max_clock_skew_rate = rate; + self + } + + /// The length of time a record should be retained in DynamoDB before being cleaned up + /// + /// This should be significantly larger than the configured lock timeout, with the default + /// value of 1 hour appropriate for most use-cases. + pub fn with_ttl(mut self, ttl: Duration) -> Self { + self.ttl = ttl; + self + } + + /// Returns the name of the DynamoDB table. + pub(crate) fn table_name(&self) -> &str { &self.table_name } @@ -147,7 +181,7 @@ impl DynamoCommit { Err(_) => Err(Error::Generic { store: "DynamoDB", source: format!( - "Failed to perform copy operation in {} seconds", + "Failed to perform copy operation in {} milliseconds", self.timeout ) .into(), @@ -170,6 +204,36 @@ impl DynamoCommit { } } + async fn get_lock(&self, s3: &S3Client, key: &str) -> Result { + let key_attributes = [("key", AttributeValue::String(key))]; + let req = GetItem { + table_name: &self.table_name, + key: Map(&key_attributes), + }; + let credential = s3.config.get_credential().await?; + + let resp = self + .request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req) + .await + .map_err(|e| e.error(STORE, key.to_string()))?; + + let body = resp.bytes().await.map_err(|e| Error::Generic { + store: STORE, + source: Box::new(e), + })?; + + let response: GetItemResponse<'_> = + serde_json::from_slice(body.as_ref()).map_err(|e| Error::Generic { + store: STORE, + source: Box::new(e), + })?; + + extract_lease(&response.item).ok_or_else(|| Error::NotFound { + path: key.into(), + source: "DynamoDB GetItem returned not items".to_string().into(), + }) + } + async fn try_lock( &self, s3: &S3Client, @@ -178,12 +242,12 @@ impl DynamoCommit { ) -> Result { let attributes; let (next_gen, condition_expression, expression_attribute_values) = match existing { - None => (0_usize, "attribute_not_exists(#pk)", Map(&[])), + None => (0_u64, "attribute_not_exists(#pk)", Map(&[])), Some(existing) => { attributes = [(":g", AttributeValue::Number(existing.generation))]; ( existing.generation.checked_add(1).unwrap(), - "attribute_exists(#pk) AND generation == :g", + "attribute_exists(#pk) AND generation = :g", Map(attributes.as_slice()), ) } @@ -211,37 +275,60 @@ impl DynamoCommit { let credential = s3.config.get_credential().await?; let acquire = Instant::now(); + match self + .request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem", req) + .await + { + Ok(_) => Ok(TryLockResult::Ok(Lease { + acquire, + generation: next_gen, + timeout: Duration::from_millis(self.timeout), + })), + Err(e) => match parse_error_response(&e) { + Some(e) if e.error.ends_with(CONFLICT) => match extract_lease(&e.item) { + Some(lease) => Ok(TryLockResult::Conflict(lease)), + // ReturnValuesOnConditionCheckFailure is a relatively recent addition + // to DynamoDB and is not supported by dynamodb-local, which is used + // by localstack. In such cases the conflict error will not contain + // the conflicting item, and we must instead perform a get request + // + // + // + // + None => Ok(TryLockResult::Conflict(self.get_lock(s3, key).await?)), + }, + _ => Err(Error::Generic { + store: STORE, + source: Box::new(e), + }), + }, + } + } + + async fn request( + &self, + s3: &S3Client, + cred: Option<&AwsCredential>, + target: &str, + req: R, + ) -> Result { let region = &s3.config.region; let builder = match &s3.config.endpoint { Some(e) => s3.client.post(e), None => { - let url = format!("https://dynamodb.{region}.amazonaws.com",); + let url = format!("https://dynamodb.{region}.amazonaws.com"); s3.client.post(url) } }; - let response = builder + builder + .timeout(Duration::from_millis(self.timeout)) .json(&req) - .header("X-Amz-Target", "DynamoDB_20120810.PutItem") - .with_aws_sigv4(credential.as_deref(), region, "dynamodb", true, None) + .header("X-Amz-Target", target) + .with_aws_sigv4(cred, region, "dynamodb", true, None) .send_retry(&s3.config.retry_config) - .await; - - match response { - Ok(_) => Ok(TryLockResult::Ok(Lease { - acquire, - generation: next_gen, - timeout: Duration::from_secs(self.timeout as _), - })), - Err(e) => match try_extract_lease(&e) { - Some(lease) => Ok(TryLockResult::Conflict(lease)), - None => Err(Error::Generic { - store: "DynamoDB", - source: Box::new(e), - }), - }, - } + .await } } @@ -269,43 +356,41 @@ async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> { } } -/// If [`RetryError`] corresponds to [`CONFLICT`] extracts the pre-existing [`Lease`] -fn try_extract_lease(e: &RetryError) -> Option { +/// Parses the error response if any +fn parse_error_response(e: &RetryError) -> Option> { match e { RetryError::Client { status: StatusCode::BAD_REQUEST, body: Some(b), - } => { - let resp: ErrorResponse<'_> = serde_json::from_str(b).ok()?; - if resp.error != CONFLICT { - return None; - } - - let generation = match resp.item.get("generation") { - Some(AttributeValue::Number(generation)) => generation, - _ => return None, - }; - - let timeout = match resp.item.get("timeout") { - Some(AttributeValue::Number(timeout)) => *timeout, - _ => return None, - }; - - Some(Lease { - acquire: Instant::now(), - generation: *generation, - timeout: Duration::from_secs(timeout as _), - }) - } + } => serde_json::from_str(b).ok(), _ => None, } } +/// Extracts a lease from `item`, returning `None` on error +fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option { + let generation = match item.get("generation") { + Some(AttributeValue::Number(generation)) => generation, + _ => return None, + }; + + let timeout = match item.get("timeout") { + Some(AttributeValue::Number(timeout)) => *timeout, + _ => return None, + }; + + Some(Lease { + acquire: Instant::now(), + generation: *generation, + timeout: Duration::from_millis(timeout), + }) +} + /// A lock lease #[derive(Debug, Clone)] struct Lease { acquire: Instant, - generation: usize, + generation: u64, timeout: Duration, } @@ -341,6 +426,24 @@ struct PutItem<'a> { return_values_on_condition_check_failure: Option, } +/// A DynamoDB [GetItem] payload +/// +/// [GetItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html +#[derive(Serialize)] +#[serde(rename_all = "PascalCase")] +struct GetItem<'a> { + /// The table name + table_name: &'a str, + /// The primary key + key: Map<'a, &'a str, AttributeValue<'a>>, +} + +#[derive(Deserialize)] +struct GetItemResponse<'a> { + #[serde(borrow, default, rename = "Item")] + item: HashMap<&'a str, AttributeValue<'a>>, +} + #[derive(Deserialize)] struct ErrorResponse<'a> { #[serde(rename = "__type")] @@ -385,18 +488,18 @@ enum AttributeValue<'a> { #[serde(rename = "S")] String(&'a str), #[serde(rename = "N", with = "number")] - Number(usize), + Number(u64), } /// Numbers are serialized as strings mod number { use serde::{Deserialize, Deserializer, Serializer}; - pub fn serialize(v: &usize, s: S) -> Result { + pub fn serialize(v: &u64, s: S) -> Result { s.serialize_str(&v.to_string()) } - pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result { + pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result { let v: &str = Deserialize::deserialize(d)?; v.parse().map_err(serde::de::Error::custom) } @@ -404,6 +507,10 @@ mod number { #[cfg(test)] mod tests { + use crate::aws::{AmazonS3Builder, S3CopyIfNotExists}; + use crate::test_util::maybe_skip_integration; + use crate::ObjectStore; + use super::*; #[test] @@ -413,4 +520,45 @@ mod tests { let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap(); assert!(matches!(back, AttributeValue::Number(23))); } + + #[tokio::test] + async fn test_dynamo() { + maybe_skip_integration!(); + let integration = AmazonS3Builder::from_env().build().unwrap(); + let d = match &integration.client.config.copy_if_not_exists { + Some(S3CopyIfNotExists::Dynamo(d)) => d, + _ => { + eprintln!("Skipping dynamo integration test - dynamo not configured"); + return; + } + }; + let client = integration.client.as_ref(); + + let src = Path::from("dynamo_path_src"); + integration.put(&src, "asd".into()).await.unwrap(); + + let dst = Path::from("dynamo_path"); + let _ = integration.delete(&dst).await; // Delete if present + + // Create a lock if not already exists + let existing = match d.try_lock(client, dst.as_ref(), None).await.unwrap() { + TryLockResult::Conflict(l) => l, + TryLockResult::Ok(l) => l, + }; + + // Should not be able to acquire a lock again + let r = d.try_lock(client, dst.as_ref(), None).await; + assert!(matches!(r, Ok(TryLockResult::Conflict(_)))); + + // But should still be able to reclaim lock and perform copy + d.copy_if_not_exists(client, &src, &dst).await.unwrap(); + + match d.try_lock(client, dst.as_ref(), None).await.unwrap() { + TryLockResult::Conflict(new) => { + // Should have incremented generation to do so + assert_eq!(new.generation, existing.generation + 1); + } + _ => panic!("Should conflict"), + } + } } diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index b59699c4f2cf..f212dc2dd228 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -41,7 +41,8 @@ pub enum S3CopyIfNotExists { Header(String, String), /// The name of a DynamoDB table to use for coordination /// - /// Encoded as `dynamodb:` ignoring whitespace + /// Encoded as either `dynamodb:` or `dynamodb::` + /// ignoring whitespace. The default timeout is used if not specified /// /// See [`DynamoCommit`] for more information /// @@ -66,7 +67,12 @@ impl S3CopyIfNotExists { let (k, v) = value.split_once(':')?; Some(Self::Header(k.trim().to_string(), v.trim().to_string())) } - "dynamo" => Some(Self::Dynamo(DynamoCommit::new(value.trim().to_string()))), + "dynamo" => Some(Self::Dynamo(match value.split_once(':') { + Some((table_name, timeout)) => DynamoCommit::new(table_name.trim().to_string()) + .with_timeout(timeout.parse().ok()?), + None => DynamoCommit::new(value.trim().to_string()), + })), + _ => None, } } From 1ea8ed1d85d3029c580350b11c06f69144370eaa Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 29 Oct 2023 20:06:05 +0000 Subject: [PATCH 05/11] Clippy --- object_store/src/aws/dynamo.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index fcbc60a07ed8..5bcb9bbf6b29 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -305,7 +305,7 @@ impl DynamoCommit { } } - async fn request( + async fn request( &self, s3: &S3Client, cred: Option<&AwsCredential>, From 4bbca2ad02747c71fed44105cb830768f2d20bd8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 29 Oct 2023 20:29:21 +0000 Subject: [PATCH 06/11] Handle integration test concurrency --- .github/workflows/object_store.yml | 4 ++-- object_store/src/aws/dynamo.rs | 28 ++++++++++++---------------- object_store/src/aws/mod.rs | 5 +++++ 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 9d58a9f14814..a1906b91d3f4 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -112,8 +112,8 @@ jobs: AWS_SECRET_ACCESS_KEY: test AWS_ENDPOINT: http://localhost:4566 AWS_ALLOW_HTTP: true - # We use an extremely low timeout to speed up tests, this should not be replicated for production environments - AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:100 + # We use a low timeout to speed up tests, this should not be replicated for production environments + AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:1000 HTTP_URL: "http://localhost:8080" GOOGLE_BUCKET: test-bucket GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 5bcb9bbf6b29..ec815b7d21be 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -118,7 +118,7 @@ impl DynamoCommit { pub fn new(table_name: String) -> Self { Self { table_name, - timeout: 2_000, + timeout: 20_000, max_clock_skew_rate: 3, ttl: Duration::from_secs(60 * 60), test_interval: Duration::from_millis(100), @@ -230,7 +230,7 @@ impl DynamoCommit { extract_lease(&response.item).ok_or_else(|| Error::NotFound { path: key.into(), - source: "DynamoDB GetItem returned not items".to_string().into(), + source: "DynamoDB GetItem returned no items".to_string().into(), }) } @@ -505,13 +505,16 @@ mod number { } } +/// Re-export integration_test to be called by s3_test +#[cfg(test)] +pub(crate) use tests::integration_test; + #[cfg(test)] mod tests { - use crate::aws::{AmazonS3Builder, S3CopyIfNotExists}; - use crate::test_util::maybe_skip_integration; - use crate::ObjectStore; use super::*; + use crate::aws::AmazonS3; + use crate::ObjectStore; #[test] fn test_attribute_serde() { @@ -521,17 +524,10 @@ mod tests { assert!(matches!(back, AttributeValue::Number(23))); } - #[tokio::test] - async fn test_dynamo() { - maybe_skip_integration!(); - let integration = AmazonS3Builder::from_env().build().unwrap(); - let d = match &integration.client.config.copy_if_not_exists { - Some(S3CopyIfNotExists::Dynamo(d)) => d, - _ => { - eprintln!("Skipping dynamo integration test - dynamo not configured"); - return; - } - }; + /// An integration test for DynamoDB + /// + /// This is a function called by s3_test to avoid test concurrency issues + pub async fn integration_test(integration: &AmazonS3, d: &DynamoCommit) { let client = integration.client.as_ref(); let src = Path::from("dynamo_path_src"); diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 6f4f1e35b949..bab6b9327c3b 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -390,6 +390,11 @@ mod tests { let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256); let integration = builder.build().unwrap(); put_get_delete_list_opts(&integration, is_local).await; + + match &integration.client.config.copy_if_not_exists { + Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await, + _ => eprintln!("Skipping dynamo integration test - dynamo not configured"), + }; } #[tokio::test] From cbedff113353fb8da20c2d53a7c5ee86d960a73e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 10:41:16 +0000 Subject: [PATCH 07/11] More docs --- object_store/src/aws/dynamo.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index ec815b7d21be..62e1d06d2628 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -44,8 +44,8 @@ const STORE: &str = "DynamoDB"; /// ## Limitations /// /// Only conditional operations, e.g. `copy_if_not_exists` will be synchronized, and can -/// therefore race with non-conditional operations, e.g. `put`, `copy`, or conditional -/// operations performed by writers not configured to synchronize with DynamoDB. +/// therefore race with non-conditional operations, e.g. `put`, `copy`, `delete`, or +/// conditional operations performed by writers not configured to synchronize with DynamoDB. /// /// Workloads making use of this mechanism **must** ensure: /// @@ -129,7 +129,7 @@ impl DynamoCommit { /// /// A longer lock timeout reduces the probability of spurious commit failures and multi-writer /// races, but will increase the time that writers must wait to reclaim a lock lost. The - /// default value of 2 seconds should be appropriate for must use-cases. + /// default value of 20 seconds should be appropriate for must use-cases. pub fn with_timeout(mut self, millis: u64) -> Self { self.timeout = millis; self @@ -204,6 +204,7 @@ impl DynamoCommit { } } + /// Retrieve a lock, returning an error if it doesn't exist async fn get_lock(&self, s3: &S3Client, key: &str) -> Result { let key_attributes = [("key", AttributeValue::String(key))]; let req = GetItem { @@ -234,6 +235,7 @@ impl DynamoCommit { }) } + /// Attempt to acquire a lock, reclaiming an existing lease if provided async fn try_lock( &self, s3: &S3Client, @@ -292,6 +294,11 @@ impl DynamoCommit { // by localstack. In such cases the conflict error will not contain // the conflicting item, and we must instead perform a get request // + // There is a potential race here if the conflicting record is removed + // before we retrieve it. We could retry the transaction in such a scenario, + // but as this only occurs for emulators, we simply abort with a + // not found error + // // // // From 9ac05eb878a9da05df66bd89d46dad7883e9a92e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 12:01:24 +0000 Subject: [PATCH 08/11] Disable request timeout --- .github/workflows/object_store.yml | 2 +- object_store/src/aws/dynamo.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index a1906b91d3f4..2d76060340f2 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -113,7 +113,7 @@ jobs: AWS_ENDPOINT: http://localhost:4566 AWS_ALLOW_HTTP: true # We use a low timeout to speed up tests, this should not be replicated for production environments - AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:1000 + AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:100 HTTP_URL: "http://localhost:8080" GOOGLE_BUCKET: test-bucket GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 62e1d06d2628..98c48b45e07c 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -330,7 +330,7 @@ impl DynamoCommit { }; builder - .timeout(Duration::from_millis(self.timeout)) + // .timeout(Duration::from_millis(self.timeout)) .json(&req) .header("X-Amz-Target", target) .with_aws_sigv4(cred, region, "dynamodb", true, None) From ee4f321e81c88d7055a46b601111cf9e256ec8da Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 12:07:39 +0000 Subject: [PATCH 09/11] Fix merge conflicts --- object_store/src/aws/client.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index ef79fcd14848..5cfbc8451177 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -545,9 +545,7 @@ impl S3Client { ) .send_retry(&self.config.retry_config) .await - .context(GetRequestSnafu { - path: path.as_ref(), - })?; + .map_err(|e| e.error(STORE, path.to_string()))?; Ok(response) } } From 7bef442fbb35efd3de97aa4713fa896f196dc58d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 12:26:34 +0000 Subject: [PATCH 10/11] Reduce test concurrency --- .github/workflows/object_store.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 2d76060340f2..7dc0668ab51f 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -153,7 +153,7 @@ jobs: rustup default stable - name: Run object_store tests - run: cargo test --features=aws,azure,gcp,http + run: cargo test --features=aws,azure,gcp,http -- --test-threads=1 # test the object_store crate builds against wasm32 in stable rust wasm32-build: From d9aae852961a346181b2fbf26696cc9ae29185f0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 Oct 2023 12:38:40 +0000 Subject: [PATCH 11/11] Increase timeouts --- .github/workflows/object_store.yml | 5 ++--- object_store/src/aws/dynamo.rs | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 7dc0668ab51f..fa8731814fb8 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -112,8 +112,7 @@ jobs: AWS_SECRET_ACCESS_KEY: test AWS_ENDPOINT: http://localhost:4566 AWS_ALLOW_HTTP: true - # We use a low timeout to speed up tests, this should not be replicated for production environments - AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:100 + AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000 HTTP_URL: "http://localhost:8080" GOOGLE_BUCKET: test-bucket GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" @@ -153,7 +152,7 @@ jobs: rustup default stable - name: Run object_store tests - run: cargo test --features=aws,azure,gcp,http -- --test-threads=1 + run: cargo test --features=aws,azure,gcp,http # test the object_store crate builds against wasm32 in stable rust wasm32-build: diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index 98c48b45e07c..62e1d06d2628 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -330,7 +330,7 @@ impl DynamoCommit { }; builder - // .timeout(Duration::from_millis(self.timeout)) + .timeout(Duration::from_millis(self.timeout)) .json(&req) .header("X-Amz-Target", target) .with_aws_sigv4(cred, region, "dynamodb", true, None)