Skip to content

Commit

Permalink
postgres connector: replace pgxpool.Pool with pgx.Conn
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 26, 2024
1 parent 9d4065b commit 13ffe93
Show file tree
Hide file tree
Showing 25 changed files with 447 additions and 471 deletions.
32 changes: 16 additions & 16 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName strin
return &pgPeerConfig, nil
}

func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*connpostgres.SSHWrappedPostgresPool, error) {
func (h *FlowRequestHandler) getConnForPGPeer(ctx context.Context, peerName string) (*connpostgres.SSHWrappedPostgresConn, error) {
pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName)
if err != nil {
return nil, err
}

pool, err := connpostgres.NewSSHWrappedPostgresPoolFromConfig(ctx, pgPeerConfig)
pool, err := connpostgres.NewSSHWrappedPostgresConnFromConfig(ctx, pgPeerConfig)
if err != nil {
slog.Error("Failed to create postgres pool", slog.Any("error", err))
return nil, err
Expand All @@ -50,13 +50,13 @@ func (h *FlowRequestHandler) GetSchemas(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSchemasResponse, error) {
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT schema_name"+
defer peerConn.Close(ctx)
rows, err := peerConn.Query(ctx, "SELECT schema_name"+
" FROM information_schema.schemata WHERE schema_name !~ '^pg_' AND schema_name <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
Expand All @@ -73,13 +73,13 @@ func (h *FlowRequestHandler) GetTablesInSchema(
ctx context.Context,
req *protos.SchemaTablesRequest,
) (*protos.SchemaTablesResponse, error) {
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}

defer peerPool.Close()
rows, err := peerPool.Query(ctx, `SELECT DISTINCT ON (t.relname)
defer peerConn.Close(ctx)
rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname)
t.relname,
CASE
WHEN con.contype = 'p' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true
Expand Down Expand Up @@ -130,13 +130,13 @@ func (h *FlowRequestHandler) GetAllTables(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.AllTablesResponse, error) {
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT table_schema || '.' || table_name AS schema_table "+
defer peerConn.Close(ctx)
rows, err := peerConn.Query(ctx, "SELECT table_schema || '.' || table_name AS schema_table "+
"FROM information_schema.tables WHERE table_schema !~ '^pg_' AND table_schema <> 'information_schema'")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
Expand All @@ -160,13 +160,13 @@ func (h *FlowRequestHandler) GetColumns(
ctx context.Context,
req *protos.TableColumnsRequest,
) (*protos.TableColumnsResponse, error) {
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
}

defer peerPool.Close()
rows, err := peerPool.Query(ctx, `
defer peerConn.Close(ctx)
rows, err := peerConn.Query(ctx, `
SELECT
cols.column_name,
cols.data_type,
Expand Down Expand Up @@ -252,10 +252,10 @@ func (h *FlowRequestHandler) GetStatInfo(
}
defer pgConnector.Close()

peerPool := pgConnector.GetPool()
peerConn := pgConnector.GetConn()
peerUser := pgConfig.User

rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
rows, err := peerConn.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"EXTRACT(epoch FROM(now()-query_start)) AS dur"+
" FROM pg_stat_activity WHERE "+
"usename=$1 AND state != 'idle';", peerUser)
Expand Down
19 changes: 7 additions & 12 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const maxRetriesForWalSegmentRemoved = 5

type PostgresCDCSource struct {
ctx context.Context
replPool *pgxpool.Pool
replConn *pgx.Conn
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
slot string
Expand All @@ -54,7 +54,7 @@ type PostgresCDCSource struct {

type PostgresCDCConfig struct {
AppContext context.Context
Connection *pgxpool.Pool
Connection *pgx.Conn
Slot string
Publication string
SrcTableIDNameMapping map[uint32]string
Expand Down Expand Up @@ -84,7 +84,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string)
return &PostgresCDCSource{
ctx: cdcConfig.AppContext,
replPool: cdcConfig.Connection,
replConn: cdcConfig.Connection,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
Expand All @@ -102,7 +102,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
}, nil
}

func getChildToParentRelIDMap(ctx context.Context, pool *pgxpool.Pool) (map[uint32]uint32, error) {
func getChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]uint32, error) {
query := `
SELECT
parent.oid AS parentrelid,
Expand All @@ -113,7 +113,7 @@ func getChildToParentRelIDMap(ctx context.Context, pool *pgxpool.Pool) (map[uint
WHERE parent.relkind='p';
`

rows, err := pool.Query(ctx, query, pgx.QueryExecModeSimpleProtocol)
rows, err := conn.Query(ctx, query, pgx.QueryExecModeSimpleProtocol)
if err != nil {
return nil, fmt.Errorf("error querying for child to parent relid map: %w", err)
}
Expand Down Expand Up @@ -142,14 +142,9 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
}

// create replication connection
replicationConn, err := p.replPool.Acquire(p.ctx)
if err != nil {
return fmt.Errorf("error acquiring connection for replication: %w", err)
}

defer replicationConn.Release()
replicationConn := p.replConn

pgConn := replicationConn.Conn().PgConn()
pgConn := replicationConn.PgConn()
p.logger.Info("created replication connection")

// start replication
Expand Down
Loading

0 comments on commit 13ffe93

Please sign in to comment.