Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/wait-for-at-least-one' into cust…
Browse files Browse the repository at this point in the history
…omer-mirage
  • Loading branch information
iskakaushik committed Nov 20, 2023
2 parents 53a342b + c6f0f1e commit a7475d2
Showing 1 changed file with 49 additions and 12 deletions.
61 changes: 49 additions & 12 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ func (p *PostgresCDCSource) consumeStream(
clientXLogPos pglogrepl.LSN,
records *model.CDCRecordStream,
) error {
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

defer func() {
err := conn.Close(p.ctx)
if err != nil {
Expand Down Expand Up @@ -205,19 +202,26 @@ func (p *PostgresCDCSource) consumeStream(
}()

tablePKeyLastSeen := make(map[model.TableWithPkey]int)
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

addRecord := func(rec model.Record) {
records.AddRecord(rec)
localRecords = append(localRecords, rec)

if len(localRecords) == 1 {
records.SignalAsNotEmpty()
log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout))
log.Infof("num records accumulated: %d", len(localRecords))
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
}

pkmRequiresResponse := false
waitingForCommit := false

for {
if time.Now().After(nextStandbyMessageDeadline) ||
(len(localRecords) >= int(req.MaxBatchSize)) {
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
Expand All @@ -226,26 +230,58 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}

numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords))
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
standByLastLogged = time.Now()
}

nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
pkmRequiresResponse = false
}

if !p.commitLock && (len(localRecords) >= int(req.MaxBatchSize)) {
if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock {
return nil
}

if waitingForCommit && !p.commitLock {
log.Infof(
"[%s] commit received, returning currently accumulated records - %d",
req.FlowJobName,
len(localRecords),
)
return nil
}

// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if !p.commitLock && len(localRecords) > 0 {
log.Infof(
"[%s] Stand-by deadline exceeded, returning currently accumulated records - %d",
req.FlowJobName,
len(localRecords),
)
return nil
} else {
// we need to wait for next commit.
waitingForCommit = p.commitLock
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
var ctx context.Context
var cancel context.CancelFunc

if len(localRecords) == 0 {
ctx, cancel = context.WithCancel(p.ctx)
} else {
ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
}

rawMsg, err := conn.ReceiveMessage(ctx)
cancel()
if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords))
log.Infof("Stand-by deadline reached, returning currently accumulated records - %d", len(localRecords))
return nil
} else {
return fmt.Errorf("ReceiveMessage failed: %w", err)
Expand Down Expand Up @@ -275,8 +311,9 @@ func (p *PostgresCDCSource) consumeStream(
if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
}

if pkm.ReplyRequested {
nextStandbyMessageDeadline = time.Time{}
pkmRequiresResponse = true
}

case pglogrepl.XLogDataByteID:
Expand Down

0 comments on commit a7475d2

Please sign in to comment.