From b037c67e9820a50a1d4f24dbb476a55e68cfa1c2 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 24 Oct 2023 13:01:54 -0400 Subject: [PATCH] return early when exit condition has been reached --- flow/connectors/postgres/cdc.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index b90e9d779b..b0ffcd1f58 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -177,7 +177,8 @@ func (p *PostgresCDCSource) consumeStream( nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - if !p.commitLock && (len(records.Records) >= int(req.MaxBatchSize)) { + if !p.commitLock { + log.Infof("Stand-by timeout reached, returning currently '%d' records", len(records.Records)) return result, nil } } @@ -187,7 +188,7 @@ func (p *PostgresCDCSource) consumeStream( cancel() if err != nil && !p.commitLock { if pgconn.Timeout(err) { - log.Infof("Idle timeout reached, returning currently accumulated records") + log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(records.Records)) return result, nil } else { return nil, fmt.Errorf("ReceiveMessage failed: %w", err)