diff --git a/docker-compose.yml b/docker-compose.yml index 1bad765a58..6cd0812937 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -183,23 +183,23 @@ services: temporal-admin-tools: condition: service_healthy - peerdb: - container_name: peerdb-server - build: - context: . - dockerfile: stacks/peerdb-server.Dockerfile - environment: - <<: *catalog-config - PEERDB_LOG_DIR: /var/log/peerdb - PEERDB_PASSWORD: peerdb - PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112 - RUST_LOG: info - RUST_BACKTRACE: 1 - ports: - - 9900:9900 - depends_on: - catalog: - condition: service_healthy + # peerdb: + # container_name: peerdb-server + # build: + # context: . + # dockerfile: stacks/peerdb-server.Dockerfile + # environment: + # <<: *catalog-config + # PEERDB_LOG_DIR: /var/log/peerdb + # PEERDB_PASSWORD: peerdb + # PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112 + # RUST_LOG: info + # RUST_BACKTRACE: 1 + # ports: + # - 9900:9900 + # depends_on: + # catalog: + # condition: service_healthy peerdb-prometheus: container_name: peerdb-prometheus diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 832274fd7f..3342e2d008 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -193,6 +193,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier } + idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10) + startTime := time.Now() recordsWithTableSchemaDelta, err := srcConn.PullRecords(&model.PullRecordsRequest{ FlowJobName: input.FlowConnectionConfigs.FlowJobName, @@ -200,7 +202,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, TableNameMapping: tblNameMapping, LastSyncState: input.LastSyncState, MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize), - IdleTimeout: 10 * time.Second, + IdleTimeout: time.Duration(idleTimeout) * time.Second, TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping, OverridePublicationName: input.FlowConnectionConfigs.PublicationName, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, diff --git a/flow/connectors/utils/env.go b/flow/connectors/utils/env.go index d3c1acfff5..2911e3d8ef 100644 --- a/flow/connectors/utils/env.go +++ b/flow/connectors/utils/env.go @@ -28,3 +28,20 @@ func GetEnvBool(name string, defaultValue bool) bool { return b } + +// GetEnvInt returns the value of the environment variable with the given name +// or defaultValue if the environment variable is not set or is not a valid +// integer value. +func GetEnvInt(name string, defaultValue int) int { + val, ok := GetEnv(name) + if !ok { + return defaultValue + } + + i, err := strconv.Atoi(val) + if err != nil { + return defaultValue + } + + return i +}