diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 84cc441d02..b59f5b6d39 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -248,7 +248,8 @@ func (p *PostgresCDCSource) consumeStream( // that this is the last row committed on the destination. if proposedConsumedXLogPos > consumedXLogPos { p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos)) - err := p.SetLastOffset(int64(proposedConsumedXLogPos)) + consumedXLogPos = proposedConsumedXLogPos + err := p.SetLastOffset(int64(consumedXLogPos)) if err != nil { return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err) } @@ -259,7 +260,6 @@ func (p *PostgresCDCSource) consumeStream( if err != nil { return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - consumedXLogPos = proposedConsumedXLogPos pkmRequiresResponse = false if time.Since(standByLastLogged) > 10*time.Second {