Skip to content

Commit

Permalink
changed WAL update logic to hopefully fix schema changes bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Sep 29, 2023
1 parent c55b004 commit 6baab0e
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ func (p *PostgresCDCSource) consumeStream(
}()

for {
if time.Now().After(nextStandbyMessageDeadline) {
if time.Now().After(nextStandbyMessageDeadline) ||
earlyReturn ||
(records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) {
// update the WALWritePosition to be clientXLogPos - 1
// as the clientXLogPos is the last checkpoint id + 1
// and we want to send the last checkpoint id as the last
Expand All @@ -160,6 +162,10 @@ func (p *PostgresCDCSource) consumeStream(
utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage)
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)

if earlyReturn || (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) {
return result, nil
}
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
Expand Down Expand Up @@ -284,12 +290,9 @@ func (p *PostgresCDCSource) consumeStream(
}
}

currentPos := xld.WALStart + pglogrepl.LSN(len(xld.WALData))
records.LastCheckPointID = int64(currentPos)

if records.Records != nil &&
((len(records.Records) == int(req.MaxBatchSize)) || earlyReturn) {
return result, nil
if xld.WALStart > clientXLogPos {
clientXLogPos = xld.WALStart
records.LastCheckPointID = int64(clientXLogPos)
}
}
}
Expand Down

0 comments on commit 6baab0e

Please sign in to comment.