diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 8be83b4d64..533b4ee0c5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -359,13 +359,13 @@ func (p *PostgresCDCSource) consumeStream( if xld.WALStart > clientXLogPos { clientXLogPos = xld.WALStart - records.UpdateLatestCheckpoint(int64(clientXLogPos)) } if len(localRecords) == 0 { // given that we have no records it is safe to update the flush wal position // to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages. consumedXLogPos = clientXLogPos + records.UpdateLatestCheckpoint(int64(clientXLogPos)) } } } @@ -392,7 +392,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl // for a commit message, update the last checkpoint id for the record batch. log.Debugf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v", msg.CommitLSN, msg.TransactionEndLSN) - batch.UpdateLatestCheckpoint(int64(xld.WALStart)) + batch.UpdateLatestCheckpoint(int64(msg.CommitLSN)) p.commitLock = false case *pglogrepl.RelationMessage: // treat all relation messages as correponding to parent if partitioned.