From c4956b2855ea87738ade9e6e05a6d77a13aadaf6 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 25 Dec 2023 10:02:20 -0500 Subject: [PATCH] Never update consumed xlogpos to PKM xlogpos even when num records is zero This could potentially cause the WAL to build up but based on reports in the field this seems like the safer alternative for now. --- flow/connectors/postgres/cdc.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index f2eda2e5f4..b3686f4d09 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -197,7 +197,6 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) } } - proposedConsumedXLogPos := consumedXLogPos var standByLastLogged time.Time cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName) @@ -254,17 +253,6 @@ func (p *PostgresCDCSource) consumeStream( for { if pkmRequiresResponse { - // Update XLogPos to the last processed position, we can only confirm - // that this is the last row committed on the destination. - if proposedConsumedXLogPos > consumedXLogPos { - p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos)) - consumedXLogPos = proposedConsumedXLogPos - err := p.SetLastOffset(int64(consumedXLogPos)) - if err != nil { - return fmt.Errorf("storing updated LSN failed: %w", err) - } - } - err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { @@ -477,13 +465,6 @@ func (p *PostgresCDCSource) consumeStream( if xld.WALStart > clientXLogPos { clientXLogPos = xld.WALStart } - - if cdcRecordsStorage.IsEmpty() { - // given that we have no records it is safe to update the flush wal position - // to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages. - proposedConsumedXLogPos = clientXLogPos - records.UpdateLatestCheckpoint(int64(clientXLogPos)) - } } } }