From 6638b3115f1b1e84f5adf7520093f1c7c63358b7 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 16 Nov 2023 19:42:22 -0500 Subject: [PATCH] one last fix --- flow/connectors/postgres/cdc.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index c59bb7b57f..b61856620f 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -202,6 +202,8 @@ func (p *PostgresCDCSource) consumeStream( }() tablePKeyLastSeen := make(map[model.TableWithPkey]int) + standbyMessageTimeout := req.IdleTimeout + nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) addRecord := func(rec model.Record) { records.AddRecord(rec) @@ -209,11 +211,12 @@ func (p *PostgresCDCSource) consumeStream( if len(localRecords) == 1 { records.SignalAsNotEmpty() + log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) + log.Infof("num records accumulated: %d", len(localRecords)) + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } } - standbyMessageTimeout := req.IdleTimeout - nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) pkmRequiresResponse := false for { @@ -235,12 +238,6 @@ func (p *PostgresCDCSource) consumeStream( pkmRequiresResponse = false } - if len(localRecords) == 0 { - log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) - log.Infof("num records accumulated: %d", len(localRecords)) - nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - } - if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock { return nil }