diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 3aa3c23e1f..552f372a04 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -35,7 +35,7 @@ type PostgresCDCSource struct { customTypeMapping map[uint32]string // for partitioned tables, maps child relid to parent relid - chIdToParRelId map[uint32]uint32 + childToParentRelIDMapping map[uint32]uint32 } type PostgresCDCConfig struct { @@ -50,27 +50,27 @@ type PostgresCDCConfig struct { // Create a new PostgresCDCSource func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) { - childToParentRelIdMap, err := getChildToParentRelIdMap(cdcConfig.AppContext, cdcConfig.Connection) + childToParentRelIDMap, err := getChildToParentRelIDMap(cdcConfig.AppContext, cdcConfig.Connection) if err != nil { return nil, fmt.Errorf("error getting child to parent relid map: %w", err) } return &PostgresCDCSource{ - ctx: cdcConfig.AppContext, - replPool: cdcConfig.Connection, - SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, - TableNameMapping: cdcConfig.TableNameMapping, - slot: cdcConfig.Slot, - publication: cdcConfig.Publication, - relationMessageMapping: cdcConfig.RelationMessageMapping, - typeMap: pgtype.NewMap(), - chIdToParRelId: childToParentRelIdMap, - commitLock: false, - customTypeMapping: customTypeMap, + ctx: cdcConfig.AppContext, + replPool: cdcConfig.Connection, + SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, + TableNameMapping: cdcConfig.TableNameMapping, + slot: cdcConfig.Slot, + publication: cdcConfig.Publication, + relationMessageMapping: cdcConfig.RelationMessageMapping, + typeMap: pgtype.NewMap(), + childToParentRelIDMapping: childToParentRelIDMap, + commitLock: false, + customTypeMapping: customTypeMap, }, nil } -func getChildToParentRelIdMap(ctx context.Context, pool *pgxpool.Pool) (map[uint32]uint32, error) { +func getChildToParentRelIDMap(ctx context.Context, pool *pgxpool.Pool) (map[uint32]uint32, error) { query := ` SELECT parent.oid AS parentrelid, @@ -88,18 +88,18 @@ func getChildToParentRelIdMap(ctx context.Context, pool *pgxpool.Pool) (map[uint defer rows.Close() - childToParentRelIdMap := make(map[uint32]uint32) + childToParentRelIDMap := make(map[uint32]uint32) + var parentRelID uint32 + var childRelID uint32 for rows.Next() { - var parentRelId uint32 - var childRelId uint32 - err := rows.Scan(&parentRelId, &childRelId) + err := rows.Scan(&parentRelID, &childRelID) if err != nil { return nil, fmt.Errorf("error scanning child to parent relid map: %w", err) } - childToParentRelIdMap[childRelId] = parentRelId + childToParentRelIDMap[childRelID] = parentRelID } - return childToParentRelIdMap, nil + return childToParentRelIDMap, nil } // PullRecords pulls records from the cdc stream @@ -402,8 +402,8 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl batch.UpdateLatestCheckpoint(int64(msg.CommitLSN)) p.commitLock = false case *pglogrepl.RelationMessage: - // treat all relation messages as correponding to parent if partitioned. - msg.RelationID = p.getParentRelIdIfPartitioned(msg.RelationID) + // treat all relation messages as corresponding to parent if partitioned. + msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID) // TODO (kaushik): consider persistent state for a mirror job // to be stored somewhere in temporal state. We might need to persist @@ -430,19 +430,19 @@ func (p *PostgresCDCSource) processInsertMessage( lsn pglogrepl.LSN, msg *pglogrepl.InsertMessage, ) (model.Record, error) { - relId := p.getParentRelIdIfPartitioned(msg.RelationID) + relID := p.getParentRelIDIfPartitioned(msg.RelationID) - tableName, exists := p.SrcTableIDNameMapping[relId] + tableName, exists := p.SrcTableIDNameMapping[relID] if !exists { return nil, nil } // log lsn and relation id for debugging - log.Debugf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relId, tableName) + log.Debugf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relID, tableName) - rel, ok := p.relationMessageMapping[relId] + rel, ok := p.relationMessageMapping[relID] if !ok { - return nil, fmt.Errorf("unknown relation id: %d", relId) + return nil, fmt.Errorf("unknown relation id: %d", relID) } // create empty map of string to interface{} @@ -464,7 +464,7 @@ func (p *PostgresCDCSource) processUpdateMessage( lsn pglogrepl.LSN, msg *pglogrepl.UpdateMessage, ) (model.Record, error) { - relID := p.getParentRelIdIfPartitioned(msg.RelationID) + relID := p.getParentRelIDIfPartitioned(msg.RelationID) tableName, exists := p.SrcTableIDNameMapping[relID] if !exists { @@ -485,7 +485,8 @@ func (p *PostgresCDCSource) processUpdateMessage( return nil, fmt.Errorf("error converting old tuple to map: %w", err) } - newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel, p.TableNameMapping[tableName].Exclude) + newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, + rel, p.TableNameMapping[tableName].Exclude) if err != nil { return nil, fmt.Errorf("error converting new tuple to map: %w", err) } @@ -505,7 +506,7 @@ func (p *PostgresCDCSource) processDeleteMessage( lsn pglogrepl.LSN, msg *pglogrepl.DeleteMessage, ) (model.Record, error) { - relID := p.getParentRelIdIfPartitioned(msg.RelationID) + relID := p.getParentRelIDIfPartitioned(msg.RelationID) tableName, exists := p.SrcTableIDNameMapping[relID] if !exists { @@ -736,11 +737,11 @@ func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest, return fmt.Sprintf("%x", hasher.Sum(nil)), nil } -func (p *PostgresCDCSource) getParentRelIdIfPartitioned(relId uint32) uint32 { - parentRelId, ok := p.chIdToParRelId[relId] +func (p *PostgresCDCSource) getParentRelIDIfPartitioned(relID uint32) uint32 { + parentRelID, ok := p.childToParentRelIDMapping[relID] if ok { - return parentRelId + return parentRelID } - return relId + return relID }