From ea7646d3dd9034ae99401cc3abcbe0877c16223a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 20 Oct 2023 16:32:01 -0400 Subject: [PATCH] push at least not equals --- flow/connectors/postgres/cdc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index e8137381b1..fe30d74c6b 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -156,7 +156,7 @@ func (p *PostgresCDCSource) consumeStream( for { if time.Now().After(nextStandbyMessageDeadline) || - (len(records.Records) == int(req.MaxBatchSize)) { + (len(records.Records) >= int(req.MaxBatchSize)) { // Update XLogPos to the last processed position, we can only confirm // that this is the last row committed on the destination. err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, @@ -170,7 +170,7 @@ func (p *PostgresCDCSource) consumeStream( log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - if !p.commitLock && (len(records.Records) == int(req.MaxBatchSize)) { + if !p.commitLock && (len(records.Records) >= int(req.MaxBatchSize)) { return result, nil } }