Skip to content

Commit

Permalink
feat: supports name in object storage config
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Aug 28, 2024
1 parent 4ea4122 commit 26e2f90
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
59 changes: 54 additions & 5 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ pub enum ObjectStoreConfig {
}

impl ObjectStoreConfig {
pub fn name(&self) -> &'static str {
/// Returns the object storage type name, such as `S3`, `Oss` etc.
pub fn provider_name(&self) -> &'static str {
match self {
Self::File(_) => "File",
Self::S3(_) => "S3",
Expand All @@ -56,6 +57,24 @@ impl ObjectStoreConfig {
Self::Gcs(_) => "Gcs",
}
}

/// Returns the object storage configuration name, return the provider name if it's empty.
pub fn config_name(&self) -> &str {
let name = match self {
// file storage doesn't support name
Self::File(_) => self.provider_name(),
Self::S3(s3) => &s3.name,
Self::Oss(oss) => &oss.name,
Self::Azblob(az) => &az.name,
Self::Gcs(gcs) => &gcs.name,
};

if name.trim().is_empty() {
return self.provider_name();
}

name
}
}

/// Storage engine config
Expand All @@ -66,6 +85,7 @@ pub struct StorageConfig {
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
/// Object storage providers
pub providers: Vec<ObjectStoreConfig>,
}

Expand Down Expand Up @@ -95,6 +115,7 @@ pub struct ObjectStorageCacheConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
Expand All @@ -109,7 +130,8 @@ pub struct S3Config {

impl PartialEq for S3Config {
fn eq(&self, other: &Self) -> bool {
self.bucket == other.bucket
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
Expand All @@ -122,6 +144,7 @@ impl PartialEq for S3Config {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
Expand All @@ -135,7 +158,8 @@ pub struct OssConfig {

impl PartialEq for OssConfig {
fn eq(&self, other: &Self) -> bool {
self.bucket == other.bucket
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
Expand All @@ -147,6 +171,7 @@ impl PartialEq for OssConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
pub name: String,
pub container: String,
pub root: String,
#[serde(skip_serializing)]
Expand All @@ -161,7 +186,8 @@ pub struct AzblobConfig {

impl PartialEq for AzblobConfig {
fn eq(&self, other: &Self) -> bool {
self.container == other.container
self.name == other.name
&& self.container == other.container
&& self.root == other.root
&& self.account_name.expose_secret() == other.account_name.expose_secret()
&& self.account_key.expose_secret() == other.account_key.expose_secret()
Expand All @@ -174,6 +200,7 @@ impl PartialEq for AzblobConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
pub name: String,
pub root: String,
pub bucket: String,
pub scope: String,
Expand All @@ -188,7 +215,8 @@ pub struct GcsConfig {

impl PartialEq for GcsConfig {
fn eq(&self, other: &Self) -> bool {
self.root == other.root
self.name == other.name
&& self.root == other.root
&& self.bucket == other.bucket
&& self.scope == other.scope
&& self.credential_path.expose_secret() == other.credential_path.expose_secret()
Expand All @@ -201,6 +229,7 @@ impl PartialEq for GcsConfig {
impl Default for S3Config {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
Expand All @@ -215,6 +244,7 @@ impl Default for S3Config {
impl Default for OssConfig {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
Expand All @@ -228,6 +258,7 @@ impl Default for OssConfig {
impl Default for AzblobConfig {
fn default() -> Self {
Self {
name: String::default(),
container: String::default(),
root: String::default(),
account_name: SecretString::from(String::default()),
Expand All @@ -242,6 +273,7 @@ impl Default for AzblobConfig {
impl Default for GcsConfig {
fn default() -> Self {
Self {
name: String::default(),
root: String::default(),
bucket: String::default(),
scope: String::default(),
Expand Down Expand Up @@ -355,6 +387,23 @@ mod tests {
let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
}

#[test]
fn test_config_name() {
let object_store_config = ObjectStoreConfig::default();
assert_eq!("File", object_store_config.config_name());

let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert_eq!("S3", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());

let s3_config = ObjectStoreConfig::S3(S3Config {
name: "test".to_string(),
..Default::default()
});
assert_eq!("test", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());
}

#[test]
fn test_secstr() {
let toml_str = r#"
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ impl DatanodeBuilder {
/// Builds [ObjectStoreManager] from [StorageConfig].
pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
let default_name = cfg.store.name();
let default_name = cfg.store.config_name();
let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
for store in &cfg.providers {
object_store_manager.add(
store.name(),
store.config_name(),
store::new_object_store(store.clone(), &cfg.data_home).await?,
);
}
Expand Down
8 changes: 7 additions & 1 deletion src/object-store/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,21 @@ impl ObjectStoreManager {
}

/// Adds an object store to the manager.
/// # Safety
/// Panic when the name already exists
pub fn add(&mut self, name: &str, object_store: ObjectStore) {
self.stores.insert(name.to_lowercase(), object_store);
let name = name.to_lowercase();
if self.stores.insert(name.clone(), object_store).is_some() {
panic!("Object storage provider name conflicts, the `{name}` already exists");
}
}

/// Finds an object store corresponding to the name.
pub fn find(&self, name: &str) -> Option<&ObjectStore> {
self.stores.get(&name.to_lowercase())
}

/// Returns the default object storage
pub fn default_object_store(&self) -> &ObjectStore {
&self.default_object_store
}
Expand Down

0 comments on commit 26e2f90

Please sign in to comment.