diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 5e256793fcd..3a168e622a2 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,7 +80,8 @@ pub use crate::node_config::{ use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, + S3ServerSideEncryption, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, + StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 651271d0c61..81776e5aff5 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -312,7 +312,13 @@ impl fmt::Debug for AzureStorageConfig { .finish() } } - +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum S3ServerSideEncryption { + Aes256, + AwsKms, + AwsKmsDsse, +} #[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct S3StorageConfig { @@ -334,6 +340,10 @@ pub struct S3StorageConfig { pub disable_multi_object_delete: bool, #[serde(default)] pub disable_multipart_upload: bool, + #[serde(default)] + pub server_side_encryption: Option, + #[serde(default)] + pub sse_kms_key_id: Option, } impl S3StorageConfig { @@ -384,6 +394,12 @@ impl S3StorageConfig { impl fmt::Debug for S3StorageConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let display_sse_kms_key_id = match &self.server_side_encryption { + Some(S3ServerSideEncryption::AwsKms) | Some(S3ServerSideEncryption::AwsKmsDsse) => { + &self.sse_kms_key_id + } + _ => &None, + }; f.debug_struct("S3StorageConfig") .field("access_key_id", &self.access_key_id) .field( @@ -397,6 +413,8 @@ impl fmt::Debug for S3StorageConfig { "disable_multi_object_delete", &self.disable_multi_object_delete, ) + .field("server_side_encryption", &self.server_side_encryption) + .field("sse_kms_key_id", &display_sse_kms_key_id) .finish() } } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 86ef692c671..61d748037a4 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -33,7 +33,9 @@ use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput; use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput}; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::types::builders::ObjectIdentifierBuilder; -use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier}; +use aws_sdk_s3::types::{ + CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, ServerSideEncryption, +}; use aws_sdk_s3::Client as S3Client; use base64::prelude::{Engine, BASE64_STANDARD}; use futures::{stream, StreamExt}; @@ -43,7 +45,7 @@ use quickwit_aws::retry::{aws_retry, AwsRetryable}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::S3StorageConfig; +use quickwit_config::{S3ServerSideEncryption, S3StorageConfig}; use regex::Regex; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; @@ -91,6 +93,8 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + server_side_encryption: Option, + sse_kms_key_id: Option, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -177,6 +181,8 @@ impl S3CompatibleObjectStorage { let retry_params = RetryParams::aggressive(); let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete; let disable_multipart_upload = s3_storage_config.disable_multipart_upload; + let server_side_encryption = s3_storage_config.server_side_encryption; + let sse_kms_key_id = s3_storage_config.sse_kms_key_id.clone(); Ok(Self { s3_client, uri: uri.clone(), @@ -186,6 +192,8 @@ impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, + server_side_encryption, + sse_kms_key_id, }) } @@ -203,6 +211,8 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + server_side_encryption: self.server_side_encryption, + sse_kms_key_id: self.sse_kms_key_id, } } @@ -278,6 +288,26 @@ impl S3CompatibleObjectStorage { .to_path_buf() } + fn apply_server_side_encryption( + &self, + encryption: Option, + kms_key_id: Option, + ) -> (Option, Option) { + let server_side_encryption = match encryption { + Some(S3ServerSideEncryption::Aes256) => Some(ServerSideEncryption::Aes256), + Some(S3ServerSideEncryption::AwsKms) => Some(ServerSideEncryption::AwsKms), + Some(S3ServerSideEncryption::AwsKmsDsse) => Some(ServerSideEncryption::AwsKmsDsse), + None => None, + }; + let kms_key_id = match server_side_encryption { + Some(ServerSideEncryption::AwsKms) | Some(ServerSideEncryption::AwsKmsDsse) => { + kms_key_id + } + _ => None, + }; + (server_side_encryption, kms_key_id) + } + async fn put_single_part_single_try<'a>( &'a self, bucket: &'a str, @@ -289,21 +319,28 @@ impl S3CompatibleObjectStorage { .byte_stream() .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - self.s3_client + let mut put_object_request = self + .s3_client .put_object() .bucket(bucket) .key(key) .body(body) - .content_length(len as i64) - .send() - .await - .map_err(|sdk_error| { - if sdk_error.is_retryable() { - Retry::Transient(StorageError::from(sdk_error)) - } else { - Retry::Permanent(StorageError::from(sdk_error)) - } - })?; + .content_length(len as i64); + let (s3_sse, kms_key_id) = self + .apply_server_side_encryption(self.server_side_encryption, self.sse_kms_key_id.clone()); + if let Some(encryption) = s3_sse { + put_object_request = put_object_request.server_side_encryption(encryption); + if let Some(kms_key_id) = kms_key_id { + put_object_request = put_object_request.ssekms_key_id(kms_key_id); + } + } + put_object_request.send().await.map_err(|sdk_error| { + if sdk_error.is_retryable() { + Retry::Transient(StorageError::from(sdk_error)) + } else { + Retry::Permanent(StorageError::from(sdk_error)) + } + })?; crate::STORAGE_METRICS.object_storage_put_parts.inc(); crate::STORAGE_METRICS @@ -330,12 +367,22 @@ impl S3CompatibleObjectStorage { async fn create_multipart_upload(&self, key: &str) -> StorageResult { let upload_id = aws_retry(&self.retry_params, || async { - self.s3_client + let mut create_multipart_req = self + .s3_client .create_multipart_upload() .bucket(self.bucket.clone()) - .key(key) - .send() - .await + .key(key); + let (s3_sse, kms_key_id) = self.apply_server_side_encryption( + self.server_side_encryption, + self.sse_kms_key_id.clone(), + ); + if let Some(encryption) = s3_sse { + create_multipart_req = create_multipart_req.server_side_encryption(encryption); + if let Some(kms_key_id) = kms_key_id { + create_multipart_req = create_multipart_req.ssekms_key_id(kms_key_id); + } + } + create_multipart_req.send().await }) .await? .upload_id @@ -956,6 +1003,8 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + server_side_encryption: None, + sse_kms_key_id: None, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1011,6 +1060,8 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, + server_side_encryption: None, + sse_kms_key_id: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1052,6 +1103,8 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + server_side_encryption: None, + sse_kms_key_id: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1134,6 +1187,8 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + server_side_encryption: None, + sse_kms_key_id: None, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1227,6 +1282,8 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + server_side_encryption: None, + sse_kms_key_id: None, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3]))