diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 199469bf56..a4df6246d3 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -52,7 +52,17 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) // create a separate connection pool for non-replication queries as replication connections cannot // be used for extended query protocol, i.e. prepared statements - pool, err := pgxpool.New(ctx, connectionString) + connConfig, err := pgxpool.ParseConfig(connectionString) + if err != nil { + return nil, fmt.Errorf("failed to parse connection string: %w", err) + } + + runtimeParams := connConfig.ConnConfig.RuntimeParams + runtimeParams["application_name"] = "peerdb_query_executor" + runtimeParams["idle_in_transaction_session_timeout"] = "0" + runtimeParams["statement_timeout"] = "0" + + pool, err := pgxpool.NewWithConfig(ctx, connConfig) if err != nil { return nil, fmt.Errorf("failed to create connection pool: %w", err) } @@ -63,16 +73,16 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) } // ensure that replication is set to database - connConfig, err := pgxpool.ParseConfig(connectionString) + replConnConfig, err := pgxpool.ParseConfig(connectionString) if err != nil { return nil, fmt.Errorf("failed to parse connection string: %w", err) } - connConfig.ConnConfig.RuntimeParams["replication"] = "database" - connConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" - connConfig.MaxConns = 1 + replConnConfig.ConnConfig.RuntimeParams["replication"] = "database" + replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" + replConnConfig.MaxConns = 1 - replPool, err := pgxpool.NewWithConfig(ctx, connConfig) + replPool, err := pgxpool.NewWithConfig(ctx, replConnConfig) if err != nil { return nil, fmt.Errorf("failed to create connection pool: %w", err) }