diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 17b5a13fe0..58cdd230df 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -241,7 +241,17 @@ func (p *PostgresCDCSource) consumeStream( } } - ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) + var ctx context.Context + var cancel context.CancelFunc + + if len(localRecords) == 0 { + // if length of localRecords is 0, then we are waiting for the first record + // indefinitely. So we should not timeout. + ctx, cancel = context.WithCancel(p.ctx) + } else { + ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline) + } + rawMsg, err := conn.ReceiveMessage(ctx) cancel() if err != nil && !p.commitLock {