diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index a0a1889838..f01fedb9a0 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -121,3 +121,4 @@ jobs: PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3 + PEERDB_ENVIRONMENT: test diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 57d1ae1b47..17b5a13fe0 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -247,7 +247,7 @@ func (p *PostgresCDCSource) consumeStream( if err != nil && !p.commitLock { if pgconn.Timeout(err) { log.Infof("Idle timeout reached") - if receivedPKM || len(localRecords) > 0 { + if utils.IsTestEnv() || receivedPKM || len(localRecords) > 0 { log.Infof("Returning currently accumulated records - %d", len(localRecords)) return nil } else { diff --git a/flow/connectors/utils/env.go b/flow/connectors/utils/env.go index 2911e3d8ef..6d2bebc0f2 100644 --- a/flow/connectors/utils/env.go +++ b/flow/connectors/utils/env.go @@ -45,3 +45,23 @@ func GetEnvInt(name string, defaultValue int) int { return i } + +// GetEnvString returns the value of the environment variable with the given +// name or defaultValue if the environment variable is not set. +func GetEnvString(name string, defaultValue string) string { + val, ok := GetEnv(name) + if !ok { + return defaultValue + } + + return val +} + +func IsTestEnv() bool { + envVal := GetEnvString("PEERDB_ENVIRONMENT", "") + if envVal == "test" { + return true + } else { + return false + } +}