From d605a0b8039fd6424562f1c23fbcd93f6a1abb2d Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 22 Oct 2023 17:03:14 -0400 Subject: [PATCH] Make idle timeout configurable `PEERDB_CDC_IDLE_TIMEOUT_SECONDS` is the env var --- flow/activities/flowable.go | 4 +++- flow/connectors/utils/env.go | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 832274fd7f..3342e2d008 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -193,6 +193,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier } + idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10) + startTime := time.Now() recordsWithTableSchemaDelta, err := srcConn.PullRecords(&model.PullRecordsRequest{ FlowJobName: input.FlowConnectionConfigs.FlowJobName, @@ -200,7 +202,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, TableNameMapping: tblNameMapping, LastSyncState: input.LastSyncState, MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize), - IdleTimeout: 10 * time.Second, + IdleTimeout: time.Duration(idleTimeout) * time.Second, TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping, OverridePublicationName: input.FlowConnectionConfigs.PublicationName, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, diff --git a/flow/connectors/utils/env.go b/flow/connectors/utils/env.go index d3c1acfff5..2911e3d8ef 100644 --- a/flow/connectors/utils/env.go +++ b/flow/connectors/utils/env.go @@ -28,3 +28,20 @@ func GetEnvBool(name string, defaultValue bool) bool { return b } + +// GetEnvInt returns the value of the environment variable with the given name +// or defaultValue if the environment variable is not set or is not a valid +// integer value. +func GetEnvInt(name string, defaultValue int) int { + val, ok := GetEnv(name) + if !ok { + return defaultValue + } + + i, err := strconv.Atoi(val) + if err != nil { + return defaultValue + } + + return i +}