From 396c6f0ccaf21e720b634d5826106d15f3056fc4 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 11 Oct 2023 13:52:03 -0400 Subject: [PATCH] Return only at commit message --- flow/connectors/postgres/cdc.go | 43 ++++++++++++++++++--------------- flow/model/model.go | 25 +++++++++++++++++++ 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 1d19b21508..065d62a193 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -131,7 +131,7 @@ func (p *PostgresCDCSource) consumeStream( standbyMessageTimeout := req.IdleTimeout nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) - earlyReturn := false + stopAtNextCommit := false defer func() { err := conn.Close(p.ctx) @@ -142,20 +142,24 @@ func (p *PostgresCDCSource) consumeStream( } }() + // clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed + // until clientXLogPos - 1 each time we send a standby status update. + // consumedXLogPos is the lsn that has been committed on the destination. + consumedXLogPos := pglogrepl.LSN(0) + if clientXLogPos > 0 { + consumedXLogPos = clientXLogPos - 1 + } + for { - if time.Now().After(nextStandbyMessageDeadline) || - earlyReturn || - (len(records.Records) == int(req.MaxBatchSize)) { - // update the WALWritePosition to be clientXLogPos - 1 - // as the clientXLogPos is the last checkpoint id + 1 - // and we want to send the last checkpoint id as the last - // checkpoint id that we have processed. - lastProcessedXLogPos := clientXLogPos - if clientXLogPos > 0 { - lastProcessedXLogPos = clientXLogPos - 1 - } + if len(records.Records) >= int(req.MaxBatchSize) { + stopAtNextCommit = true + } + + if time.Now().After(nextStandbyMessageDeadline) { + // Update XLogPos to the last processed position, we can only confirm + // that this is the last row committed on the destination. err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, - pglogrepl.StandbyStatusUpdate{WALWritePosition: lastProcessedXLogPos}) + pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } @@ -164,10 +168,6 @@ func (p *PostgresCDCSource) consumeStream( utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - - if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) { - return result, nil - } } ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) @@ -285,12 +285,16 @@ func (p *PostgresCDCSource) consumeStream( case *model.DeleteRecord: records.Records = append(records.Records, rec) case *model.RelationRecord: - tableSchemaDelta := rec.(*model.RelationRecord).TableSchemaDelta + tableSchemaDelta := r.TableSchemaDelta if len(tableSchemaDelta.AddedColumns) > 0 || len(tableSchemaDelta.DroppedColumns) > 0 { result.TableSchemaDelta = tableSchemaDelta log.Infof("Detected schema change for table %s, returning currently accumulated records", result.TableSchemaDelta.SrcTableName) - earlyReturn = true + stopAtNextCommit = true + } + case *model.CommitRecord: + if stopAtNextCommit { + return result, nil } } } @@ -321,6 +325,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre case *pglogrepl.CommitMessage: // for a commit message, update the last checkpoint id for the record batch. batch.LastCheckPointID = int64(xld.WALStart) + return model.NewCommitRecord(batch.LastCheckPointID), nil case *pglogrepl.RelationMessage: // TODO (kaushik): consider persistent state for a mirror job // to be stored somewhere in temporal state. We might need to persist diff --git a/flow/model/model.go b/flow/model/model.go index cf5784b113..24d75b6b48 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -365,4 +365,29 @@ func (r *RelationRecord) GetItems() *RecordItems { return nil } +// CommitRecord is a record that represents a commit. +type CommitRecord struct { + CheckPointID int64 +} + +// Implement Record interface for CommitRecord. +func (r *CommitRecord) GetCheckPointID() int64 { + return r.CheckPointID +} + +func (r *CommitRecord) GetTableName() string { + return "" +} + +func (r *CommitRecord) GetItems() *RecordItems { + return nil +} + +// NewCommitRecord creates a new CommitRecord. +func NewCommitRecord(checkPointID int64) *CommitRecord { + return &CommitRecord{ + CheckPointID: checkPointID, + } +} + type RelationMessageMapping map[uint32]*protos.RelationMessage