Skip to content

Commit

Permalink
external_metadata: GREATEST
Browse files Browse the repository at this point in the history
Log how much the heartbeat adjusts the LSN; don't call SetLastOffset if the LSN hasn't moved
  • Loading branch information
serprex committed Dec 14, 2023
1 parent 68bf3ce commit a78644b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET last_offset = $2, updated_at = NOW()
DO UPDATE SET last_offset = GREATEST(last_offset, $2), updated_at = NOW()
`, jobName, offset, 0)

if err != nil {
Expand Down
22 changes: 13 additions & 9 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,13 @@ func (p *PostgresCDCSource) consumeStream(
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos

err := p.SetLastOffset(int64(consumedXLogPos))
if err != nil {
return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err)
}

err = pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
}
}
proposedConsumedXLogPos := consumedXLogPos

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(req.FlowJobName)
Expand Down Expand Up @@ -250,19 +246,27 @@ func (p *PostgresCDCSource) consumeStream(
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
if proposedConsumedXLogPos > consumedXLogPos {
p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos))
err := p.SetLastOffset(int64(proposedConsumedXLogPos))
if err != nil {
return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err)
}
}

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}
consumedXLogPos = proposedConsumedXLogPos
pkmRequiresResponse = false

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
standByLastLogged = time.Now()
}

pkmRequiresResponse = false
}

if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock {
Expand Down Expand Up @@ -466,7 +470,7 @@ func (p *PostgresCDCSource) consumeStream(
if cdcRecordsStorage.IsEmpty() {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
consumedXLogPos = clientXLogPos
proposedConsumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
Expand Down

0 comments on commit a78644b

Please sign in to comment.