Skip to content

Commit

Permalink
Merge branch 'wait-for-at-least-one' into customer-mirage
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 17, 2023
2 parents 69935ee + 6638b31 commit 0b28451
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,21 @@ func (p *PostgresCDCSource) consumeStream(
}()

tablePKeyLastSeen := make(map[model.TableWithPkey]int)
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

addRecord := func(rec model.Record) {
records.AddRecord(rec)
localRecords = append(localRecords, rec)

if len(localRecords) == 1 {
records.SignalAsNotEmpty()
log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout))
log.Infof("num records accumulated: %d", len(localRecords))
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
}

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
pkmRequiresResponse := false

for {
Expand All @@ -235,12 +238,6 @@ func (p *PostgresCDCSource) consumeStream(
pkmRequiresResponse = false
}

if len(localRecords) == 0 {
log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout))
log.Infof("num records accumulated: %d", len(localRecords))
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}

if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock {
return nil
}
Expand Down

0 comments on commit 0b28451

Please sign in to comment.