From 7bb74c0849e2c58a1d3ba4360db1f22b4f379fba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 11 Mar 2024 17:06:40 +0000 Subject: [PATCH] nexus: cleanup split out from adding KafkaConfig (#1459) --- nexus/analyzer/src/lib.rs | 35 +++----- nexus/catalog/src/lib.rs | 175 +++++++++++++++----------------------- 2 files changed, 79 insertions(+), 131 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 9f67e38132..81468018a8 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -511,7 +511,7 @@ fn parse_db_options( opts.insert(&opt.name.value, val); } - let config = match db_type { + Ok(Some(match db_type { DbType::Bigquery => { let pem_str = opts .get("private_key") @@ -565,8 +565,7 @@ fn parse_db_options( .ok_or_else(|| anyhow::anyhow!("missing dataset_id in peer options"))? .to_string(), }; - let config = Config::BigqueryConfig(bq_config); - Some(config) + Config::BigqueryConfig(bq_config) } DbType::Snowflake => { let s3_int = opts @@ -605,8 +604,7 @@ fn parse_db_options( metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()), s3_integration: s3_int, }; - let config = Config::SnowflakeConfig(snowflake_config); - Some(config) + Config::SnowflakeConfig(snowflake_config) } DbType::Mongo => { let mongo_config = MongoConfig { @@ -632,8 +630,7 @@ fn parse_db_options( .parse::() .context("unable to parse port as valid int")?, }; - let config = Config::MongoConfig(mongo_config); - Some(config) + Config::MongoConfig(mongo_config) } DbType::Postgres => { let postgres_config = PostgresConfig { @@ -659,8 +656,7 @@ fn parse_db_options( transaction_snapshot: "".to_string(), ssh_config: None, }; - let config = Config::PostgresConfig(postgres_config); - Some(config) + Config::PostgresConfig(postgres_config) } DbType::Eventhub => { let subscription_id = opts @@ -699,8 +695,7 @@ fn parse_db_options( partition_count, message_retention_in_days, }; - let config = Config::EventhubConfig(eventhub_config); - Some(config) + Config::EventhubConfig(eventhub_config) } DbType::S3 => { let s3_config = S3Config { @@ -714,8 +709,7 @@ fn parse_db_options( role_arn: opts.get("role_arn").map(|s| s.to_string()), endpoint: opts.get("endpoint").map(|s| s.to_string()), }; - let config = Config::S3Config(s3_config); - Some(config) + Config::S3Config(s3_config) } DbType::Sqlserver => { let port_str = opts.get("port").context("port not specified")?; @@ -736,8 +730,7 @@ fn parse_db_options( .context("database is not specified")? .to_string(), }; - let config = Config::SqlserverConfig(sqlserver_config); - Some(config) + Config::SqlserverConfig(sqlserver_config) } DbType::EventhubGroup => { // split comma separated list of columns and trim @@ -775,8 +768,7 @@ fn parse_db_options( eventhubs, unnest_columns, }; - let config = Config::EventhubGroupConfig(eventhub_group_config); - Some(config) + Config::EventhubGroupConfig(eventhub_group_config) } DbType::Clickhouse => { let clickhouse_config = ClickhouseConfig { @@ -817,12 +809,9 @@ fn parse_db_options( disable_tls: opts .get("disable_tls") .and_then(|s| s.parse::().ok()) - .unwrap_or_default() + .unwrap_or_default(), }; - let config = Config::ClickhouseConfig(clickhouse_config); - Some(config) + Config::ClickhouseConfig(clickhouse_config) } - }; - - Ok(config) + })) } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 123e29a0d9..ce3eb7e778 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -88,57 +88,19 @@ impl Catalog { pub async fn create_peer(&self, peer: &Peer) -> anyhow::Result { let config_blob = { let config = peer.config.clone().context("invalid peer config")?; - let mut buf = Vec::new(); - match config { - Config::SnowflakeConfig(snowflake_config) => { - let config_len = snowflake_config.encoded_len(); - buf.reserve(config_len); - snowflake_config.encode(&mut buf)?; - } - Config::BigqueryConfig(bigquery_config) => { - let config_len = bigquery_config.encoded_len(); - buf.reserve(config_len); - bigquery_config.encode(&mut buf)?; - } - Config::MongoConfig(mongo_config) => { - let config_len = mongo_config.encoded_len(); - buf.reserve(config_len); - mongo_config.encode(&mut buf)?; - } - Config::PostgresConfig(postgres_config) => { - let config_len = postgres_config.encoded_len(); - buf.reserve(config_len); - postgres_config.encode(&mut buf)?; - } - Config::EventhubConfig(eventhub_config) => { - let config_len = eventhub_config.encoded_len(); - buf.reserve(config_len); - eventhub_config.encode(&mut buf)?; - } - Config::S3Config(s3_config) => { - let config_len = s3_config.encoded_len(); - buf.reserve(config_len); - s3_config.encode(&mut buf)?; - } - Config::SqlserverConfig(sqlserver_config) => { - let config_len = sqlserver_config.encoded_len(); - buf.reserve(config_len); - sqlserver_config.encode(&mut buf)?; - } + Config::SnowflakeConfig(snowflake_config) => snowflake_config.encode_to_vec(), + Config::BigqueryConfig(bigquery_config) => bigquery_config.encode_to_vec(), + Config::MongoConfig(mongo_config) => mongo_config.encode_to_vec(), + Config::PostgresConfig(postgres_config) => postgres_config.encode_to_vec(), + Config::EventhubConfig(eventhub_config) => eventhub_config.encode_to_vec(), + Config::S3Config(s3_config) => s3_config.encode_to_vec(), + Config::SqlserverConfig(sqlserver_config) => sqlserver_config.encode_to_vec(), Config::EventhubGroupConfig(eventhub_group_config) => { - let config_len = eventhub_group_config.encoded_len(); - buf.reserve(config_len); - eventhub_group_config.encode(&mut buf)?; - } - Config::ClickhouseConfig(clickhouse_config) => { - let config_len = clickhouse_config.encoded_len(); - buf.reserve(config_len); - clickhouse_config.encode(&mut buf)?; + eventhub_group_config.encode_to_vec() } - }; - - buf + Config::ClickhouseConfig(clickhouse_config) => clickhouse_config.encode_to_vec(), + } }; let stmt = self @@ -290,67 +252,64 @@ impl Catalog { name: &str, options: &[u8], ) -> anyhow::Result> { - match db_type { - Some(DbType::Snowflake) => { - let err = format!("unable to decode {} options for peer {}", "snowflake", name); - let snowflake_config = - pt::peerdb_peers::SnowflakeConfig::decode(options).context(err)?; - Ok(Some(Config::SnowflakeConfig(snowflake_config))) - } - Some(DbType::Bigquery) => { - let err = format!("unable to decode {} options for peer {}", "bigquery", name); - let bigquery_config = - pt::peerdb_peers::BigqueryConfig::decode(options).context(err)?; - Ok(Some(Config::BigqueryConfig(bigquery_config))) - } - Some(DbType::Mongo) => { - let err = format!("unable to decode {} options for peer {}", "mongo", name); - let mongo_config = pt::peerdb_peers::MongoConfig::decode(options).context(err)?; - Ok(Some(Config::MongoConfig(mongo_config))) - } - Some(DbType::Eventhub) => { - let err = format!("unable to decode {} options for peer {}", "eventhub", name); - let eventhub_config = - pt::peerdb_peers::EventHubConfig::decode(options).context(err)?; - Ok(Some(Config::EventhubConfig(eventhub_config))) - } - Some(DbType::Postgres) => { - let err = format!("unable to decode {} options for peer {}", "postgres", name); - let postgres_config = - pt::peerdb_peers::PostgresConfig::decode(options).context(err)?; - Ok(Some(Config::PostgresConfig(postgres_config))) - } - Some(DbType::S3) => { - let err = format!("unable to decode {} options for peer {}", "s3", name); - let s3_config = pt::peerdb_peers::S3Config::decode(options).context(err)?; - Ok(Some(Config::S3Config(s3_config))) - } - Some(DbType::Sqlserver) => { - let err = format!("unable to decode {} options for peer {}", "sqlserver", name); - let sqlserver_config = - pt::peerdb_peers::SqlServerConfig::decode(options).context(err)?; - Ok(Some(Config::SqlserverConfig(sqlserver_config))) - } - Some(DbType::EventhubGroup) => { - let err = format!( - "unable to decode {} options for peer {}", - "eventhub_group", name - ); - let eventhub_group_config = - pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?; - Ok(Some(Config::EventhubGroupConfig(eventhub_group_config))) - } - Some(DbType::Clickhouse) => { - let err = format!( + Ok(if let Some(db_type) = db_type { + let err = || { + format!( "unable to decode {} options for peer {}", - "clickhouse", name - ); - let clickhouse_config = - pt::peerdb_peers::ClickhouseConfig::decode(options).context(err)?; - Ok(Some(Config::ClickhouseConfig(clickhouse_config))) - } - None => Ok(None), - } + db_type.as_str_name(), + name + ) + }; + Some(match db_type { + DbType::Snowflake => { + let snowflake_config = + pt::peerdb_peers::SnowflakeConfig::decode(options).with_context(err)?; + Config::SnowflakeConfig(snowflake_config) + } + DbType::Bigquery => { + let bigquery_config = + pt::peerdb_peers::BigqueryConfig::decode(options).with_context(err)?; + Config::BigqueryConfig(bigquery_config) + } + DbType::Mongo => { + let mongo_config = + pt::peerdb_peers::MongoConfig::decode(options).with_context(err)?; + Config::MongoConfig(mongo_config) + } + DbType::Eventhub => { + let eventhub_config = + pt::peerdb_peers::EventHubConfig::decode(options).with_context(err)?; + Config::EventhubConfig(eventhub_config) + } + DbType::Postgres => { + let postgres_config = + pt::peerdb_peers::PostgresConfig::decode(options).with_context(err)?; + Config::PostgresConfig(postgres_config) + } + DbType::S3 => { + let s3_config = + pt::peerdb_peers::S3Config::decode(options).with_context(err)?; + Config::S3Config(s3_config) + } + DbType::Sqlserver => { + let sqlserver_config = + pt::peerdb_peers::SqlServerConfig::decode(options).with_context(err)?; + Config::SqlserverConfig(sqlserver_config) + } + DbType::EventhubGroup => { + let eventhub_group_config = + pt::peerdb_peers::EventHubGroupConfig::decode(options).with_context(err)?; + Config::EventhubGroupConfig(eventhub_group_config) + } + DbType::Clickhouse => { + let clickhouse_config = + pt::peerdb_peers::ClickhouseConfig::decode(options).with_context(err)?; + Config::ClickhouseConfig(clickhouse_config) + } + }) + } else { + None + }) } async fn normalize_schema_for_table_identifier(