Skip to content

Commit

Permalink
[postgres] application name has mirror suffix (#2150)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 22, 2024
1 parent 12113b6 commit 02fe7d0
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 26 deletions.
3 changes: 2 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (a *FlowableActivity) MaintainPull(
config *protos.FlowConnectionConfigs,
sessionID string,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
if err != nil {
return err
Expand Down Expand Up @@ -670,7 +671,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {

func() {
pgConfig := pgPeer.GetPostgresConfig()
pgConn, peerErr := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConn, peerErr := connpostgres.NewPostgresConnector(ctx, nil, pgConfig)
if peerErr != nil {
logger.Error(fmt.Sprintf("error creating connector for postgres peer %s with host %s: %v",
pgPeer.Name, pgConfig.Host, peerErr))
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (h *FlowRequestHandler) GetSlotInfo(
return nil, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, nil, pgConfig)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
return nil, errors.New("source peer config is not postgres")
}

pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
pgPeer, err := connpostgres.NewPostgresConnector(ctx, nil, sourcePeerConfig)
if err != nil {
displayErr := fmt.Errorf("failed to create postgres connector: %v", err)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error())
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func LoadPeer(ctx context.Context, catalogPool *pgxpool.Pool, peerName string) (
func GetConnector(ctx context.Context, env map[string]string, config *protos.Peer) (Connector, error) {
switch inner := config.Config.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, inner.PostgresConfig)
return connpostgres.NewPostgresConnector(ctx, env, inner.PostgresConfig)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, inner.BigqueryConfig)
case *protos.Peer_SnowflakeConfig:
Expand Down
40 changes: 27 additions & 13 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type PostgresConnector struct {
config *protos.PostgresConfig
ssh *SSHTunnel
conn *pgx.Conn
replConfig *pgx.ConnConfig
replConn *pgx.Conn
replState *ReplState
customTypesMapping map[uint32]string
Expand All @@ -55,18 +54,23 @@ type ReplState struct {
LastOffset atomic.Int64
}

func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
func NewPostgresConnector(ctx context.Context, env map[string]string, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
logger := logger.LoggerFromCtx(ctx)
connectionString := shared.GetPGConnectionString(pgConfig)
flowNameInApplicationName, err := peerdbenv.PeerDBApplicationNamePerMirrorName(ctx, nil)
if err != nil {
logger.Error("Failed to get flow name from application name", slog.Any("error", err))
}
var flowName string
if flowNameInApplicationName {
flowName, _ = ctx.Value(shared.FlowNameKey).(string)
}
connectionString := shared.GetPGConnectionString(pgConfig, flowName)

// create a separate connection pool for non-replication queries as replication connections cannot
// be used for extended query protocol, i.e. prepared statements
connConfig, err := pgx.ParseConfig(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}

replConfig := connConfig.Copy()
runtimeParams := connConfig.Config.RuntimeParams
runtimeParams["idle_in_transaction_session_timeout"] = "0"
runtimeParams["statement_timeout"] = "0"
Expand All @@ -83,11 +87,6 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
return nil, fmt.Errorf("failed to create connection: %w", err)
}

// ensure that replication is set to database
replConfig.Config.RuntimeParams["replication"] = "database"
replConfig.Config.RuntimeParams["bytea_output"] = "hex"
replConfig.Config.RuntimeParams["intervalstyle"] = "postgres"

customTypeMap, err := shared.GetCustomDataTypes(ctx, conn)
if err != nil {
logger.Error("failed to get custom type map", slog.Any("error", err))
Expand All @@ -104,7 +103,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
config: pgConfig,
ssh: tunnel,
conn: conn,
replConfig: replConfig,
replConn: nil,
replState: nil,
replLock: sync.Mutex{},
customTypesMapping: customTypeMap,
Expand All @@ -116,7 +115,22 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
}

func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, error) {
conn, err := c.ssh.NewPostgresConnFromConfig(ctx, c.replConfig)
// create a separate connection pool for non-replication queries as replication connections cannot
// be used for extended query protocol, i.e. prepared statements
replConfig, err := pgx.ParseConfig(c.connStr)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}

runtimeParams := replConfig.Config.RuntimeParams
runtimeParams["idle_in_transaction_session_timeout"] = "0"
runtimeParams["statement_timeout"] = "0"
// ensure that replication is set to database
replConfig.Config.RuntimeParams["replication"] = "database"
replConfig.Config.RuntimeParams["bytea_output"] = "hex"
replConfig.Config.RuntimeParams["intervalstyle"] = "postgres"

conn, err := c.ssh.NewPostgresConnFromConfig(ctx, replConfig)
if err != nil {
logger.LoggerFromCtx(ctx).Error("failed to create replication connection", "error", err)
return nil, fmt.Errorf("failed to create replication connection: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type PostgresSchemaDeltaTestSuite struct {
func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), peerdbenv.GetCatalogPostgresConfigFromEnv())
connector, err := NewPostgresConnector(context.Background(), nil, peerdbenv.GetCatalogPostgresConfigFromEnv())
require.NoError(t, err)

setupTx, err := connector.conn.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) {
query := "SELECT * FROM bench.large_table"

ctx := context.Background()
connector, err := NewPostgresConnector(ctx, peerdbenv.GetCatalogPostgresConfigFromEnv())
connector, err := NewPostgresConnector(ctx, nil, peerdbenv.GetCatalogPostgresConfigFromEnv())
if err != nil {
b.Fatalf("failed to create connection: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
func setupDB(t *testing.T) (*PostgresConnector, string) {
t.Helper()

connector, err := NewPostgresConnector(context.Background(), peerdbenv.GetCatalogPostgresConfigFromEnv())
connector, err := NewPostgresConnector(context.Background(),
nil, peerdbenv.GetCatalogPostgresConfigFromEnv())
if err != nil {
t.Fatalf("unable to create connector: %v", err)
}
Expand Down
13 changes: 11 additions & 2 deletions flow/connectors/postgres/ssh_wrapped_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -79,13 +80,21 @@ func (tunnel *SSHTunnel) NewPostgresConnFromPostgresConfig(
ctx context.Context,
pgConfig *protos.PostgresConfig,
) (*pgx.Conn, error) {
connectionString := shared.GetPGConnectionString(pgConfig)
flowNameInApplicationName, err := peerdbenv.PeerDBApplicationNamePerMirrorName(ctx, nil)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get flow name from application name", slog.Any("error", err))
}

var flowName string
if flowNameInApplicationName {
flowName, _ = ctx.Value(shared.FlowNameKey).(string)
}
connectionString := shared.GetPGConnectionString(pgConfig, flowName)

connConfig, err := pgx.ParseConfig(connectionString)
if err != nil {
return nil, err
}
connConfig.RuntimeParams["application_name"] = "peerdb"

return tunnel.NewPostgresConnFromConfig(ctx, connConfig)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error {
func SetupPostgres(t *testing.T, suffix string) (*connpostgres.PostgresConnector, error) {
t.Helper()

connector, err := connpostgres.NewPostgresConnector(context.Background(), peerdbenv.GetCatalogPostgresConfigFromEnv())
connector, err := connpostgres.NewPostgresConnector(context.Background(),
nil, peerdbenv.GetCatalogPostgresConfigFromEnv())
if err != nil {
return nil, fmt.Errorf("failed to create postgres connection: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/peerdbenv/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GetCatalogConnectionPoolFromEnv(ctx context.Context) (*pgxpool.Pool, error)
}

func GetCatalogConnectionStringFromEnv() string {
return shared.GetPGConnectionString(GetCatalogPostgresConfigFromEnv())
return shared.GetPGConnectionString(GetCatalogPostgresConfigFromEnv(), "")
}

func GetCatalogPostgresConfigFromEnv() *protos.PostgresConfig {
Expand Down
12 changes: 12 additions & 0 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); END;`,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_APPLICATION_NAME_PER_MIRROR_NAME",
Description: "Set Postgres application_name to have mirror name as suffix for each mirror",
DefaultValue: "false",
ValueType: protos.DynconfValueType_BOOL,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
}

var DynamicIndex = func() map[string]int {
Expand Down Expand Up @@ -342,3 +350,7 @@ func PeerDBQueueForceTopicCreation(ctx context.Context, env map[string]string) (
func PeerDBIntervalSinceLastNormalizeThresholdMinutes(ctx context.Context, env map[string]string) (uint32, error) {
return dynamicConfUnsigned[uint32](ctx, env, "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES")
}

func PeerDBApplicationNamePerMirrorName(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_APPLICATION_NAME_PER_MIRROR_NAME")
}
10 changes: 8 additions & 2 deletions flow/shared/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,22 @@ const (
POSTGRES_15 PGVersion = 150000
)

func GetPGConnectionString(pgConfig *protos.PostgresConfig) string {
func GetPGConnectionString(pgConfig *protos.PostgresConfig, flowName string) string {
passwordEscaped := url.QueryEscape(pgConfig.Password)
applicationName := "peerdb"
if flowName != "" {
applicationName = "peerdb_" + flowName
}

// for a url like postgres://user:password@host:port/dbname
connString := fmt.Sprintf(
"postgres://%s:%s@%s:%d/%s?application_name=peerdb&client_encoding=UTF8",
"postgres://%s:%s@%s:%d/%s?application_name=%s&client_encoding=UTF8",
pgConfig.User,
passwordEscaped,
pgConfig.Host,
pgConfig.Port,
pgConfig.Database,
applicationName,
)
return connString
}
Expand Down

0 comments on commit 02fe7d0

Please sign in to comment.