Skip to content

Commit

Permalink
make the fix better
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 16, 2023
1 parent ce45d28 commit 9ec0282
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ func (p *PostgresCDCSource) consumeStream(
clientXLogPos pglogrepl.LSN,
records *model.CDCRecordStream,
) error {
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

defer func() {
err := conn.Close(p.ctx)
if err != nil {
Expand Down Expand Up @@ -215,9 +212,12 @@ func (p *PostgresCDCSource) consumeStream(
}
}

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
pkmRequiresResponse := false

for {
if time.Now().After(nextStandbyMessageDeadline) ||
(len(localRecords) >= int(req.MaxBatchSize)) {
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
Expand All @@ -226,26 +226,40 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}

numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
standByLastLogged = time.Now()
}

nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
pkmRequiresResponse = false
}

if !p.commitLock && (len(localRecords) >= int(req.MaxBatchSize)) {
return nil
if time.Now().After(nextStandbyMessageDeadline) {
if len(localRecords) <= 1 {
log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout))
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock {
return nil
}

var ctx context.Context
var cancel context.CancelFunc

if len(localRecords) == 0 {
ctx, cancel = context.WithCancel(p.ctx)
} else {
ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
}

rawMsg, err := conn.ReceiveMessage(ctx)
cancel()
if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords))
log.Infof("Stand-by deadline reached, returning currently accumulated records - %d", len(localRecords))
return nil
} else {
return fmt.Errorf("ReceiveMessage failed: %w", err)
Expand Down Expand Up @@ -275,8 +289,9 @@ func (p *PostgresCDCSource) consumeStream(
if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
}

if pkm.ReplyRequested {
nextStandbyMessageDeadline = time.Time{}
pkmRequiresResponse = true
}

case pglogrepl.XLogDataByteID:
Expand Down

0 comments on commit 9ec0282

Please sign in to comment.