From c6ad0f6b0cb0cde7d1634c4351eaed119c1ae97a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 21 Nov 2023 08:21:09 -0500 Subject: [PATCH] [cdc] Flush at the beginning of CDC until consumedXLogPos (#688) This was missed before, when `wal_sender_timeout` is set to a large value waiting for PKMs is a bad idea --- flow/connectors/postgres/cdc.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6836c9a181..3aa3c23e1f 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -182,6 +182,12 @@ func (p *PostgresCDCSource) consumeStream( consumedXLogPos := pglogrepl.LSN(0) if clientXLogPos > 0 { consumedXLogPos = clientXLogPos - 1 + + err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, + pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) + if err != nil { + return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) + } } var standByLastLogged time.Time