Skip to content

Commit

Permalink
changed WAL update logic (#458)
Browse files Browse the repository at this point in the history
This PR is debugging an issue where schema change propagation (#368)
don't apply properly when there are parallel transactions happening
during an ALTER TABLE statement.

Borrowing from jackc/pglogrepl#59

This PR changes the handling of clientXLogPos to match the one from
Postgres' own `pg_recvlogical`:

- keepalive messages should bump the position too, as they're only sent
(from what I can tell) after any xlogdata message;
- both the WALStart and the ServerWALEnd in logical xlogdata messages
represent the position that should be reported back, and adding the
length of the post-decoding data to it is meaningless;
- relation messages have a position of zero, and in general we should
match the pg_recvlogical behavior of only increasing the local position.

---------

Co-authored-by: Kaushik Iska <[email protected]>
  • Loading branch information
heavycrystal and iskakaushik authored Sep 30, 2023
1 parent 5e67e42 commit 71930a4
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ func (p *PostgresCDCSource) consumeStream(
}()

for {
if time.Now().After(nextStandbyMessageDeadline) {
if time.Now().After(nextStandbyMessageDeadline) ||
earlyReturn ||
(len(records.Records) == int(req.MaxBatchSize)) {
// update the WALWritePosition to be clientXLogPos - 1
// as the clientXLogPos is the last checkpoint id + 1
// and we want to send the last checkpoint id as the last
Expand All @@ -160,6 +162,10 @@ func (p *PostgresCDCSource) consumeStream(
utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage)
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)

if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) {
return result, nil
}
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
Expand Down Expand Up @@ -195,6 +201,9 @@ func (p *PostgresCDCSource) consumeStream(
log.Debugf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t",
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested)

if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
}
if pkm.ReplyRequested {
nextStandbyMessageDeadline = time.Time{}
}
Expand Down Expand Up @@ -284,12 +293,9 @@ func (p *PostgresCDCSource) consumeStream(
}
}

currentPos := xld.WALStart + pglogrepl.LSN(len(xld.WALData))
records.LastCheckPointID = int64(currentPos)

if records.Records != nil &&
((len(records.Records) == int(req.MaxBatchSize)) || earlyReturn) {
return result, nil
if xld.WALStart > clientXLogPos {
clientXLogPos = xld.WALStart
records.LastCheckPointID = int64(clientXLogPos)
}
}
}
Expand Down

0 comments on commit 71930a4

Please sign in to comment.