From b230e62113d6d7ccc7fcf8dec1a23c5a9e34b539 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 16 Nov 2023 11:33:24 -0500 Subject: [PATCH] set no deadline for the first message --- flow/connectors/postgres/cdc.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 17b5a13fe0..58cdd230df 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -241,7 +241,17 @@ func (p *PostgresCDCSource) consumeStream( } } - ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) + var ctx context.Context + var cancel context.CancelFunc + + if len(localRecords) == 0 { + // if length of localRecords is 0, then we are waiting for the first record + // indefinitely. So we should not timeout. + ctx, cancel = context.WithCancel(p.ctx) + } else { + ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline) + } + rawMsg, err := conn.ReceiveMessage(ctx) cancel() if err != nil && !p.commitLock {