From aa7a05e25c51b803fce5d0a97cc5c49cfc1dd829 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 14 Dec 2023 19:50:00 +0000 Subject: [PATCH] code review --- flow/connectors/postgres/cdc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 84cc441d02..b59f5b6d39 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -248,7 +248,8 @@ func (p *PostgresCDCSource) consumeStream( // that this is the last row committed on the destination. if proposedConsumedXLogPos > consumedXLogPos { p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos)) - err := p.SetLastOffset(int64(proposedConsumedXLogPos)) + consumedXLogPos = proposedConsumedXLogPos + err := p.SetLastOffset(int64(consumedXLogPos)) if err != nil { return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err) } @@ -259,7 +260,6 @@ func (p *PostgresCDCSource) consumeStream( if err != nil { return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - consumedXLogPos = proposedConsumedXLogPos pkmRequiresResponse = false if time.Since(standByLastLogged) > 10*time.Second {