Skip to content

Commit

Permalink
PostgresConnector replPool: initialize lazily
Browse files Browse the repository at this point in the history
Alternative to #872

Allows #945 to test replPool code path without losing testing of codepaths without replPool
  • Loading branch information
serprex committed Jan 1, 2024
1 parent 58c929d commit 9b56c85
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 35 deletions.
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *FlowRequestHandler) GetSlotInfo(
return &protos.PeerSlotResponse{SlotData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerSlotResponse{SlotData: nil}, err
Expand All @@ -237,7 +237,7 @@ func (h *FlowRequestHandler) GetStatInfo(
return &protos.PeerStatResponse{StatData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerStatResponse{StatData: nil}, err
Expand Down
12 changes: 6 additions & 6 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConne
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), true)
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
default:
return nil, ErrUnsupportedFunctionality
}
Expand All @@ -147,7 +147,7 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -169,7 +169,7 @@ func GetCDCNormalizeConnector(ctx context.Context,
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -183,7 +183,7 @@ func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullCon
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
default:
Expand All @@ -195,7 +195,7 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -219,7 +219,7 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
// we can't decide if a PG peer should have replication permissions on it because we don't know
// what the user wants to do with it, so defaulting to being permissive.
// can be revisited in the future or we can use some UI wizardry.
return connpostgres.NewPostgresConnector(ctx, pgConfig, false)
return connpostgres.NewPostgresConnector(ctx, pgConfig)
case protos.DBType_BIGQUERY:
bqConfig := peer.GetBigqueryConfig()
if bqConfig == nil {
Expand Down
7 changes: 6 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,12 @@ func (c *PostgresConnector) createSlotAndPublication(

// create slot only after we succeeded in creating publication.
if !s.SlotExists {
conn, err := c.replPool.Acquire(c.ctx)
pool, err := c.GetReplPool(c.ctx)
if err != nil {
return fmt.Errorf("[slot] error acquiring pool: %w", err)
}

conn, err := pool.Acquire(c.ctx)
if err != nil {
return fmt.Errorf("[slot] error acquiring connection: %w", err)
}
Expand Down
59 changes: 36 additions & 23 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"regexp"
"sync"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand All @@ -26,6 +27,8 @@ type PostgresConnector struct {
ctx context.Context
config *protos.PostgresConfig
pool *SSHWrappedPostgresPool
replConfig *pgxpool.Config
replMutex sync.Mutex
replPool *SSHWrappedPostgresPool
tableSchemaMapping map[string]*protos.TableSchema
customTypesMapping map[uint32]string
Expand All @@ -34,12 +37,13 @@ type PostgresConnector struct {
}

// NewPostgresConnector creates a new instance of PostgresConnector.
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig, initializeReplPool bool) (*PostgresConnector, error) {
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

// create a separate connection pool for non-replication queries as replication connections cannot
// be used for extended query protocol, i.e. prepared statements
connConfig, err := pgxpool.ParseConfig(connectionString)
replConfig := connConfig.Copy()
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}
Expand All @@ -52,6 +56,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig,
// set pool size to 3 to avoid connection pool exhaustion
connConfig.MaxConns = 3

// ensure that replication is set to database
replConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConfig.MaxConns = 1

pool, err := NewSSHWrappedPostgresPool(ctx, connConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create connection pool: %w", err)
Expand All @@ -62,25 +71,6 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig,
return nil, fmt.Errorf("failed to get custom type map: %w", err)
}

// only initialize for CDCPullConnector to reduce number of idle connections
var replPool *SSHWrappedPostgresPool
if initializeReplPool {
// ensure that replication is set to database
replConnConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}

replConnConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConnConfig.MaxConns = 1

replPool, err = NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create replication connection pool: %w", err)
}
}

metadataSchema := "_peerdb_internal"
if pgConfig.MetadataSchema != nil {
metadataSchema = *pgConfig.MetadataSchema
Expand All @@ -93,18 +83,36 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig,
ctx: ctx,
config: pgConfig,
pool: pool,
replPool: replPool,
replConfig: replConfig,
replPool: nil,
customTypesMapping: customTypeMap,
metadataSchema: metadataSchema,
logger: *flowLog,
}, nil
}

// GetPool returns the connection pool.
// nil returns the connection pool.
func (c *PostgresConnector) GetPool() *SSHWrappedPostgresPool {
return c.pool
}

func (c *PostgresConnector) GetReplPool(ctx context.Context) (*SSHWrappedPostgresPool, error) {
c.replMutex.Lock()
defer c.replMutex.Unlock()

if c.replPool != nil {
return c.replPool, nil
}

pool, err := NewSSHWrappedPostgresPool(ctx, c.replConfig, c.config.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create replication connection pool: %w", err)
}

c.replPool = pool
return pool, nil
}

// Close closes all connections.
func (c *PostgresConnector) Close() error {
if c.pool != nil {
Expand Down Expand Up @@ -227,9 +235,14 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu

c.logger.Info("PullRecords: performed checks for slot and publication")

replPool, err := c.GetReplPool(c.ctx)
if err != nil {
return err
}

cdc, err := NewPostgresCDCSource(&PostgresCDCConfig{
AppContext: c.ctx,
Connection: c.replPool.Pool,
Connection: replPool.Pool,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
Publication: publicationName,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
}, true)
})
require.NoError(suite.T(), err)

setupTx, err := suite.connector.pool.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func SetupSuite(t *testing.T, g got.G) PostgresSchemaDeltaTestSuite {
User: "postgres",
Password: "postgres",
Database: "postgres",
}, false)
})
require.NoError(t, err)

setupTx, err := connector.pool.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuitePG {
User: "postgres",
Password: "postgres",
Database: "postgres",
}, false)
})
require.NoError(t, err)

return PeerFlowE2ETestSuitePG{
Expand Down

0 comments on commit 9b56c85

Please sign in to comment.