From a37bec5cfc067e771ed484efb66eeea3336c73a3 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 20 Nov 2023 11:14:24 -0500 Subject: [PATCH] [cdc] Flush at the beginning of CDC until consumedXLogPos --- flow/connectors/postgres/cdc.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6836c9a181..3aa3c23e1f 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -182,6 +182,12 @@ func (p *PostgresCDCSource) consumeStream( consumedXLogPos := pglogrepl.LSN(0) if clientXLogPos > 0 { consumedXLogPos = clientXLogPos - 1 + + err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, + pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) + if err != nil { + return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) + } } var standByLastLogged time.Time