diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 278f0ed863..3cf42a6dd2 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -164,9 +164,6 @@ func (p *PostgresCDCSource) consumeStream( clientXLogPos pglogrepl.LSN, records *model.CDCRecordStream, ) error { - standbyMessageTimeout := req.IdleTimeout - nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) - defer func() { err := conn.Close(p.ctx) if err != nil { @@ -215,9 +212,12 @@ func (p *PostgresCDCSource) consumeStream( } } + standbyMessageTimeout := req.IdleTimeout + nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) + pkmRequiresResponse := false + for { - if time.Now().After(nextStandbyMessageDeadline) || - (len(localRecords) >= int(req.MaxBatchSize)) { + if pkmRequiresResponse { // Update XLogPos to the last processed position, we can only confirm // that this is the last row committed on the destination. err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, @@ -226,26 +226,41 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords)) - if time.Since(standByLastLogged) > 10*time.Second { + numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords)) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) standByLastLogged = time.Now() } - nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + pkmRequiresResponse = false + } - if !p.commitLock && (len(localRecords) >= int(req.MaxBatchSize)) { - return nil + if time.Now().After(nextStandbyMessageDeadline) { + if len(localRecords) <= 1 { + log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) + log.Infof("num records accumulated: %d", len(localRecords)) + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } } - ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) + if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock { + return nil + } + + var ctx context.Context + var cancel context.CancelFunc + + if len(localRecords) == 0 { + ctx, cancel = context.WithCancel(p.ctx) + } else { + ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline) + } + rawMsg, err := conn.ReceiveMessage(ctx) cancel() if err != nil && !p.commitLock { if pgconn.Timeout(err) { - log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords)) + log.Infof("Stand-by deadline reached, returning currently accumulated records - %d", len(localRecords)) return nil } else { return fmt.Errorf("ReceiveMessage failed: %w", err) @@ -275,8 +290,9 @@ func (p *PostgresCDCSource) consumeStream( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } + if pkm.ReplyRequested { - nextStandbyMessageDeadline = time.Time{} + pkmRequiresResponse = true } case pglogrepl.XLogDataByteID: