diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 3cf42a6dd2..bf499deeb0 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -235,12 +235,10 @@ func (p *PostgresCDCSource) consumeStream( pkmRequiresResponse = false } - if time.Now().After(nextStandbyMessageDeadline) { - if len(localRecords) <= 1 { - log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) - log.Infof("num records accumulated: %d", len(localRecords)) - nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - } + if len(localRecords) <= 1 { + log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) + log.Infof("num records accumulated: %d", len(localRecords)) + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock {