From 951e2aaa078276d6bc6f68834c3fab858c63a534 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 1 Nov 2023 11:19:01 -0400 Subject: [PATCH] Potential fix for the LSN issue --- flow/connectors/postgres/cdc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 8be83b4d64..533b4ee0c5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -359,13 +359,13 @@ func (p *PostgresCDCSource) consumeStream( if xld.WALStart > clientXLogPos { clientXLogPos = xld.WALStart - records.UpdateLatestCheckpoint(int64(clientXLogPos)) } if len(localRecords) == 0 { // given that we have no records it is safe to update the flush wal position // to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages. consumedXLogPos = clientXLogPos + records.UpdateLatestCheckpoint(int64(clientXLogPos)) } } } @@ -392,7 +392,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl // for a commit message, update the last checkpoint id for the record batch. log.Debugf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v", msg.CommitLSN, msg.TransactionEndLSN) - batch.UpdateLatestCheckpoint(int64(xld.WALStart)) + batch.UpdateLatestCheckpoint(int64(msg.CommitLSN)) p.commitLock = false case *pglogrepl.RelationMessage: // treat all relation messages as correponding to parent if partitioned.