Skip to content

Commit

Permalink
Potential fix for the LSN issue (#600)
Browse files Browse the repository at this point in the history
KeepAlive messages are now sent only after we confirm no records were read
  • Loading branch information
iskakaushik authored Nov 1, 2023
1 parent 79693fb commit de9ceff
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,13 @@ func (p *PostgresCDCSource) consumeStream(

if xld.WALStart > clientXLogPos {
clientXLogPos = xld.WALStart
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}

if len(localRecords) == 0 {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
consumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
}
Expand All @@ -392,7 +392,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
// for a commit message, update the last checkpoint id for the record batch.
log.Debugf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
msg.CommitLSN, msg.TransactionEndLSN)
batch.UpdateLatestCheckpoint(int64(xld.WALStart))
batch.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = false
case *pglogrepl.RelationMessage:
// treat all relation messages as correponding to parent if partitioned.
Expand Down

0 comments on commit de9ceff

Please sign in to comment.