diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 69c077fab9..9a4fbc6b6e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -218,6 +218,7 @@ func (p *PostgresCDCSource) consumeStream( } pkmRequiresResponse := false + waitingForCommit := false for { if pkmRequiresResponse { @@ -242,6 +243,31 @@ func (p *PostgresCDCSource) consumeStream( return nil } + if waitingForCommit && !p.commitLock { + log.Infof( + "[%s] commit received, returning currently accumulated records - %d", + req.FlowJobName, + len(localRecords), + ) + return nil + } + + // if we are past the next standby deadline (?) + if time.Now().After(nextStandbyMessageDeadline) { + if !p.commitLock { + log.Infof( + "[%s] Stand-by deadline exceeded, returning currently accumulated records - %d", + req.FlowJobName, + len(localRecords), + ) + return nil + } else { + // we need to wait for next commit. + waitingForCommit = true + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + } + } + var ctx context.Context var cancel context.CancelFunc