diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index dd3deea677..3be9406c34 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -143,7 +143,7 @@ func (p *PostgresCDCSource) consumeStream( for { if time.Now().After(nextStandbyMessageDeadline) || earlyReturn || - (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) { + (len(records.Records) == int(req.MaxBatchSize)) { // update the WALWritePosition to be clientXLogPos - 1 // as the clientXLogPos is the last checkpoint id + 1 // and we want to send the last checkpoint id as the last @@ -163,7 +163,7 @@ func (p *PostgresCDCSource) consumeStream( log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - if earlyReturn || (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) { + if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) { return result, nil } }