diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 65e75acfc0..48a94cd40a 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -141,7 +141,9 @@ func (p *PostgresCDCSource) consumeStream( }() for { - if time.Now().After(nextStandbyMessageDeadline) { + if time.Now().After(nextStandbyMessageDeadline) || + earlyReturn || + (len(records.Records) == int(req.MaxBatchSize)) { // update the WALWritePosition to be clientXLogPos - 1 // as the clientXLogPos is the last checkpoint id + 1 // and we want to send the last checkpoint id as the last @@ -160,6 +162,10 @@ func (p *PostgresCDCSource) consumeStream( utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + + if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) { + return result, nil + } } ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) @@ -195,6 +201,9 @@ func (p *PostgresCDCSource) consumeStream( log.Debugf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t", pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested) + if pkm.ServerWALEnd > clientXLogPos { + clientXLogPos = pkm.ServerWALEnd + } if pkm.ReplyRequested { nextStandbyMessageDeadline = time.Time{} } @@ -284,12 +293,9 @@ func (p *PostgresCDCSource) consumeStream( } } - currentPos := xld.WALStart + pglogrepl.LSN(len(xld.WALData)) - records.LastCheckPointID = int64(currentPos) - - if records.Records != nil && - ((len(records.Records) == int(req.MaxBatchSize)) || earlyReturn) { - return result, nil + if xld.WALStart > clientXLogPos { + clientXLogPos = xld.WALStart + records.LastCheckPointID = int64(clientXLogPos) } } }