diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index b90e9d779b..b0ffcd1f58 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -177,7 +177,8 @@ func (p *PostgresCDCSource) consumeStream( nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - if !p.commitLock && (len(records.Records) >= int(req.MaxBatchSize)) { + if !p.commitLock { + log.Infof("Stand-by timeout reached, returning currently '%d' records", len(records.Records)) return result, nil } } @@ -187,7 +188,7 @@ func (p *PostgresCDCSource) consumeStream( cancel() if err != nil && !p.commitLock { if pgconn.Timeout(err) { - log.Infof("Idle timeout reached, returning currently accumulated records") + log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(records.Records)) return result, nil } else { return nil, fmt.Errorf("ReceiveMessage failed: %w", err)