Skip to content

Commit

Permalink
switched to using commitLock in conjunction with earlyReturn
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 11, 2023
1 parent 3393fba commit b5155c6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 42 deletions.
36 changes: 19 additions & 17 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type PostgresCDCSource struct {
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
startLSN pglogrepl.LSN
commitLock bool
}

type PostgresCDCConfig struct {
Expand All @@ -52,6 +53,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, err
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
commitLock: false,
}, nil
}

Expand Down Expand Up @@ -131,7 +133,7 @@ func (p *PostgresCDCSource) consumeStream(

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
stopAtNextCommit := false
earlyReturn := false

defer func() {
err := conn.Close(p.ctx)
Expand All @@ -151,11 +153,9 @@ func (p *PostgresCDCSource) consumeStream(
}

for {
if len(records.Records) >= int(req.MaxBatchSize) {
stopAtNextCommit = true
}

if time.Now().After(nextStandbyMessageDeadline) {
if time.Now().After(nextStandbyMessageDeadline) ||
earlyReturn ||
(len(records.Records) == int(req.MaxBatchSize)) {
// Update XLogPos to the last processed position; we can only confirm
// that this is the last row committed on the destination.
err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
Expand All @@ -168,16 +168,19 @@ func (p *PostgresCDCSource) consumeStream(
utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage)
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)

if !p.commitLock && (earlyReturn || (len(records.Records) == int(req.MaxBatchSize))) {
return result, nil
}
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()
if err != nil {
if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
// TODO (kaushik): consider returning the records accumulated so far
// if last message seen was a commit, then we can return right away as well.
stopAtNextCommit = true
log.Infof("Idle timeout reached, returning currently accumulated records")
return result, nil
} else {
return nil, fmt.Errorf("ReceiveMessage failed: %w", err)
}
Expand Down Expand Up @@ -292,11 +295,7 @@ func (p *PostgresCDCSource) consumeStream(
result.TableSchemaDelta = tableSchemaDelta
log.Infof("Detected schema change for table %s, returning currently accumulated records",
result.TableSchemaDelta.SrcTableName)
stopAtNextCommit = true
}
case *model.CommitRecord:
if stopAtNextCommit {
return result, nil
earlyReturn = true
}
}
}
Expand All @@ -317,7 +316,8 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre

switch msg := logicalMsg.(type) {
case *pglogrepl.BeginMessage:
log.Debugf("Ignoring BeginMessage")
log.Debugf("Locking PullRecords at BeginMessage, awaiting CommitMessage")
p.commitLock = true
case *pglogrepl.InsertMessage:
return p.processInsertMessage(xld.WALStart, msg)
case *pglogrepl.UpdateMessage:
Expand All @@ -326,8 +326,10 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre
return p.processDeleteMessage(xld.WALStart, msg)
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
log.Warnf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
msg.CommitLSN, msg.TransactionEndLSN)
batch.LastCheckPointID = int64(xld.WALStart)
return model.NewCommitRecord(batch.LastCheckPointID), nil
p.commitLock = false
case *pglogrepl.RelationMessage:
// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
Expand Down
25 changes: 0 additions & 25 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,29 +365,4 @@ func (r *RelationRecord) GetItems() *RecordItems {
return nil
}

// CommitRecord marks a commit; the only data it carries is the
// checkpoint ID associated with that commit.
type CommitRecord struct {
	CheckPointID int64
}

// NewCommitRecord returns a pointer to a freshly allocated CommitRecord
// whose CheckPointID is set to checkPointID.
func NewCommitRecord(checkPointID int64) *CommitRecord {
	rec := new(CommitRecord)
	rec.CheckPointID = checkPointID
	return rec
}

// The methods below satisfy the Record interface for CommitRecord.

// GetCheckPointID reports the checkpoint ID stored in the record.
func (c *CommitRecord) GetCheckPointID() int64 {
	id := c.CheckPointID
	return id
}

// GetTableName always yields the empty string for a CommitRecord.
func (c *CommitRecord) GetTableName() string {
	return ""
}

// GetItems always yields nil for a CommitRecord.
func (c *CommitRecord) GetItems() *RecordItems {
	return nil
}

type RelationMessageMapping map[uint32]*protos.RelationMessage

0 comments on commit b5155c6

Please sign in to comment.