Skip to content

Commit

Permalink
Clickhouse add protos (#968)
Browse files Browse the repository at this point in the history
add protos as a first step for clickhouse connectors
  • Loading branch information
pankaj-peerdb authored Jan 4, 2024
1 parent 5b0c8b3 commit 59d630f
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 1 deletion.
32 changes: 31 additions & 1 deletion nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use pt::{
flow_model::{FlowJob, FlowJobTableMapping, QRepFlowJob},
peerdb_peers::{
peer::Config, BigqueryConfig, DbType, EventHubConfig, MongoConfig, Peer, PostgresConfig,
S3Config, SnowflakeConfig, SqlServerConfig,
S3Config, SnowflakeConfig, SqlServerConfig,ClickhouseConfig
},
};
use qrep::process_options;
Expand Down Expand Up @@ -793,6 +793,36 @@ fn parse_db_options(
let config = Config::EventhubGroupConfig(eventhub_group_config);
Some(config)
}
DbType::Clickhouse => {
let s3_int = opts
.get("s3_integration")
.map(|s| s.to_string())
.unwrap_or_default();

let clickhouse_config = ClickhouseConfig {
host: opts.get("host").context("no host specified")?.to_string(),
port: opts
.get("port")
.context("no port specified")?
.parse::<u32>()
.context("unable to parse port as valid int")?,
user: opts
.get("user")
.context("no username specified")?
.to_string(),
password: opts
.get("password")
.context("no password specified")?
.to_string(),
database: opts
.get("database")
.context("no default database specified")?
.to_string(),
s3_integration: s3_int,
};
let config = Config::ClickhouseConfig(clickhouse_config);
Some(config)
}
};

Ok(config)
Expand Down
11 changes: 11 additions & 0 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ impl Catalog {
buf.reserve(config_len);
eventhub_group_config.encode(&mut buf)?;
}
Config::ClickhouseConfig(clickhouse_config) => {
let config_len = clickhouse_config.encoded_len();
buf.reserve(config_len);
clickhouse_config.encode(&mut buf)?;
}
};

buf
Expand Down Expand Up @@ -334,6 +339,12 @@ impl Catalog {
pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
Some(DbType::Clickhouse) => {
let err = format!("unable to decode {} options for peer {}", "clickhouse", name);
let clickhouse_config =
pt::peerdb_peers::ClickhouseConfig::decode(options).context(err)?;
Ok(Some(Config::ClickhouseConfig(clickhouse_config)))
}
None => Ok(None),
}
}
Expand Down
11 changes: 11 additions & 0 deletions protos/peers.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ message S3Config {
PostgresConfig metadata_db = 7;
}

message ClickhouseConfig{
string host = 1;
uint32 port = 2;
string user = 3;
string password = 4;
string database = 5;
string s3_integration = 6; // staging to store avro files
}

message SqlServerConfig {
string server = 1;
uint32 port = 2;
Expand All @@ -106,6 +115,7 @@ enum DBType {
S3 = 5;
SQLSERVER = 6;
EVENTHUB_GROUP = 7;
CLICKHOUSE = 8;
}

message Peer {
Expand All @@ -120,5 +130,6 @@ message Peer {
S3Config s3_config = 8;
SqlServerConfig sqlserver_config = 9;
EventHubGroupConfig eventhub_group_config = 10;
ClickhouseConfig clickhouse_config = 11;
}
}
2 changes: 2 additions & 0 deletions ui/components/PeerTypeComponent.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export const DBTypeToGoodText = (ptype: DBType) => {
return 'SQL Server';
case DBType.MONGO:
return 'MongoDB';
case DBType.CLICKHOUSE:
return 'Clickhouse';
case DBType.UNRECOGNIZED:
return 'Unrecognised';
}
Expand Down

0 comments on commit 59d630f

Please sign in to comment.