Skip to content

Commit

Permalink
nexus: cleanup split out from adding KafkaConfig (#1459)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Mar 11, 2024
1 parent a233c63 commit 7bb74c0
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 131 deletions.
35 changes: 12 additions & 23 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ fn parse_db_options(
opts.insert(&opt.name.value, val);
}

let config = match db_type {
Ok(Some(match db_type {
DbType::Bigquery => {
let pem_str = opts
.get("private_key")
Expand Down Expand Up @@ -565,8 +565,7 @@ fn parse_db_options(
.ok_or_else(|| anyhow::anyhow!("missing dataset_id in peer options"))?
.to_string(),
};
let config = Config::BigqueryConfig(bq_config);
Some(config)
Config::BigqueryConfig(bq_config)
}
DbType::Snowflake => {
let s3_int = opts
Expand Down Expand Up @@ -605,8 +604,7 @@ fn parse_db_options(
metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()),
s3_integration: s3_int,
};
let config = Config::SnowflakeConfig(snowflake_config);
Some(config)
Config::SnowflakeConfig(snowflake_config)
}
DbType::Mongo => {
let mongo_config = MongoConfig {
Expand All @@ -632,8 +630,7 @@ fn parse_db_options(
.parse::<i32>()
.context("unable to parse port as valid int")?,
};
let config = Config::MongoConfig(mongo_config);
Some(config)
Config::MongoConfig(mongo_config)
}
DbType::Postgres => {
let postgres_config = PostgresConfig {
Expand All @@ -659,8 +656,7 @@ fn parse_db_options(
transaction_snapshot: "".to_string(),
ssh_config: None,
};
let config = Config::PostgresConfig(postgres_config);
Some(config)
Config::PostgresConfig(postgres_config)
}
DbType::Eventhub => {
let subscription_id = opts
Expand Down Expand Up @@ -699,8 +695,7 @@ fn parse_db_options(
partition_count,
message_retention_in_days,
};
let config = Config::EventhubConfig(eventhub_config);
Some(config)
Config::EventhubConfig(eventhub_config)
}
DbType::S3 => {
let s3_config = S3Config {
Expand All @@ -714,8 +709,7 @@ fn parse_db_options(
role_arn: opts.get("role_arn").map(|s| s.to_string()),
endpoint: opts.get("endpoint").map(|s| s.to_string()),
};
let config = Config::S3Config(s3_config);
Some(config)
Config::S3Config(s3_config)
}
DbType::Sqlserver => {
let port_str = opts.get("port").context("port not specified")?;
Expand All @@ -736,8 +730,7 @@ fn parse_db_options(
.context("database is not specified")?
.to_string(),
};
let config = Config::SqlserverConfig(sqlserver_config);
Some(config)
Config::SqlserverConfig(sqlserver_config)
}
DbType::EventhubGroup => {
// split comma separated list of columns and trim
Expand Down Expand Up @@ -775,8 +768,7 @@ fn parse_db_options(
eventhubs,
unnest_columns,
};
let config = Config::EventhubGroupConfig(eventhub_group_config);
Some(config)
Config::EventhubGroupConfig(eventhub_group_config)
}
DbType::Clickhouse => {
let clickhouse_config = ClickhouseConfig {
Expand Down Expand Up @@ -817,12 +809,9 @@ fn parse_db_options(
disable_tls: opts
.get("disable_tls")
.and_then(|s| s.parse::<bool>().ok())
.unwrap_or_default()
.unwrap_or_default(),
};
let config = Config::ClickhouseConfig(clickhouse_config);
Some(config)
Config::ClickhouseConfig(clickhouse_config)
}
};

Ok(config)
}))
}
175 changes: 67 additions & 108 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,57 +88,19 @@ impl Catalog {
pub async fn create_peer(&self, peer: &Peer) -> anyhow::Result<i64> {
let config_blob = {
let config = peer.config.clone().context("invalid peer config")?;
let mut buf = Vec::new();

match config {
Config::SnowflakeConfig(snowflake_config) => {
let config_len = snowflake_config.encoded_len();
buf.reserve(config_len);
snowflake_config.encode(&mut buf)?;
}
Config::BigqueryConfig(bigquery_config) => {
let config_len = bigquery_config.encoded_len();
buf.reserve(config_len);
bigquery_config.encode(&mut buf)?;
}
Config::MongoConfig(mongo_config) => {
let config_len = mongo_config.encoded_len();
buf.reserve(config_len);
mongo_config.encode(&mut buf)?;
}
Config::PostgresConfig(postgres_config) => {
let config_len = postgres_config.encoded_len();
buf.reserve(config_len);
postgres_config.encode(&mut buf)?;
}
Config::EventhubConfig(eventhub_config) => {
let config_len = eventhub_config.encoded_len();
buf.reserve(config_len);
eventhub_config.encode(&mut buf)?;
}
Config::S3Config(s3_config) => {
let config_len = s3_config.encoded_len();
buf.reserve(config_len);
s3_config.encode(&mut buf)?;
}
Config::SqlserverConfig(sqlserver_config) => {
let config_len = sqlserver_config.encoded_len();
buf.reserve(config_len);
sqlserver_config.encode(&mut buf)?;
}
Config::SnowflakeConfig(snowflake_config) => snowflake_config.encode_to_vec(),
Config::BigqueryConfig(bigquery_config) => bigquery_config.encode_to_vec(),
Config::MongoConfig(mongo_config) => mongo_config.encode_to_vec(),
Config::PostgresConfig(postgres_config) => postgres_config.encode_to_vec(),
Config::EventhubConfig(eventhub_config) => eventhub_config.encode_to_vec(),
Config::S3Config(s3_config) => s3_config.encode_to_vec(),
Config::SqlserverConfig(sqlserver_config) => sqlserver_config.encode_to_vec(),
Config::EventhubGroupConfig(eventhub_group_config) => {
let config_len = eventhub_group_config.encoded_len();
buf.reserve(config_len);
eventhub_group_config.encode(&mut buf)?;
}
Config::ClickhouseConfig(clickhouse_config) => {
let config_len = clickhouse_config.encoded_len();
buf.reserve(config_len);
clickhouse_config.encode(&mut buf)?;
eventhub_group_config.encode_to_vec()
}
};

buf
Config::ClickhouseConfig(clickhouse_config) => clickhouse_config.encode_to_vec(),
}
};

let stmt = self
Expand Down Expand Up @@ -290,67 +252,64 @@ impl Catalog {
name: &str,
options: &[u8],
) -> anyhow::Result<Option<Config>> {
match db_type {
Some(DbType::Snowflake) => {
let err = format!("unable to decode {} options for peer {}", "snowflake", name);
let snowflake_config =
pt::peerdb_peers::SnowflakeConfig::decode(options).context(err)?;
Ok(Some(Config::SnowflakeConfig(snowflake_config)))
}
Some(DbType::Bigquery) => {
let err = format!("unable to decode {} options for peer {}", "bigquery", name);
let bigquery_config =
pt::peerdb_peers::BigqueryConfig::decode(options).context(err)?;
Ok(Some(Config::BigqueryConfig(bigquery_config)))
}
Some(DbType::Mongo) => {
let err = format!("unable to decode {} options for peer {}", "mongo", name);
let mongo_config = pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
Ok(Some(Config::MongoConfig(mongo_config)))
}
Some(DbType::Eventhub) => {
let err = format!("unable to decode {} options for peer {}", "eventhub", name);
let eventhub_config =
pt::peerdb_peers::EventHubConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubConfig(eventhub_config)))
}
Some(DbType::Postgres) => {
let err = format!("unable to decode {} options for peer {}", "postgres", name);
let postgres_config =
pt::peerdb_peers::PostgresConfig::decode(options).context(err)?;
Ok(Some(Config::PostgresConfig(postgres_config)))
}
Some(DbType::S3) => {
let err = format!("unable to decode {} options for peer {}", "s3", name);
let s3_config = pt::peerdb_peers::S3Config::decode(options).context(err)?;
Ok(Some(Config::S3Config(s3_config)))
}
Some(DbType::Sqlserver) => {
let err = format!("unable to decode {} options for peer {}", "sqlserver", name);
let sqlserver_config =
pt::peerdb_peers::SqlServerConfig::decode(options).context(err)?;
Ok(Some(Config::SqlserverConfig(sqlserver_config)))
}
Some(DbType::EventhubGroup) => {
let err = format!(
"unable to decode {} options for peer {}",
"eventhub_group", name
);
let eventhub_group_config =
pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
Some(DbType::Clickhouse) => {
let err = format!(
Ok(if let Some(db_type) = db_type {
let err = || {
format!(
"unable to decode {} options for peer {}",
"clickhouse", name
);
let clickhouse_config =
pt::peerdb_peers::ClickhouseConfig::decode(options).context(err)?;
Ok(Some(Config::ClickhouseConfig(clickhouse_config)))
}
None => Ok(None),
}
db_type.as_str_name(),
name
)
};
Some(match db_type {
DbType::Snowflake => {
let snowflake_config =
pt::peerdb_peers::SnowflakeConfig::decode(options).with_context(err)?;
Config::SnowflakeConfig(snowflake_config)
}
DbType::Bigquery => {
let bigquery_config =
pt::peerdb_peers::BigqueryConfig::decode(options).with_context(err)?;
Config::BigqueryConfig(bigquery_config)
}
DbType::Mongo => {
let mongo_config =
pt::peerdb_peers::MongoConfig::decode(options).with_context(err)?;
Config::MongoConfig(mongo_config)
}
DbType::Eventhub => {
let eventhub_config =
pt::peerdb_peers::EventHubConfig::decode(options).with_context(err)?;
Config::EventhubConfig(eventhub_config)
}
DbType::Postgres => {
let postgres_config =
pt::peerdb_peers::PostgresConfig::decode(options).with_context(err)?;
Config::PostgresConfig(postgres_config)
}
DbType::S3 => {
let s3_config =
pt::peerdb_peers::S3Config::decode(options).with_context(err)?;
Config::S3Config(s3_config)
}
DbType::Sqlserver => {
let sqlserver_config =
pt::peerdb_peers::SqlServerConfig::decode(options).with_context(err)?;
Config::SqlserverConfig(sqlserver_config)
}
DbType::EventhubGroup => {
let eventhub_group_config =
pt::peerdb_peers::EventHubGroupConfig::decode(options).with_context(err)?;
Config::EventhubGroupConfig(eventhub_group_config)
}
DbType::Clickhouse => {
let clickhouse_config =
pt::peerdb_peers::ClickhouseConfig::decode(options).with_context(err)?;
Config::ClickhouseConfig(clickhouse_config)
}
})
} else {
None
})
}

async fn normalize_schema_for_table_identifier(
Expand Down

0 comments on commit 7bb74c0

Please sign in to comment.