diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 68f764a890..f8d9afda1a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -24,6 +24,7 @@ import ( connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" @@ -323,8 +324,9 @@ func (a *FlowableActivity) SyncFlow( } logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset)) + bufferSizeForCDC := dynamicconf.PeerDBCDCChannelBufferSize(ctx) // start a goroutine to pull records from the source - recordBatch := model.NewCDCRecordStream() + recordBatch := model.NewCDCRecordStream(bufferSizeForCDC) startTime := time.Now() errGroup, errCtx := errgroup.WithContext(ctx) diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index adb4dde5dd..5a5c2eb638 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -53,50 +53,6 @@ func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, d return T(result) } -func dynamicConfString(ctx context.Context, key string, defaultValue string) string { - conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue - } - - var value pgtype.Text - query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue - } - - return value.String -} - -func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool { - conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue - } - - var value pgtype.Bool - query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue - } - - return value.Bool -} - // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) @@ -114,22 +70,22 @@ func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { } func PeerDBSnowflakeMergeParallelism(ctx context.Context) int { - return int(dynamicConfNumber[int64](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) + return int(dynamicConfNumber[int32](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD -func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int { - return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)) +func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int64 { + return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled -func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int { - return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)) +func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int64 { + return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) } // PEERDB_CDC_CHANNEL_BUFFER_SIZE func PeerDBCDCChannelBufferSize(ctx context.Context) int { - return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) + return int(dynamicConfNumber[int32](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) } // PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS