From 9562dacb982b768754dc7e537824bbfd0ac3a87b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 19 Mar 2024 17:46:32 +0530 Subject: [PATCH 1/5] rename config table, move some vars to it --- flow/dynamicconf/dynamicconf.go | 121 +++++++++++++++++- flow/peerdbenv/config.go | 35 ----- .../migrations/V23__dyncconfigtable.sql | 1 + 3 files changed, 116 insertions(+), 41 deletions(-) create mode 100644 nexus/catalog/migrations/V23__dyncconfigtable.sql diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index 5225c400b1..7e8fbf71c0 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -2,7 +2,6 @@ package dynamicconf import ( "context" - "strconv" "time" "github.com/jackc/pgx/v5/pgtype" @@ -14,7 +13,7 @@ import ( func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool { var exists pgtype.Bool - query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)" + query := "SELECT EXISTS(SELECT 1 FROM dynamic_settings WHERE config_name = $1)" err := conn.QueryRow(ctx, query, key).Scan(&exists) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to check if key exists: %v", err) @@ -35,7 +34,7 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin return defaultValue } - var value pgtype.Text + var value pgtype.Uint32 query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" err = conn.QueryRow(ctx, query, key).Scan(&value) if err != nil { @@ -43,13 +42,88 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin return defaultValue } - result, err := strconv.ParseUint(value.String, 10, 32) - if err != nil { + if !value.Valid { logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) return defaultValue } - return uint32(result) + return value.Uint32 +} + +func dynamicConfInt64(ctx context.Context, key string, defaultValue int64) int64 { + conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) + return defaultValue + } + + if !dynamicConfKeyExists(ctx, conn, key) { + return defaultValue + } + + var value pgtype.Int8 + query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" + err = conn.QueryRow(ctx, query, key).Scan(&value) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) + return defaultValue + } + + if !value.Valid { + logger.LoggerFromCtx(ctx).Error("Failed to parse int64: %v", err) + return defaultValue + } + + return value.Int64 +} + +func dynamicConfString(ctx context.Context, key string, defaultValue string) string { + conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) + return defaultValue + } + + if !dynamicConfKeyExists(ctx, conn, key) { + return defaultValue + } + + var value pgtype.Text + query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1" + err = conn.QueryRow(ctx, query, key).Scan(&value) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) + return defaultValue + } + + return value.String +} + +func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool { + conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) + return defaultValue + } + + if !dynamicConfKeyExists(ctx, conn, key) { + return defaultValue + } + + var value pgtype.Bool + query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1" + err = conn.QueryRow(ctx, query, key).Scan(&value) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) + return defaultValue + } + + return value.Bool +} + +// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely +func P(ctx context.Context) uint32 { + return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) } // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely @@ -67,3 +141,38 @@ func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration { func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) } + +// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE +func PeerDBEnableParallelSyncNormalize(ctx context.Context) bool { + return dynamicConfBool(ctx, "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) +} + +func PeerDBSnowflakeMergeParallelism(ctx context.Context) int { + return int(dynamicConfInt64(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) +} + +// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD +func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int { + return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)) +} + +// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled +func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int { + return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)) +} + +// PEERDB_DISABLE_ONE_SYNC +func PeerDBDisableOneSync(ctx context.Context) bool { + return dynamicConfBool(ctx, "PEERDB_DISABLE_ONE_SYNC", false) +} + +// PEERDB_CDC_CHANNEL_BUFFER_SIZE +func PeerDBCDCChannelBufferSize(ctx context.Context) int { + return int(dynamicConfInt64(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) +} + +// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS +func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration { + x := dynamicConfInt64(ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) + return time.Duration(x) * time.Second +} diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 1d0eacc01a..a947656061 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -18,17 +18,6 @@ func PeerDBDeploymentUID() string { return getEnvString("PEERDB_DEPLOYMENT_UID", "") } -// PEERDB_CDC_CHANNEL_BUFFER_SIZE -func PeerDBCDCChannelBufferSize() int { - return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) -} - -// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS -func PeerDBEventhubFlushTimeoutSeconds() time.Duration { - x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) - return time.Duration(x) * time.Second -} - // env variable doesn't exist anymore, but tests appear to depend on this // in lieu of an actual value of IdleTimeoutSeconds func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { @@ -41,21 +30,6 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { return time.Duration(x) * time.Second } -// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD -func PeerDBCDCDiskSpillRecordsThreshold() int { - return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) -} - -// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled -func PeerDBCDCDiskSpillMemPercentThreshold() int { - return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) -} - -// PEERDB_DISABLE_ONE_SYNC -func PeerDBDisableOneSync() bool { - return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false) -} - // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum func PeerDBFlowWorkerMaxMemBytes() uint64 { return getEnvUint[uint64]("GOMEMLIMIT", 0) @@ -91,15 +65,6 @@ func PeerDBEnableWALHeartbeat() bool { return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) } -// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE -func PeerDBEnableParallelSyncNormalize() bool { - return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) -} - -func PeerDBSnowflakeMergeParallelism() int { - return getEnvInt("PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8) -} - // PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN func PeerDBTelemetryAWSSNSTopicArn() string { return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "") diff --git a/nexus/catalog/migrations/V23__dyncconfigtable.sql b/nexus/catalog/migrations/V23__dyncconfigtable.sql new file mode 100644 index 0000000000..04e0ad5bd0 --- /dev/null +++ b/nexus/catalog/migrations/V23__dyncconfigtable.sql @@ -0,0 +1 @@ +ALTER TABLE alerting_settings RENAME TO dynamic_settings; From efbe080e832123981c3a219b734b6699cf6b466b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 19 Mar 2024 23:40:37 +0530 Subject: [PATCH 2/5] cleanup and more moving --- flow/connectors/eventhub/eventhub.go | 4 +- flow/connectors/snowflake/snowflake.go | 4 +- flow/dynamicconf/dynamicconf.go | 74 +++++++------------------- flow/model/cdc_record_stream.go | 6 +-- flow/peerdbenv/config.go | 29 ++++++++-- 5 files changed, 50 insertions(+), 67 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index d34c1d9274..db56857691 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -12,10 +12,10 @@ import ( metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) type EventHubConnector struct { @@ -106,7 +106,7 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false) - ticker := time.NewTicker(peerdbenv.PeerDBEventhubFlushTimeoutSeconds()) + ticker := time.NewTicker(dynamicconf.PeerDBEventhubFlushTimeoutSeconds(ctx)) defer ticker.Stop() lastSeenLSN := int64(0) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 746768bf51..7685f4fece 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -21,12 +21,12 @@ import ( metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/numeric" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -551,7 +551,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch( var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism()) + g.SetLimit(dynamicconf.PeerDBSnowflakeMergeParallelism(ctx)) for _, tableName := range destinationTableNames { if gCtx.Err() != nil { diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index 7e8fbf71c0..adb4dde5dd 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -2,10 +2,12 @@ package dynamicconf import ( "context" + "strconv" "time" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/exp/constraints" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/logger" @@ -23,7 +25,7 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b return exists.Bool } -func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 { +func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, defaultValue T) T { conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) @@ -34,7 +36,7 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin return defaultValue } - var value pgtype.Uint32 + var value pgtype.Text query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" err = conn.QueryRow(ctx, query, key).Scan(&value) if err != nil { @@ -42,39 +44,13 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin return defaultValue } - if !value.Valid { - logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) - return defaultValue - } - - return value.Uint32 -} - -func dynamicConfInt64(ctx context.Context, key string, defaultValue int64) int64 { - conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) + result, err := strconv.ParseUint(value.String, 10, 32) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue - } - - var value pgtype.Int8 - query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue - } - - if !value.Valid { - logger.LoggerFromCtx(ctx).Error("Failed to parse int64: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) return defaultValue } - return value.Int64 + return T(result) } func dynamicConfString(ctx context.Context, key string, defaultValue string) string { @@ -121,58 +97,48 @@ func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool { return value.Bool } -// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely -func P(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) -} - // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) + return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) } // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration { - why := int64(dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)) + why := dynamicConfNumber[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15) return time.Duration(why) * time.Minute } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) -} - -// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE -func PeerDBEnableParallelSyncNormalize(ctx context.Context) bool { - return dynamicConfBool(ctx, "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) + return dynamicConfNumber[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) } func PeerDBSnowflakeMergeParallelism(ctx context.Context) int { - return int(dynamicConfInt64(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) + return int(dynamicConfNumber[int64](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int { - return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)) + return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int { - return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)) -} - -// PEERDB_DISABLE_ONE_SYNC -func PeerDBDisableOneSync(ctx context.Context) bool { - return dynamicConfBool(ctx, "PEERDB_DISABLE_ONE_SYNC", false) + return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)) } // PEERDB_CDC_CHANNEL_BUFFER_SIZE func PeerDBCDCChannelBufferSize(ctx context.Context) int { - return int(dynamicConfInt64(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) + return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) } // PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration { - x := dynamicConfInt64(ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) + x := dynamicConfNumber[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) return time.Duration(x) * time.Second } + +// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum +func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint64 { + return dynamicConfNumber[uint64](ctx, "GOMEMLIMIT", 0) +} diff --git a/flow/model/cdc_record_stream.go b/flow/model/cdc_record_stream.go index dcdadfbb67..a9a23d131b 100644 --- a/flow/model/cdc_record_stream.go +++ b/flow/model/cdc_record_stream.go @@ -4,7 +4,6 @@ import ( "sync/atomic" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) type CDCRecordStream struct { @@ -20,10 +19,9 @@ type CDCRecordStream struct { emptySignal chan bool } -func NewCDCRecordStream() *CDCRecordStream { - channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize() +func NewCDCRecordStream(bufferSizeForCDC int) *CDCRecordStream { return &CDCRecordStream{ - records: make(chan Record, channelBuffer), + records: make(chan Record, bufferSizeForCDC), SchemaDeltas: make([]*protos.TableSchemaDelta, 0), emptySignal: make(chan bool, 1), lastCheckpointSet: false, diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index a947656061..fb250ca7a8 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -30,11 +30,6 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { return time.Duration(x) * time.Second } -// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum -func PeerDBFlowWorkerMaxMemBytes() uint64 { - return getEnvUint[uint64]("GOMEMLIMIT", 0) -} - // PEERDB_CATALOG_HOST func PeerDBCatalogHost() string { return getEnvString("PEERDB_CATALOG_HOST", "") @@ -86,3 +81,27 @@ func PeerDBAlertingEmailSenderRegion() string { func PeerDBAlertingEmailSenderReplyToAddresses() string { return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "") } + +// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE +func PeerDBEnableParallelSyncNormalize() bool { + return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) +} + +// PEERDB_DISABLE_ONE_SYNC +func PeerDBDisableOneSync() bool { + return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false) +} + +// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled +func PeerDBCDCDiskSpillRecordsThreshold() int { + return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) +} + +func PeerDBCDCDiskSpillMemPercentThreshold() int { + return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) +} + +// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum +func PeerDBFlowWorkerMaxMemBytes() uint64 { + return getEnvUint[uint64]("GOMEMLIMIT", 0) +} From a9502620df54a347029fcd4a44b4734777a1ff71 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 20 Mar 2024 00:13:52 +0530 Subject: [PATCH 3/5] fix types --- flow/activities/flowable.go | 4 ++- flow/dynamicconf/dynamicconf.go | 56 ++++----------------------------- 2 files changed, 9 insertions(+), 51 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 68f764a890..f8d9afda1a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -24,6 +24,7 @@ import ( connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" @@ -323,8 +324,9 @@ func (a *FlowableActivity) SyncFlow( } logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset)) + bufferSizeForCDC := dynamicconf.PeerDBCDCChannelBufferSize(ctx) // start a goroutine to pull records from the source - recordBatch := model.NewCDCRecordStream() + recordBatch := model.NewCDCRecordStream(bufferSizeForCDC) startTime := time.Now() errGroup, errCtx := errgroup.WithContext(ctx) diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index adb4dde5dd..5a5c2eb638 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -53,50 +53,6 @@ func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, d return T(result) } -func dynamicConfString(ctx context.Context, key string, defaultValue string) string { - conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue - } - - var value pgtype.Text - query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue - } - - return value.String -} - -func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool { - conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue - } - - var value pgtype.Bool - query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue - } - - return value.Bool -} - // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) @@ -114,22 +70,22 @@ func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { } func PeerDBSnowflakeMergeParallelism(ctx context.Context) int { - return int(dynamicConfNumber[int64](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) + return int(dynamicConfNumber[int32](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD -func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int { - return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)) +func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int64 { + return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled -func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int { - return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)) +func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int64 { + return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) } // PEERDB_CDC_CHANNEL_BUFFER_SIZE func PeerDBCDCChannelBufferSize(ctx context.Context) int { - return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) + return int(dynamicConfNumber[int32](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) } // PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS From fcd5f59ce8f913fac630ba43809ff9dbe5d082fc Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 20 Mar 2024 00:19:40 +0530 Subject: [PATCH 4/5] add some columns for the table --- nexus/catalog/migrations/V23__dyncconfigtable.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nexus/catalog/migrations/V23__dyncconfigtable.sql b/nexus/catalog/migrations/V23__dyncconfigtable.sql index 04e0ad5bd0..946dd7211a 100644 --- a/nexus/catalog/migrations/V23__dyncconfigtable.sql +++ b/nexus/catalog/migrations/V23__dyncconfigtable.sql @@ -1 +1,3 @@ ALTER TABLE alerting_settings RENAME TO dynamic_settings; +ALTER TABLE dynamic_settings ADD COLUMN setting_description TEXT; +ALTER TABLE dynamic_settings ADD COLUMN needs_restart BOOLEAN; From b1b106fd4f25048c307368a692f6e2d9cf508207 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 20 Mar 2024 16:58:50 +0530 Subject: [PATCH 5/5] try to fix types --- flow/dynamicconf/dynamicconf.go | 50 +++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index 5a5c2eb638..b98771d5d6 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -25,7 +25,35 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b return exists.Bool } -func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, defaultValue T) T { +func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string, defaultValue T) T { + conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) + return defaultValue + } + + if !dynamicConfKeyExists(ctx, conn, key) { + return defaultValue + } + + var value pgtype.Text + query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" + err = conn.QueryRow(ctx, query, key).Scan(&value) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) + return defaultValue + } + + result, err := strconv.ParseInt(value.String, 10, 32) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) + return defaultValue + } + + return T(result) +} + +func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string, defaultValue T) T { conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) @@ -55,46 +83,46 @@ func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, d // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { - return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) + return dynamicConfUnsigned[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) } // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration { - why := dynamicConfNumber[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15) + why := dynamicConfUnsigned[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15) return time.Duration(why) * time.Minute } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { - return dynamicConfNumber[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) + return dynamicConfUnsigned[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) } func PeerDBSnowflakeMergeParallelism(ctx context.Context) int { - return int(dynamicConfNumber[int32](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) + return dynamicConfSigned(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int64 { - return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int64 { - return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) } // PEERDB_CDC_CHANNEL_BUFFER_SIZE func PeerDBCDCChannelBufferSize(ctx context.Context) int { - return int(dynamicConfNumber[int32](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) + return dynamicConfSigned(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) } // PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration { - x := dynamicConfNumber[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) + x := dynamicConfSigned[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) return time.Duration(x) * time.Second } // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum -func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint64 { - return dynamicConfNumber[uint64](ctx, "GOMEMLIMIT", 0) +func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint32 { + return dynamicConfUnsigned[uint32](ctx, "GOMEMLIMIT", 0) }