diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6836c9a181..3aa3c23e1f 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -182,6 +182,12 @@ func (p *PostgresCDCSource) consumeStream( consumedXLogPos := pglogrepl.LSN(0) if clientXLogPos > 0 { consumedXLogPos = clientXLogPos - 1 + + err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, + pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) + if err != nil { + return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) + } } var standByLastLogged time.Time