From 3055e227b2865c39f1eacadd2c7ea1857a6d3316 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 18 Oct 2023 14:33:58 +0000 Subject: [PATCH] improve connection params for postgres connector --- flow/connectors/postgres/postgres.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 199469bf56..a4df6246d3 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -52,7 +52,17 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) // create a separate connection pool for non-replication queries as replication connections cannot // be used for extended query protocol, i.e. prepared statements - pool, err := pgxpool.New(ctx, connectionString) + connConfig, err := pgxpool.ParseConfig(connectionString) + if err != nil { + return nil, fmt.Errorf("failed to parse connection string: %w", err) + } + + runtimeParams := connConfig.ConnConfig.RuntimeParams + runtimeParams["application_name"] = "peerdb_query_executor" + runtimeParams["idle_in_transaction_session_timeout"] = "0" + runtimeParams["statement_timeout"] = "0" + + pool, err := pgxpool.NewWithConfig(ctx, connConfig) if err != nil { return nil, fmt.Errorf("failed to create connection pool: %w", err) } @@ -63,16 +73,16 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) } // ensure that replication is set to database - connConfig, err := pgxpool.ParseConfig(connectionString) + replConnConfig, err := pgxpool.ParseConfig(connectionString) if err != nil { return nil, fmt.Errorf("failed to parse connection string: %w", err) } - connConfig.ConnConfig.RuntimeParams["replication"] = "database" - connConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" - connConfig.MaxConns = 1 + replConnConfig.ConnConfig.RuntimeParams["replication"] = "database" + replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" + replConnConfig.MaxConns = 1 - replPool, err := pgxpool.NewWithConfig(ctx, connConfig) + replPool, err := pgxpool.NewWithConfig(ctx, replConnConfig) if err != nil { return nil, fmt.Errorf("failed to create connection pool: %w", err) }