Skip to content

Commit

Permalink
renaming variable names in cdc.go
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 27, 2023
1 parent e56cabd commit d28108f
Showing 1 changed file with 35 additions and 34 deletions.
69 changes: 35 additions & 34 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type PostgresCDCSource struct {
customTypeMapping map[uint32]string

// for partitioned tables, maps child relid to parent relid
chIdToParRelId map[uint32]uint32
childToParentRelIDMapping map[uint32]uint32
}

type PostgresCDCConfig struct {
Expand All @@ -50,27 +50,27 @@ type PostgresCDCConfig struct {

// Create a new PostgresCDCSource
func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) {
childToParentRelIdMap, err := getChildToParentRelIdMap(cdcConfig.AppContext, cdcConfig.Connection)
childToParentRelIDMap, err := getChildToParentRelIDMap(cdcConfig.AppContext, cdcConfig.Connection)
if err != nil {
return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
}

return &PostgresCDCSource{
ctx: cdcConfig.AppContext,
replPool: cdcConfig.Connection,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
chIdToParRelId: childToParentRelIdMap,
commitLock: false,
customTypeMapping: customTypeMap,
ctx: cdcConfig.AppContext,
replPool: cdcConfig.Connection,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
childToParentRelIDMapping: childToParentRelIDMap,
commitLock: false,
customTypeMapping: customTypeMap,
}, nil
}

func getChildToParentRelIdMap(ctx context.Context, pool *pgxpool.Pool) (map[uint32]uint32, error) {
func getChildToParentRelIDMap(ctx context.Context, pool *pgxpool.Pool) (map[uint32]uint32, error) {
query := `
SELECT
parent.oid AS parentrelid,
Expand All @@ -88,18 +88,18 @@ func getChildToParentRelIdMap(ctx context.Context, pool *pgxpool.Pool) (map[uint

defer rows.Close()

childToParentRelIdMap := make(map[uint32]uint32)
childToParentRelIDMap := make(map[uint32]uint32)
var parentRelID uint32
var childRelID uint32
for rows.Next() {
var parentRelId uint32
var childRelId uint32
err := rows.Scan(&parentRelId, &childRelId)
err := rows.Scan(&parentRelID, &childRelID)
if err != nil {
return nil, fmt.Errorf("error scanning child to parent relid map: %w", err)
}
childToParentRelIdMap[childRelId] = parentRelId
childToParentRelIDMap[childRelID] = parentRelID
}

return childToParentRelIdMap, nil
return childToParentRelIDMap, nil
}

// PullRecords pulls records from the cdc stream
Expand Down Expand Up @@ -402,8 +402,8 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
batch.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = false
case *pglogrepl.RelationMessage:
// treat all relation messages as correponding to parent if partitioned.
msg.RelationID = p.getParentRelIdIfPartitioned(msg.RelationID)
// treat all relation messages as corresponding to parent if partitioned.
msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)

// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
Expand All @@ -430,19 +430,19 @@ func (p *PostgresCDCSource) processInsertMessage(
lsn pglogrepl.LSN,
msg *pglogrepl.InsertMessage,
) (model.Record, error) {
relId := p.getParentRelIdIfPartitioned(msg.RelationID)
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relId]
tableName, exists := p.SrcTableIDNameMapping[relID]
if !exists {
return nil, nil
}

// log lsn and relation id for debugging
log.Debugf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relId, tableName)
log.Debugf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relID, tableName)

rel, ok := p.relationMessageMapping[relId]
rel, ok := p.relationMessageMapping[relID]
if !ok {
return nil, fmt.Errorf("unknown relation id: %d", relId)
return nil, fmt.Errorf("unknown relation id: %d", relID)
}

// create empty map of string to interface{}
Expand All @@ -464,7 +464,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
lsn pglogrepl.LSN,
msg *pglogrepl.UpdateMessage,
) (model.Record, error) {
relID := p.getParentRelIdIfPartitioned(msg.RelationID)
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
if !exists {
Expand All @@ -485,7 +485,8 @@ func (p *PostgresCDCSource) processUpdateMessage(
return nil, fmt.Errorf("error converting old tuple to map: %w", err)
}

newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel, p.TableNameMapping[tableName].Exclude)
newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple,
rel, p.TableNameMapping[tableName].Exclude)
if err != nil {
return nil, fmt.Errorf("error converting new tuple to map: %w", err)
}
Expand All @@ -505,7 +506,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
lsn pglogrepl.LSN,
msg *pglogrepl.DeleteMessage,
) (model.Record, error) {
relID := p.getParentRelIdIfPartitioned(msg.RelationID)
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
if !exists {
Expand Down Expand Up @@ -736,11 +737,11 @@ func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest,
return fmt.Sprintf("%x", hasher.Sum(nil)), nil
}

func (p *PostgresCDCSource) getParentRelIdIfPartitioned(relId uint32) uint32 {
parentRelId, ok := p.chIdToParRelId[relId]
func (p *PostgresCDCSource) getParentRelIDIfPartitioned(relID uint32) uint32 {
parentRelID, ok := p.childToParentRelIDMapping[relID]
if ok {
return parentRelId
return parentRelID
}

return relId
return relID
}

0 comments on commit d28108f

Please sign in to comment.