Skip to content

Commit

Permalink
rename config table, move some vars to it
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 19, 2024
1 parent fdff1a7 commit 9562dac
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 41 deletions.
121 changes: 115 additions & 6 deletions flow/dynamicconf/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dynamicconf

import (
"context"
"strconv"
"time"

"github.com/jackc/pgx/v5/pgtype"
Expand All @@ -14,7 +13,7 @@ import (

func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool {
var exists pgtype.Bool
query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)"
query := "SELECT EXISTS(SELECT 1 FROM dynamic_settings WHERE config_name = $1)"
err := conn.QueryRow(ctx, query, key).Scan(&exists)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to check if key exists: %v", err)
Expand All @@ -35,21 +34,96 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin
return defaultValue
}

var value pgtype.Text
var value pgtype.Uint32
query := "SELECT config_value FROM alerting_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

result, err := strconv.ParseUint(value.String, 10, 32)
if err != nil {
if !value.Valid {
logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err)
return defaultValue
}

return uint32(result)
return value.Uint32
}

func dynamicConfInt64(ctx context.Context, key string, defaultValue int64) int64 {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
}

if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
}

var value pgtype.Int8
query := "SELECT config_value FROM alerting_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

if !value.Valid {
logger.LoggerFromCtx(ctx).Error("Failed to parse int64: %v", err)
return defaultValue
}

return value.Int64
}

func dynamicConfString(ctx context.Context, key string, defaultValue string) string {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
}

if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
}

var value pgtype.Text
query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

return value.String
}

func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
}

if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
}

var value pgtype.Bool
query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

return value.Bool
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func P(ctx context.Context) uint32 {
return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
Expand All @@ -67,3 +141,38 @@ func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration {
func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 {
return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
}

// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
func PeerDBEnableParallelSyncNormalize(ctx context.Context) bool {
return dynamicConfBool(ctx, "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
}

func PeerDBSnowflakeMergeParallelism(ctx context.Context) int {
return int(dynamicConfInt64(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8))
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD
func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int {
return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000))
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled
func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int {
return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1))
}

// PEERDB_DISABLE_ONE_SYNC
func PeerDBDisableOneSync(ctx context.Context) bool {
return dynamicConfBool(ctx, "PEERDB_DISABLE_ONE_SYNC", false)
}

// PEERDB_CDC_CHANNEL_BUFFER_SIZE
func PeerDBCDCChannelBufferSize(ctx context.Context) int {
return int(dynamicConfInt64(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18))
}

// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS
func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration {
x := dynamicConfInt64(ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
return time.Duration(x) * time.Second
}
35 changes: 0 additions & 35 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ func PeerDBDeploymentUID() string {
return getEnvString("PEERDB_DEPLOYMENT_UID", "")
}

// PEERDB_CDC_CHANNEL_BUFFER_SIZE
func PeerDBCDCChannelBufferSize() int {
return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)
}

// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS
func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
return time.Duration(x) * time.Second
}

// env variable doesn't exist anymore, but tests appear to depend on this
// in lieu of an actual value of IdleTimeoutSeconds
func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
Expand All @@ -41,21 +30,6 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
return time.Duration(x) * time.Second
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD
func PeerDBCDCDiskSpillRecordsThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled
func PeerDBCDCDiskSpillMemPercentThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// PEERDB_DISABLE_ONE_SYNC
func PeerDBDisableOneSync() bool {
return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false)
}

// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
func PeerDBFlowWorkerMaxMemBytes() uint64 {
return getEnvUint[uint64]("GOMEMLIMIT", 0)
Expand Down Expand Up @@ -91,15 +65,6 @@ func PeerDBEnableWALHeartbeat() bool {
return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false)
}

// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
func PeerDBEnableParallelSyncNormalize() bool {
return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
}

func PeerDBSnowflakeMergeParallelism() int {
return getEnvInt("PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)
}

// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN
func PeerDBTelemetryAWSSNSTopicArn() string {
return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")
Expand Down
1 change: 1 addition & 0 deletions nexus/catalog/migrations/V23__dyncconfigtable.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE alerting_settings RENAME TO dynamic_settings;

0 comments on commit 9562dac

Please sign in to comment.