Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supports name in object storage config #4630

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ pub enum ObjectStoreConfig {
}

impl ObjectStoreConfig {
pub fn name(&self) -> &'static str {
/// Returns the object storage type name, such as `S3`, `Oss` etc.
pub fn provider_name(&self) -> &'static str {
match self {
Self::File(_) => "File",
Self::S3(_) => "S3",
Expand All @@ -56,6 +57,24 @@ impl ObjectStoreConfig {
Self::Gcs(_) => "Gcs",
}
}

/// Returns the object storage configuration name, return the provider name if it's empty.
pub fn config_name(&self) -> &str {
let name = match self {
// file storage doesn't support name
Self::File(_) => self.provider_name(),
Self::S3(s3) => &s3.name,
Self::Oss(oss) => &oss.name,
Self::Azblob(az) => &az.name,
Self::Gcs(gcs) => &gcs.name,
};

if name.trim().is_empty() {
return self.provider_name();
}

name
}
}

/// Storage engine config
Expand All @@ -66,6 +85,7 @@ pub struct StorageConfig {
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
/// Object storage providers
pub providers: Vec<ObjectStoreConfig>,
}

Expand Down Expand Up @@ -95,6 +115,7 @@ pub struct ObjectStorageCacheConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
Expand All @@ -109,7 +130,8 @@ pub struct S3Config {

impl PartialEq for S3Config {
fn eq(&self, other: &Self) -> bool {
self.bucket == other.bucket
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
Expand All @@ -122,6 +144,7 @@ impl PartialEq for S3Config {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
Expand All @@ -135,7 +158,8 @@ pub struct OssConfig {

impl PartialEq for OssConfig {
fn eq(&self, other: &Self) -> bool {
self.bucket == other.bucket
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
Expand All @@ -147,6 +171,7 @@ impl PartialEq for OssConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
pub name: String,
pub container: String,
pub root: String,
#[serde(skip_serializing)]
Expand All @@ -161,7 +186,8 @@ pub struct AzblobConfig {

impl PartialEq for AzblobConfig {
fn eq(&self, other: &Self) -> bool {
self.container == other.container
self.name == other.name
&& self.container == other.container
&& self.root == other.root
&& self.account_name.expose_secret() == other.account_name.expose_secret()
&& self.account_key.expose_secret() == other.account_key.expose_secret()
Expand All @@ -174,6 +200,7 @@ impl PartialEq for AzblobConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
pub name: String,
pub root: String,
pub bucket: String,
pub scope: String,
Expand All @@ -188,7 +215,8 @@ pub struct GcsConfig {

impl PartialEq for GcsConfig {
fn eq(&self, other: &Self) -> bool {
self.root == other.root
self.name == other.name
&& self.root == other.root
&& self.bucket == other.bucket
&& self.scope == other.scope
&& self.credential_path.expose_secret() == other.credential_path.expose_secret()
Expand All @@ -201,6 +229,7 @@ impl PartialEq for GcsConfig {
impl Default for S3Config {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
Expand All @@ -215,6 +244,7 @@ impl Default for S3Config {
impl Default for OssConfig {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
Expand All @@ -228,6 +258,7 @@ impl Default for OssConfig {
impl Default for AzblobConfig {
fn default() -> Self {
Self {
name: String::default(),
container: String::default(),
root: String::default(),
account_name: SecretString::from(String::default()),
Expand All @@ -242,6 +273,7 @@ impl Default for AzblobConfig {
impl Default for GcsConfig {
fn default() -> Self {
Self {
name: String::default(),
root: String::default(),
bucket: String::default(),
scope: String::default(),
Expand Down Expand Up @@ -355,6 +387,23 @@ mod tests {
let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
}

#[test]
fn test_config_name() {
let object_store_config = ObjectStoreConfig::default();
assert_eq!("File", object_store_config.config_name());

let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert_eq!("S3", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());

let s3_config = ObjectStoreConfig::S3(S3Config {
name: "test".to_string(),
..Default::default()
});
assert_eq!("test", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());
}

#[test]
fn test_secstr() {
let toml_str = r#"
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ impl DatanodeBuilder {
/// Builds [ObjectStoreManager] from [StorageConfig].
pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
let default_name = cfg.store.name();
let default_name = cfg.store.config_name();
let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
for store in &cfg.providers {
object_store_manager.add(
store.name(),
store.config_name(),
store::new_object_store(store.clone(), &cfg.data_home).await?,
);
}
Expand Down
8 changes: 7 additions & 1 deletion src/object-store/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,21 @@ impl ObjectStoreManager {
}

/// Adds an object store to the manager.
/// # Safety
/// Panic when the name already exists
pub fn add(&mut self, name: &str, object_store: ObjectStore) {
self.stores.insert(name.to_lowercase(), object_store);
let name = name.to_lowercase();
if self.stores.insert(name.clone(), object_store).is_some() {
panic!("Object storage provider name conflicts, the `{name}` already exists");
}
}

/// Finds an object store corresponding to the name.
pub fn find(&self, name: &str) -> Option<&ObjectStore> {
self.stores.get(&name.to_lowercase())
}

/// Returns the default object storage
pub fn default_object_store(&self) -> &ObjectStore {
&self.default_object_store
}
Expand Down
Loading