From efb5dd49aaaf8690bdc7adcf0f28c0628dc16ef0 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 16 Nov 2023 18:57:36 -0500 Subject: [PATCH] fix more --- flow/connectors/postgres/cdc.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 3cf42a6dd2..bf499deeb0 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -235,12 +235,10 @@ func (p *PostgresCDCSource) consumeStream( pkmRequiresResponse = false } - if time.Now().After(nextStandbyMessageDeadline) { - if len(localRecords) <= 1 { - log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) - log.Infof("num records accumulated: %d", len(localRecords)) - nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - } + if len(localRecords) <= 1 { + log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) + log.Infof("num records accumulated: %d", len(localRecords)) + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock {