From 9faf556fc4c2faeb8e3f1c0b2946bae3a0d97d0a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 16 Nov 2023 11:34:25 -0500 Subject: [PATCH] remove the env --- .github/workflows/flow.yml | 1 - flow/connectors/postgres/cdc.go | 6 +++--- flow/connectors/utils/env.go | 20 -------------------- 3 files changed, 3 insertions(+), 24 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index f01fedb9a0..a0a1889838 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -121,4 +121,3 @@ jobs: PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3 - PEERDB_ENVIRONMENT: test diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 58cdd230df..2b05135a85 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -244,7 +244,7 @@ func (p *PostgresCDCSource) consumeStream( var ctx context.Context var cancel context.CancelFunc - if len(localRecords) == 0 { + if len(localRecords) == 0 && !receivedPKM { // if length of localRecords is 0, then we are waiting for the first record // indefinitely. So we should not timeout. ctx, cancel = context.WithCancel(p.ctx) @@ -256,8 +256,8 @@ func (p *PostgresCDCSource) consumeStream( cancel() if err != nil && !p.commitLock { if pgconn.Timeout(err) { - log.Infof("Idle timeout reached") - if utils.IsTestEnv() || receivedPKM || len(localRecords) > 0 { + log.Infof("Idle timeout of %d seconds reached", standbyMessageTimeout/time.Second) + if receivedPKM || len(localRecords) > 0 { log.Infof("Returning currently accumulated records - %d", len(localRecords)) return nil } else { diff --git a/flow/connectors/utils/env.go b/flow/connectors/utils/env.go index 6d2bebc0f2..2911e3d8ef 100644 --- a/flow/connectors/utils/env.go +++ b/flow/connectors/utils/env.go @@ -45,23 +45,3 @@ func GetEnvInt(name string, defaultValue int) int { return i } - -// GetEnvString returns the value of the environment variable with the given -// name or defaultValue if the environment variable is not set. -func GetEnvString(name string, defaultValue string) string { - val, ok := GetEnv(name) - if !ok { - return defaultValue - } - - return val -} - -func IsTestEnv() bool { - envVal := GetEnvString("PEERDB_ENVIRONMENT", "") - if envVal == "test" { - return true - } else { - return false - } -}