Skip to content

Commit

Permalink
Always set application_name when connecting to postgres
Browse files Browse the repository at this point in the history
Nexus application_name is peerdb_nexus since those queries can be user generated

Remove unused code for creating pool over ssh tunnel
  • Loading branch information
serprex committed Jan 29, 2024
1 parent 3c27655 commit c4919bf
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 61 deletions.
3 changes: 2 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
}

runtimeParams := connConfig.Config.RuntimeParams
runtimeParams["application_name"] = "peerdb_query_executor"
runtimeParams["application_name"] = "peerdb"
runtimeParams["idle_in_transaction_session_timeout"] = "0"
runtimeParams["statement_timeout"] = "0"

Expand All @@ -66,6 +66,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
}

// ensure that replication is set to database
replConfig.Config.RuntimeParams["application_name"] = "peerdb"
replConfig.Config.RuntimeParams["replication"] = "database"
replConfig.Config.RuntimeParams["bytea_output"] = "hex"

Expand Down
58 changes: 1 addition & 57 deletions flow/connectors/postgres/ssh_wrapped_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/crypto/ssh"

"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand Down Expand Up @@ -93,62 +92,6 @@ func (tunnel *SSHTunnel) Close() {
}
}

func (tunnel *SSHTunnel) NewPostgresPoolFromPostgresConfig(
ctx context.Context,
pgConfig *protos.PostgresConfig,
) (*pgxpool.Pool, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

poolConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil, err
}

return tunnel.NewPostgresPoolFromConfig(ctx, poolConfig)
}

func (tunnel *SSHTunnel) NewPostgresPoolFromConfig(
ctx context.Context,
poolConfig *pgxpool.Config,
) (*pgxpool.Pool, error) {
// set pool size to 3 to avoid connection pool exhaustion
poolConfig.MaxConns = 3

if tunnel.sshClient != nil {
poolConfig.ConnConfig.DialFunc = func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := tunnel.sshClient.Dial(network, addr)
if err != nil {
return nil, err
}
return &noDeadlineConn{Conn: conn}, nil
}
}

pool, err := pgxpool.NewWithConfig(tunnel.ctx, poolConfig)
if err != nil {
slog.Error("Failed to create pool:", slog.Any("error", err))
return nil, err
}

host := poolConfig.ConnConfig.Host
err = retryWithBackoff(func() error {
err = pool.Ping(tunnel.ctx)
if err != nil {
slog.Error("Failed to ping pool", slog.Any("error", err), slog.String("host", host))
return err
}
return nil
}, 5, 5*time.Second)

if err != nil {
slog.Error("Failed to create pool", slog.Any("error", err), slog.String("host", host))
pool.Close()
return nil, err
}

return pool, nil
}

func (tunnel *SSHTunnel) NewPostgresConnFromPostgresConfig(
ctx context.Context,
pgConfig *protos.PostgresConfig,
Expand All @@ -159,6 +102,7 @@ func (tunnel *SSHTunnel) NewPostgresConnFromPostgresConfig(
if err != nil {
return nil, err
}
connConfig.RuntimeParams["application_name"] = "peerdb"

return tunnel.NewPostgresConnFromConfig(ctx, connConfig)
}
Expand Down
6 changes: 3 additions & 3 deletions nexus/postgres-connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
pub fn get_pg_connection_string(config: &PostgresConfig) -> String {
let mut connection_string = String::from("postgres://");

connection_string.push_str(&config.user);
connection_string.push_str(&urlencoding::encode(&config.user));
if !config.password.is_empty() {
connection_string.push(':');
connection_string.push_str(&urlencoding::encode(&config.password));
Expand All @@ -67,8 +67,8 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String {
// Add the timeout as a query parameter, sslmode changes here appear to be useless
write!(
connection_string,
"@{}:{}/{}?connect_timeout=15",
config.host, config.port, config.database
"@{}:{}/{}?connect_timeout=15&application_name=peerdb_nexus",
config.host, config.port, urlencoding::encode(&config.database)
)
.ok();

Expand Down

0 comments on commit c4919bf

Please sign in to comment.