Skip to content

Commit

Permalink
Never update consumed xlogpos even when num records is zero (#900)
Browse files Browse the repository at this point in the history
This could potentially cause the WAL to build up but based on reports in
the field this seems like the safer alternative for now.
  • Loading branch information
iskakaushik authored Dec 25, 2023
1 parent 0806d33 commit 79e9ce3
Showing 1 changed file with 0 additions and 19 deletions.
19 changes: 0 additions & 19 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
}
}
proposedConsumedXLogPos := consumedXLogPos

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
Expand Down Expand Up @@ -254,17 +253,6 @@ func (p *PostgresCDCSource) consumeStream(

for {
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
if proposedConsumedXLogPos > consumedXLogPos {
p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos))
consumedXLogPos = proposedConsumedXLogPos
err := p.SetLastOffset(int64(consumedXLogPos))
if err != nil {
return fmt.Errorf("storing updated LSN failed: %w", err)
}
}

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
Expand Down Expand Up @@ -477,13 +465,6 @@ func (p *PostgresCDCSource) consumeStream(
if xld.WALStart > clientXLogPos {
clientXLogPos = xld.WALStart
}

if cdcRecordsStorage.IsEmpty() {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
proposedConsumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
}
}
Expand Down

0 comments on commit 79e9ce3

Please sign in to comment.