Skip to content

Commit

Permalink
Merge branch 'main' into remove-bq-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 25, 2023
2 parents 9f04fc6 + 79e9ce3 commit 1049e0f
Showing 1 changed file with 0 additions and 19 deletions.
19 changes: 0 additions & 19 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
}
}
proposedConsumedXLogPos := consumedXLogPos

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
Expand Down Expand Up @@ -254,17 +253,6 @@ func (p *PostgresCDCSource) consumeStream(

for {
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
if proposedConsumedXLogPos > consumedXLogPos {
p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos))
consumedXLogPos = proposedConsumedXLogPos
err := p.SetLastOffset(int64(consumedXLogPos))
if err != nil {
return fmt.Errorf("storing updated LSN failed: %w", err)
}
}

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
Expand Down Expand Up @@ -477,13 +465,6 @@ func (p *PostgresCDCSource) consumeStream(
if xld.WALStart > clientXLogPos {
clientXLogPos = xld.WALStart
}

if cdcRecordsStorage.IsEmpty() {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
proposedConsumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
}
}
Expand Down

0 comments on commit 1049e0f

Please sign in to comment.