Skip to content

Commit

Permalink
Merge branch 'main' into peerdbenv-getter-naming-convention
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Dec 22, 2023
2 parents eaf541b + fd9aebe commit ab50fc1
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 44 deletions.
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (h *FlowRequestHandler) GetSlotInfo(
return &protos.PeerSlotResponse{SlotData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerSlotResponse{SlotData: nil}, err
Expand All @@ -236,7 +236,7 @@ func (h *FlowRequestHandler) GetStatInfo(
return &protos.PeerStatResponse{StatData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerStatResponse{StatData: nil}, err
Expand Down
18 changes: 9 additions & 9 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ type CDCPullConnector interface {
// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error

// SendWALHeartbeat allows for activity to progress restart_lsn on postgres.
SendWALHeartbeat() error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(slotName string) ([]*protos.SlotInfo, error)

Expand Down Expand Up @@ -140,7 +137,7 @@ func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConne
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), true)
default:
return nil, ErrUnsupportedFunctionality
}
Expand All @@ -150,7 +147,7 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -172,7 +169,7 @@ func GetCDCNormalizeConnector(ctx context.Context,
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -186,7 +183,7 @@ func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullCon
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
default:
Expand All @@ -198,7 +195,7 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
Expand All @@ -219,7 +216,10 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
if pgConfig == nil {
return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name)
}
return connpostgres.NewPostgresConnector(ctx, pgConfig)
// we can't decide if a PG peer should have replication permissions on it because we don't know
// what the user wants to do with it, so defaulting to being permissive.
// can be revisited in the future or we can use some UI wizardry.
return connpostgres.NewPostgresConnector(ctx, pgConfig, false)
case protos.DBType_BIGQUERY:
bqConfig := peer.GetBigqueryConfig()
if bqConfig == nil {
Expand Down
46 changes: 16 additions & 30 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type PostgresConnector struct {
}

// NewPostgresConnector creates a new instance of PostgresConnector.
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig, initializeReplPool bool) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

// create a separate connection pool for non-replication queries as replication connections cannot
Expand Down Expand Up @@ -62,21 +62,23 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
return nil, fmt.Errorf("failed to get custom type map: %w", err)
}

// ensure that replication is set to database
replConnConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}
// only initialize for CDCPullConnector to reduce number of idle connections
var replPool *SSHWrappedPostgresPool
if initializeReplPool {
// ensure that replication is set to database
replConnConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %w", err)
}

replConnConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConnConfig.MaxConns = 1
replConnConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConnConfig.MaxConns = 1

// TODO: replPool not initializing might be intentional, if we only want to use QRep mirrors
// and the user doesn't have the REPLICATION permission
replPool, err := NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create connection pool: %w", err)
replPool, err = NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create replication connection pool: %w", err)
}
}

metadataSchema := "_peerdb_internal"
Expand Down Expand Up @@ -880,22 +882,6 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error {
return nil
}

func (c *PostgresConnector) SendWALHeartbeat() error {
command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
_, err := c.pool.Exec(c.ctx, command)
if err != nil {
return fmt.Errorf("error bumping wal position: %w", err)
}

return nil
}

// GetLastOffset returns the last synced offset for a job.
func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error) {
row := c.pool.
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, true)
require.NoError(suite.T(), err)

setupTx, err := suite.connector.pool.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, false)
suite.failTestError(err)

setupTx, err := suite.connector.pool.Begin(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() {
User: "postgres",
Password: "postgres",
Database: "postgres",
})
}, false)
s.NoError(err)
}

Expand Down

0 comments on commit ab50fc1

Please sign in to comment.