diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index 5a5c2eb638..b98771d5d6 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -25,7 +25,35 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b return exists.Bool } -func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, defaultValue T) T { +func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string, defaultValue T) T { + conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) + return defaultValue + } + + if !dynamicConfKeyExists(ctx, conn, key) { + return defaultValue + } + + var value pgtype.Text + query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" + err = conn.QueryRow(ctx, query, key).Scan(&value) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) + return defaultValue + } + + result, err := strconv.ParseInt(value.String, 10, 32) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) + return defaultValue + } + + return T(result) +} + +func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string, defaultValue T) T { conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) @@ -55,46 +83,46 @@ func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, d // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { - return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) + return dynamicConfUnsigned[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) } // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration { - why := dynamicConfNumber[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15) + why := dynamicConfUnsigned[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15) return time.Duration(why) * time.Minute } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { - return dynamicConfNumber[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) + return dynamicConfUnsigned[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) } func PeerDBSnowflakeMergeParallelism(ctx context.Context) int { - return int(dynamicConfNumber[int32](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) + return dynamicConfSigned(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int64 { - return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int64 { - return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) } // PEERDB_CDC_CHANNEL_BUFFER_SIZE func PeerDBCDCChannelBufferSize(ctx context.Context) int { - return int(dynamicConfNumber[int32](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) + return dynamicConfSigned(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) } // PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration { - x := dynamicConfNumber[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) + x := dynamicConfSigned[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) return time.Duration(x) * time.Second } // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum -func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint64 { - return dynamicConfNumber[uint64](ctx, "GOMEMLIMIT", 0) +func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint32 { + return dynamicConfUnsigned[uint32](ctx, "GOMEMLIMIT", 0) }