Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only initialize replication pool when needed in CDCPull #872

Merged
merged 3 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (h *FlowRequestHandler) GetSlotInfo(
return &protos.PeerSlotResponse{SlotData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerSlotResponse{SlotData: nil}, err
Expand All @@ -236,7 +236,7 @@ func (h *FlowRequestHandler) GetStatInfo(
return &protos.PeerStatResponse{StatData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerStatResponse{StatData: nil}, err
Expand Down
15 changes: 9 additions & 6 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConne
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), true)
default:
return nil, ErrUnsupportedFunctionality
}
Expand All @@ -150,7 +150,7 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -172,7 +172,7 @@ func GetCDCNormalizeConnector(ctx context.Context,
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -186,7 +186,7 @@ func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullCon
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
default:
Expand All @@ -198,7 +198,7 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -219,7 +219,10 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
if pgConfig == nil {
return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name)
}
return connpostgres.NewPostgresConnector(ctx, pgConfig)
// we can't decide if a PG peer should have replication permissions on it because we don't know
// what the user wants to do with it, so defaulting to being permissive.
// can be revisited in the future or we can use some UI wizardry.
return connpostgres.NewPostgresConnector(ctx, pgConfig, false)
case protos.DBType_BIGQUERY:
bqConfig := peer.GetBigqueryConfig()
if bqConfig == nil {
Expand Down
30 changes: 16 additions & 14 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type PostgresConnector struct {
}

// NewPostgresConnector creates a new instance of PostgresConnector.
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig, initializeReplPool bool) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

// create a separate connection pool for non-replication queries as replication connections cannot
Expand Down Expand Up @@ -62,21 +62,23 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
return nil, fmt.Errorf("failed to get custom type map: %w", err)
}

// ensure that replication is set to database
replConnConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}
// only initialize for CDCPullConnector to reduce number of idle connections
var replPool *SSHWrappedPostgresPool
if initializeReplPool {
// ensure that replication is set to database
replConnConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}

replConnConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConnConfig.MaxConns = 1
replConnConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConnConfig.MaxConns = 1

// TODO: replPool not initializing might be intentional, if we only want to use QRep mirrors
// and the user doesn't have the REPLICATION permission
replPool, err := NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create connection pool: %w", err)
replPool, err = NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create replication connection pool: %w", err)
}
}

metadataSchema := "_peerdb_internal"
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, true)
require.NoError(suite.T(), err)

setupTx, err := suite.connector.pool.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, false)
suite.failTestError(err)

setupTx, err := suite.connector.pool.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, false)
s.NoError(err)
}

Expand Down
Loading