Skip to content

Commit

Permalink
update nexus
Browse files Browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb committed Jan 3, 2024
1 parent 38c6d5d commit 8a5f6cf
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
33 changes: 32 additions & 1 deletion nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use pt::{
flow_model::{FlowJob, FlowJobTableMapping, QRepFlowJob},
peerdb_peers::{
peer::Config, BigqueryConfig, DbType, EventHubConfig, MongoConfig, Peer, PostgresConfig,
S3Config, SnowflakeConfig, SqlServerConfig,
S3Config, SnowflakeConfig, SqlServerConfig,ClickhouseConfig
},
};
use qrep::process_options;
Expand Down Expand Up @@ -803,6 +803,37 @@ fn parse_db_options(
let config = Config::EventhubGroupConfig(eventhub_group_config);
Some(config)
}
DbType::Clickhouse => {
let s3_int = opts
.get("s3_integration")
.map(|s| s.to_string())
.unwrap_or_default();

let clickhouse_config = ClickhouseConfig {
host: opts.get("host").context("no host specified")?.to_string(),
port: opts
.get("port")
.context("no port specified")?
.parse::<u32>()
.context("unable to parse port as valid int")?,
user: opts
.get("user")
.context("no username specified")?
.to_string(),
password: opts
.get("password")
.context("no password specified")?
.to_string(),
database: opts
.get("database")
.context("no default database specified")?
.to_string(),
metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()),
s3_integration: s3_int,
};
let config = Config::ClickhouseConfig(clickhouse_config);
Some(config)
}
};

Ok(config)
Expand Down
11 changes: 11 additions & 0 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ impl Catalog {
buf.reserve(config_len);
eventhub_group_config.encode(&mut buf)?;
}
Config::ClickhouseConfig(clickhouse_config) => {
let config_len = clickhouse_config.encoded_len();
buf.reserve(config_len);
clickhouse_config.encode(&mut buf)?;
}
};

buf
Expand Down Expand Up @@ -334,6 +339,12 @@ impl Catalog {
pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
Some(DbType::Clickhouse) => {
let err = format!("unable to decode {} options for peer {}", "clickhouse", name);
let clickhouse_config =
pt::peerdb_peers::ClickhouseConfig::decode(options).context(err)?;
Ok(Some(Config::ClickhouseConfig(clickhouse_config)))
}
None => Ok(None),
}
}
Expand Down

0 comments on commit 8a5f6cf

Please sign in to comment.