diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 99d40ff1c1..5bd0b60df4 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -196,7 +196,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) - DO UPDATE SET last_offset = $2, updated_at = NOW() + DO UPDATE SET last_offset = GREATEST(last_offset, $2), updated_at = NOW() `, jobName, offset, 0) if err != nil { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index a7c86f19a8..84cc441d02 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -181,17 +181,13 @@ func (p *PostgresCDCSource) consumeStream( if clientXLogPos > 0 { consumedXLogPos = clientXLogPos - err := p.SetLastOffset(int64(consumedXLogPos)) - if err != nil { - return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err) - } - - err = pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, + err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) } } + proposedConsumedXLogPos := consumedXLogPos var standByLastLogged time.Time cdcRecordsStorage := cdc_records.NewCDCRecordsStore(req.FlowJobName) @@ -250,19 +246,27 @@ func (p *PostgresCDCSource) consumeStream( if pkmRequiresResponse { // Update XLogPos to the last processed position, we can only confirm // that this is the last row committed on the destination. + if proposedConsumedXLogPos > consumedXLogPos { + p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos)) + err := p.SetLastOffset(int64(proposedConsumedXLogPos)) + if err != nil { + return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err) + } + } + err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } + consumedXLogPos = proposedConsumedXLogPos + pkmRequiresResponse = false if time.Since(standByLastLogged) > 10*time.Second { numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len()) p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage)) standByLastLogged = time.Now() } - - pkmRequiresResponse = false } if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock { @@ -466,7 +470,7 @@ func (p *PostgresCDCSource) consumeStream( if cdcRecordsStorage.IsEmpty() { // given that we have no records it is safe to update the flush wal position // to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages. - consumedXLogPos = clientXLogPos + proposedConsumedXLogPos = clientXLogPos records.UpdateLatestCheckpoint(int64(clientXLogPos)) } }