Skip to content

Commit

Permalink
basic deadline exceeded scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 19, 2023
1 parent e09f0cc commit c7a7c74
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (p *PostgresCDCSource) consumeStream(
}

pkmRequiresResponse := false
waitingForCommit := false

for {
if pkmRequiresResponse {
Expand All @@ -242,6 +243,31 @@ func (p *PostgresCDCSource) consumeStream(
return nil
}

if waitingForCommit && !p.commitLock {
log.Infof(
"[%s] commit received, returning currently accumulated records - %d",
req.FlowJobName,
len(localRecords),
)
return nil
}

// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if !p.commitLock {
log.Infof(
"[%s] Stand-by deadline exceeded, returning currently accumulated records - %d",
req.FlowJobName,
len(localRecords),
)
return nil
} else {
// we need to wait for next commit.
waitingForCommit = true
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
}

var ctx context.Context
var cancel context.CancelFunc

Expand Down

0 comments on commit c7a7c74

Please sign in to comment.