Skip to content

Commit

Permalink
Return only at commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 11, 2023
1 parent 1716500 commit 396c6f0
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 19 deletions.
43 changes: 24 additions & 19 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (p *PostgresCDCSource) consumeStream(

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
earlyReturn := false
stopAtNextCommit := false

defer func() {
err := conn.Close(p.ctx)
Expand All @@ -142,20 +142,24 @@ func (p *PostgresCDCSource) consumeStream(
}
}()

// clientXLogPos is the last checkpoint id + 1, so each standby status update
// must acknowledge that we have processed up to clientXLogPos - 1.
// consumedXLogPos is the LSN that has been committed on the destination.
consumedXLogPos := pglogrepl.LSN(0)
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos - 1
}

for {
if time.Now().After(nextStandbyMessageDeadline) ||
earlyReturn ||
(len(records.Records) == int(req.MaxBatchSize)) {
// update the WALWritePosition to be clientXLogPos - 1
// as the clientXLogPos is the last checkpoint id + 1
// and we want to send the last checkpoint id as the last
// checkpoint id that we have processed.
lastProcessedXLogPos := clientXLogPos
if clientXLogPos > 0 {
lastProcessedXLogPos = clientXLogPos - 1
}
if len(records.Records) >= int(req.MaxBatchSize) {
stopAtNextCommit = true
}

if time.Now().After(nextStandbyMessageDeadline) {
// Report the last processed position in the standby status update: it is the
// only position we can confirm has been committed on the destination.
err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: lastProcessedXLogPos})
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}
Expand All @@ -164,10 +168,6 @@ func (p *PostgresCDCSource) consumeStream(
utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage)
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)

if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) {
return result, nil
}
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
Expand Down Expand Up @@ -285,12 +285,16 @@ func (p *PostgresCDCSource) consumeStream(
case *model.DeleteRecord:
records.Records = append(records.Records, rec)
case *model.RelationRecord:
tableSchemaDelta := rec.(*model.RelationRecord).TableSchemaDelta
tableSchemaDelta := r.TableSchemaDelta
if len(tableSchemaDelta.AddedColumns) > 0 || len(tableSchemaDelta.DroppedColumns) > 0 {
result.TableSchemaDelta = tableSchemaDelta
log.Infof("Detected schema change for table %s, returning currently accumulated records",
result.TableSchemaDelta.SrcTableName)
earlyReturn = true
stopAtNextCommit = true
}
case *model.CommitRecord:
if stopAtNextCommit {
return result, nil
}
}
}
Expand Down Expand Up @@ -321,6 +325,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
batch.LastCheckPointID = int64(xld.WALStart)
return model.NewCommitRecord(batch.LastCheckPointID), nil
case *pglogrepl.RelationMessage:
// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
Expand Down
25 changes: 25 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,29 @@ func (r *RelationRecord) GetItems() *RecordItems {
return nil
}

// CommitRecord is the record type emitted for a commit. It holds no table
// name and no row items; its only payload is the checkpoint ID of the commit.
type CommitRecord struct {
	// CheckPointID is the checkpoint ID associated with the commit.
	CheckPointID int64
}

// GetCheckPointID returns the checkpoint ID stored on the commit record.
// Together with GetTableName and GetItems, it is part of CommitRecord's
// implementation of the Record interface.
func (r *CommitRecord) GetCheckPointID() int64 {
	return r.CheckPointID
}

// GetTableName always returns the empty string; a CommitRecord stores no
// table name.
func (*CommitRecord) GetTableName() string {
	return ""
}

// GetItems always returns nil; a CommitRecord carries no record items.
func (*CommitRecord) GetItems() *RecordItems {
	return nil
}

// NewCommitRecord allocates a CommitRecord whose CheckPointID is set to
// checkPointID and returns a pointer to it.
func NewCommitRecord(checkPointID int64) *CommitRecord {
	rec := new(CommitRecord)
	rec.CheckPointID = checkPointID
	return rec
}

type RelationMessageMapping map[uint32]*protos.RelationMessage

0 comments on commit 396c6f0

Please sign in to comment.