From 26e2f90db08859d09839b56e3a00fefc058c146c Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 28 Aug 2024 11:32:17 +0800 Subject: [PATCH] feat: supports name in object storage config --- src/datanode/src/config.rs | 59 ++++++++++++++++++++++++++++++--- src/datanode/src/datanode.rs | 4 +-- src/object-store/src/manager.rs | 8 ++++- 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 6ce16b779922..7cb18131ed3e 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -47,7 +47,8 @@ pub enum ObjectStoreConfig { } impl ObjectStoreConfig { - pub fn name(&self) -> &'static str { + /// Returns the object storage type name, such as `S3`, `Oss` etc. + pub fn provider_name(&self) -> &'static str { match self { Self::File(_) => "File", Self::S3(_) => "S3", @@ -56,6 +57,24 @@ impl ObjectStoreConfig { Self::Gcs(_) => "Gcs", } } + + /// Returns the object storage configuration name, return the provider name if it's empty. + pub fn config_name(&self) -> &str { + let name = match self { + // file storage doesn't support name + Self::File(_) => self.provider_name(), + Self::S3(s3) => &s3.name, + Self::Oss(oss) => &oss.name, + Self::Azblob(az) => &az.name, + Self::Gcs(gcs) => &gcs.name, + }; + + if name.trim().is_empty() { + return self.provider_name(); + } + + name + } } /// Storage engine config @@ -66,6 +85,7 @@ pub struct StorageConfig { pub data_home: String, #[serde(flatten)] pub store: ObjectStoreConfig, + /// Object storage providers pub providers: Vec, } @@ -95,6 +115,7 @@ pub struct ObjectStorageCacheConfig { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct S3Config { + pub name: String, pub bucket: String, pub root: String, #[serde(skip_serializing)] @@ -109,7 +130,8 @@ pub struct S3Config { impl PartialEq for S3Config { fn eq(&self, other: &Self) -> bool { - self.bucket == other.bucket + self.name == other.name + && self.bucket == other.bucket && self.root == other.root && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret() @@ -122,6 +144,7 @@ impl PartialEq for S3Config { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct OssConfig { + pub name: String, pub bucket: String, pub root: String, #[serde(skip_serializing)] @@ -135,7 +158,8 @@ pub struct OssConfig { impl PartialEq for OssConfig { fn eq(&self, other: &Self) -> bool { - self.bucket == other.bucket + self.name == other.name + && self.bucket == other.bucket && self.root == other.root && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret() @@ -147,6 +171,7 @@ impl PartialEq for OssConfig { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct AzblobConfig { + pub name: String, pub container: String, pub root: String, #[serde(skip_serializing)] @@ -161,7 +186,8 @@ pub struct AzblobConfig { impl PartialEq for AzblobConfig { fn eq(&self, other: &Self) -> bool { - self.container == other.container + self.name == other.name + && self.container == other.container && self.root == other.root && self.account_name.expose_secret() == other.account_name.expose_secret() && self.account_key.expose_secret() == other.account_key.expose_secret() @@ -174,6 +200,7 @@ impl PartialEq for AzblobConfig { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct GcsConfig { + pub name: String, pub root: String, pub bucket: String, pub scope: String, @@ -188,7 +215,8 @@ pub struct GcsConfig { impl PartialEq for GcsConfig { fn eq(&self, other: &Self) -> bool { - self.root == other.root + self.name == other.name + && self.root == other.root && self.bucket == other.bucket && self.scope == other.scope && self.credential_path.expose_secret() == other.credential_path.expose_secret() @@ -201,6 +229,7 @@ impl PartialEq for GcsConfig { impl Default for S3Config { fn default() -> Self { Self { + name: String::default(), bucket: String::default(), root: String::default(), access_key_id: SecretString::from(String::default()), @@ -215,6 +244,7 @@ impl Default for S3Config { impl Default for OssConfig { fn default() -> Self { Self { + name: String::default(), bucket: String::default(), root: String::default(), access_key_id: SecretString::from(String::default()), @@ -228,6 +258,7 @@ impl Default for OssConfig { impl Default for AzblobConfig { fn default() -> Self { Self { + name: String::default(), container: String::default(), root: String::default(), account_name: SecretString::from(String::default()), @@ -242,6 +273,7 @@ impl Default for AzblobConfig { impl Default for GcsConfig { fn default() -> Self { Self { + name: String::default(), root: String::default(), bucket: String::default(), scope: String::default(), @@ -355,6 +387,23 @@ mod tests { let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap(); } + #[test] + fn test_config_name() { + let object_store_config = ObjectStoreConfig::default(); + assert_eq!("File", object_store_config.config_name()); + + let s3_config = ObjectStoreConfig::S3(S3Config::default()); + assert_eq!("S3", s3_config.config_name()); + assert_eq!("S3", s3_config.provider_name()); + + let s3_config = ObjectStoreConfig::S3(S3Config { + name: "test".to_string(), + ..Default::default() + }); + assert_eq!("test", s3_config.config_name()); + assert_eq!("S3", s3_config.provider_name()); + } + #[test] fn test_secstr() { let toml_str = r#" diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index eca551a4a0d4..09bf901d37f0 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -273,11 +273,11 @@ impl DatanodeBuilder { /// Builds [ObjectStoreManager] from [StorageConfig]. pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result { let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?; - let default_name = cfg.store.name(); + let default_name = cfg.store.config_name(); let mut object_store_manager = ObjectStoreManager::new(default_name, object_store); for store in &cfg.providers { object_store_manager.add( - store.name(), + store.config_name(), store::new_object_store(store.clone(), &cfg.data_home).await?, ); } diff --git a/src/object-store/src/manager.rs b/src/object-store/src/manager.rs index 6513923b52cc..b73147baf18e 100644 --- a/src/object-store/src/manager.rs +++ b/src/object-store/src/manager.rs @@ -37,8 +37,13 @@ impl ObjectStoreManager { } /// Adds an object store to the manager. + /// # Safety + /// Panic when the name already exists pub fn add(&mut self, name: &str, object_store: ObjectStore) { - self.stores.insert(name.to_lowercase(), object_store); + let name = name.to_lowercase(); + if self.stores.insert(name.clone(), object_store).is_some() { + panic!("Object storage provider name conflicts, the `{name}` already exists"); + } } /// Finds an object store corresponding to the name. @@ -46,6 +51,7 @@ impl ObjectStoreManager { self.stores.get(&name.to_lowercase()) } + /// Returns the default object storage pub fn default_object_store(&self) -> &ObjectStore { &self.default_object_store }