Skip to content

Commit

Permalink
only initialize replication pool when needed in CDCPull (#872)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 22, 2023
1 parent 08861aa commit 5d32604
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 25 deletions.
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (h *FlowRequestHandler) GetSlotInfo(
return &protos.PeerSlotResponse{SlotData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerSlotResponse{SlotData: nil}, err
Expand All @@ -236,7 +236,7 @@ func (h *FlowRequestHandler) GetStatInfo(
return &protos.PeerStatResponse{StatData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerStatResponse{StatData: nil}, err
Expand Down
15 changes: 9 additions & 6 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConne
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), true)
default:
return nil, ErrUnsupportedFunctionality
}
Expand All @@ -150,7 +150,7 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -172,7 +172,7 @@ func GetCDCNormalizeConnector(ctx context.Context,
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -186,7 +186,7 @@ func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullCon
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
default:
Expand All @@ -198,7 +198,7 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -219,7 +219,10 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
if pgConfig == nil {
return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name)
}
return connpostgres.NewPostgresConnector(ctx, pgConfig)
// we can't decide if a PG peer should have replication permissions on it because we don't know
// what the user wants to do with it, so defaulting to being permissive.
// can be revisited in the future or we can use some UI wizardry.
return connpostgres.NewPostgresConnector(ctx, pgConfig, false)
case protos.DBType_BIGQUERY:
bqConfig := peer.GetBigqueryConfig()
if bqConfig == nil {
Expand Down
30 changes: 16 additions & 14 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type PostgresConnector struct {
}

// NewPostgresConnector creates a new instance of PostgresConnector.
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig, initializeReplPool bool) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

// create a separate connection pool for non-replication queries as replication connections cannot
Expand Down Expand Up @@ -62,21 +62,23 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
return nil, fmt.Errorf("failed to get custom type map: %w", err)
}

// ensure that replication is set to database
replConnConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}
// only initialize for CDCPullConnector to reduce number of idle connections
var replPool *SSHWrappedPostgresPool
if initializeReplPool {
// ensure that replication is set to database
replConnConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}

replConnConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConnConfig.MaxConns = 1
replConnConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConnConfig.MaxConns = 1

// TODO: replPool not initializing might be intentional, if we only want to use QRep mirrors
// and the user doesn't have the REPLICATION permission
replPool, err := NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create connection pool: %w", err)
replPool, err = NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create replication connection pool: %w", err)
}
}

metadataSchema := "_peerdb_internal"
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, true)
require.NoError(suite.T(), err)

setupTx, err := suite.connector.pool.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, false)
suite.failTestError(err)

setupTx, err := suite.connector.pool.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, false)
s.NoError(err)
}

Expand Down

0 comments on commit 5d32604

Please sign in to comment.