Skip to content

Commit

Permalink
sql: support ssh fields in postgres create peer
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 8, 2024
1 parent 829e753 commit 1d5ece9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
// create a separate connection pool for non-replication queries as replication connections cannot
// be used for extended query protocol, i.e. prepared statements
connConfig, err := pgx.ParseConfig(connectionString)
replConfig := connConfig.Copy()
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}

replConfig := connConfig.Copy()
runtimeParams := connConfig.Config.RuntimeParams
runtimeParams["idle_in_transaction_session_timeout"] = "0"
runtimeParams["statement_timeout"] = "0"
Expand Down
20 changes: 18 additions & 2 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use pt::{
peerdb_peers::{
peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, GcpServiceAccount,
KafkaConfig, MongoConfig, Peer, PostgresConfig, PubSubConfig, S3Config, SnowflakeConfig,
SqlServerConfig,
SqlServerConfig, SshConfig,
},
};
use qrep::process_options;
Expand Down Expand Up @@ -646,6 +646,19 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
Config::MongoConfig(mongo_config)
}
DbType::Postgres => {
let ssh_fields: Option<SshConfig> = match opts.get("ssh_config") {
Some(ssh_config) => {
let ssh_config_str = ssh_config.to_string();
if ssh_config_str.is_empty() {
None
} else {
serde_json::from_str(&ssh_config_str)
.context("failed to deserialize ssh_config")?
}
}
None => None,
};

let postgres_config = PostgresConfig {
host: opts.get("host").context("no host specified")?.to_string(),
port: opts
Expand All @@ -667,8 +680,11 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
.to_string(),
metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()),
transaction_snapshot: "".to_string(),
ssh_config: None,
ssh_config: ssh_fields
};

// log postgres config
println!("postgres_config: {:#?}", postgres_config);
Config::PostgresConfig(postgres_config)
}
DbType::S3 => {
Expand Down

0 comments on commit 1d5ece9

Please sign in to comment.