From c7a7c741cf0e4026aee0c5f187e4f51ad97c0a13 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 19 Nov 2023 14:36:44 -0500 Subject: [PATCH] basic deadline exceeded scenario --- flow/connectors/postgres/cdc.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 69c077fab9..9a4fbc6b6e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -218,6 +218,7 @@ func (p *PostgresCDCSource) consumeStream( } pkmRequiresResponse := false + waitingForCommit := false for { if pkmRequiresResponse { @@ -242,6 +243,31 @@ func (p *PostgresCDCSource) consumeStream( return nil } + if waitingForCommit && !p.commitLock { + log.Infof( + "[%s] commit received, returning currently accumulated records - %d", + req.FlowJobName, + len(localRecords), + ) + return nil + } + + // if we are past the next standby deadline (?) + if time.Now().After(nextStandbyMessageDeadline) { + if !p.commitLock { + log.Infof( + "[%s] Stand-by deadline exceeded, returning currently accumulated records - %d", + req.FlowJobName, + len(localRecords), + ) + return nil + } else { + // we need to wait for next commit. + waitingForCommit = true + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + } + } + var ctx context.Context var cancel context.CancelFunc