Skip to content

Commit

Permalink
Potential fix for the LSN issue
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 1, 2023
1 parent 79693fb commit 951e2aa
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,13 @@ func (p *PostgresCDCSource) consumeStream(

if xld.WALStart > clientXLogPos {
clientXLogPos = xld.WALStart
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}

if len(localRecords) == 0 {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
consumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
}
Expand All @@ -392,7 +392,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
// for a commit message, update the last checkpoint id for the record batch.
log.Debugf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
msg.CommitLSN, msg.TransactionEndLSN)
batch.UpdateLatestCheckpoint(int64(xld.WALStart))
batch.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = false
case *pglogrepl.RelationMessage:
// treat all relation messages as correponding to parent if partitioned.
Expand Down

0 comments on commit 951e2aa

Please sign in to comment.