From 6baab0e8f19c7b3e347fd5e28933151b5c459e46 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sat, 30 Sep 2023 02:42:11 +0530 Subject: [PATCH] changed WAL update logic to hopefully fix schema changes bugs --- flow/connectors/postgres/cdc.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 65e75acfc0..dd3deea677 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -141,7 +141,9 @@ func (p *PostgresCDCSource) consumeStream( }() for { - if time.Now().After(nextStandbyMessageDeadline) { + if time.Now().After(nextStandbyMessageDeadline) || + earlyReturn || + (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) { // update the WALWritePosition to be clientXLogPos - 1 // as the clientXLogPos is the last checkpoint id + 1 // and we want to send the last checkpoint id as the last @@ -160,6 +162,10 @@ func (p *PostgresCDCSource) consumeStream( utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + + if earlyReturn || (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) { + return result, nil + } } ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) @@ -284,12 +290,9 @@ func (p *PostgresCDCSource) consumeStream( } } - currentPos := xld.WALStart + pglogrepl.LSN(len(xld.WALData)) - records.LastCheckPointID = int64(currentPos) - - if records.Records != nil && - ((len(records.Records) == int(req.MaxBatchSize)) || earlyReturn) { - return result, nil + if xld.WALStart > clientXLogPos { + clientXLogPos = xld.WALStart + records.LastCheckPointID = int64(clientXLogPos) } } }