diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 03ee0c02b1..3642289756 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -62,11 +62,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) // create a separate connection pool for non-replication queries as replication connections cannot // be used for extended query protocol, i.e. prepared statements connConfig, err := pgx.ParseConfig(connectionString) - replConfig := connConfig.Copy() if err != nil { return nil, fmt.Errorf("failed to parse connection string: %w", err) } + replConfig := connConfig.Copy() runtimeParams := connConfig.Config.RuntimeParams runtimeParams["idle_in_transaction_session_timeout"] = "0" runtimeParams["statement_timeout"] = "0" diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index b0ff741612..4bde1cbcec 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -11,7 +11,7 @@ use pt::{ peerdb_peers::{ peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, GcpServiceAccount, KafkaConfig, MongoConfig, Peer, PostgresConfig, PubSubConfig, S3Config, SnowflakeConfig, - SqlServerConfig, + SqlServerConfig, SshConfig, }, }; use qrep::process_options; @@ -646,6 +646,19 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu Config::MongoConfig(mongo_config) } DbType::Postgres => { + let ssh_fields: Option = match opts.get("ssh_config") { + Some(ssh_config) => { + let ssh_config_str = ssh_config.to_string(); + if ssh_config_str.is_empty() { + None + } else { + serde_json::from_str(&ssh_config_str) + .context("failed to deserialize ssh_config")? + } + } + None => None, + }; + let postgres_config = PostgresConfig { host: opts.get("host").context("no host specified")?.to_string(), port: opts @@ -667,8 +680,11 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu .to_string(), metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()), transaction_snapshot: "".to_string(), - ssh_config: None, + ssh_config: ssh_fields }; + + // log postgres config + println!("postgres_config: {:#?}", postgres_config); Config::PostgresConfig(postgres_config) } DbType::S3 => {