From efbe080e832123981c3a219b734b6699cf6b466b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 19 Mar 2024 23:40:37 +0530 Subject: [PATCH] cleanup and more moving --- flow/connectors/eventhub/eventhub.go | 4 +- flow/connectors/snowflake/snowflake.go | 4 +- flow/dynamicconf/dynamicconf.go | 74 +++++++------------------- flow/model/cdc_record_stream.go | 6 +-- flow/peerdbenv/config.go | 29 ++++++++-- 5 files changed, 50 insertions(+), 67 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index d34c1d9274..db56857691 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -12,10 +12,10 @@ import ( metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) type EventHubConnector struct { @@ -106,7 +106,7 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false) - ticker := time.NewTicker(peerdbenv.PeerDBEventhubFlushTimeoutSeconds()) + ticker := time.NewTicker(dynamicconf.PeerDBEventhubFlushTimeoutSeconds(ctx)) defer ticker.Stop() lastSeenLSN := int64(0) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 746768bf51..7685f4fece 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -21,12 +21,12 @@ import ( metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/numeric" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -551,7 +551,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch( var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism()) + g.SetLimit(dynamicconf.PeerDBSnowflakeMergeParallelism(ctx)) for _, tableName := range destinationTableNames { if gCtx.Err() != nil { diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index 7e8fbf71c0..adb4dde5dd 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -2,10 +2,12 @@ package dynamicconf import ( "context" + "strconv" "time" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/exp/constraints" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/logger" @@ -23,7 +25,7 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b return exists.Bool } -func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 { +func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, defaultValue T) T { conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) @@ -34,7 +36,7 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin return defaultValue } - var value pgtype.Uint32 + var value pgtype.Text query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" err = conn.QueryRow(ctx, query, key).Scan(&value) if err != nil { @@ -42,39 +44,13 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin return defaultValue } - if !value.Valid { - logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) - return defaultValue - } - - return value.Uint32 -} - -func dynamicConfInt64(ctx context.Context, key string, defaultValue int64) int64 { - conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) + result, err := strconv.ParseUint(value.String, 10, 32) if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue - } - - var value pgtype.Int8 - query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue - } - - if !value.Valid { - logger.LoggerFromCtx(ctx).Error("Failed to parse int64: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) return defaultValue } - return value.Int64 + return T(result) } func dynamicConfString(ctx context.Context, key string, defaultValue string) string { @@ -121,58 +97,48 @@ func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool { return value.Bool } -// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely -func P(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) -} - // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) + return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) } // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration { - why := int64(dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)) + why := dynamicConfNumber[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15) return time.Duration(why) * time.Minute } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) -} - -// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE -func PeerDBEnableParallelSyncNormalize(ctx context.Context) bool { - return dynamicConfBool(ctx, "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) + return dynamicConfNumber[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) } func PeerDBSnowflakeMergeParallelism(ctx context.Context) int { - return int(dynamicConfInt64(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) + return int(dynamicConfNumber[int64](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int { - return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)) + return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)) } // PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int { - return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)) -} - -// PEERDB_DISABLE_ONE_SYNC -func PeerDBDisableOneSync(ctx context.Context) bool { - return dynamicConfBool(ctx, "PEERDB_DISABLE_ONE_SYNC", false) + return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)) } // PEERDB_CDC_CHANNEL_BUFFER_SIZE func PeerDBCDCChannelBufferSize(ctx context.Context) int { - return int(dynamicConfInt64(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) + return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)) } // PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration { - x := dynamicConfInt64(ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) + x := dynamicConfNumber[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) return time.Duration(x) * time.Second } + +// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum +func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint64 { + return dynamicConfNumber[uint64](ctx, "GOMEMLIMIT", 0) +} diff --git a/flow/model/cdc_record_stream.go b/flow/model/cdc_record_stream.go index dcdadfbb67..a9a23d131b 100644 --- a/flow/model/cdc_record_stream.go +++ b/flow/model/cdc_record_stream.go @@ -4,7 +4,6 @@ import ( "sync/atomic" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) type CDCRecordStream struct { @@ -20,10 +19,9 @@ type CDCRecordStream struct { emptySignal chan bool } -func NewCDCRecordStream() *CDCRecordStream { - channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize() +func NewCDCRecordStream(bufferSizeForCDC int) *CDCRecordStream { return &CDCRecordStream{ - records: make(chan Record, channelBuffer), + records: make(chan Record, bufferSizeForCDC), SchemaDeltas: make([]*protos.TableSchemaDelta, 0), emptySignal: make(chan bool, 1), lastCheckpointSet: false, diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index a947656061..fb250ca7a8 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -30,11 +30,6 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { return time.Duration(x) * time.Second } -// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum -func PeerDBFlowWorkerMaxMemBytes() uint64 { - return getEnvUint[uint64]("GOMEMLIMIT", 0) -} - // PEERDB_CATALOG_HOST func PeerDBCatalogHost() string { return getEnvString("PEERDB_CATALOG_HOST", "") @@ -86,3 +81,27 @@ func PeerDBAlertingEmailSenderRegion() string { func PeerDBAlertingEmailSenderReplyToAddresses() string { return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "") } + +// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE +func PeerDBEnableParallelSyncNormalize() bool { + return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) +} + +// PEERDB_DISABLE_ONE_SYNC +func PeerDBDisableOneSync() bool { + return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false) +} + +// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled +func PeerDBCDCDiskSpillRecordsThreshold() int { + return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) +} + +func PeerDBCDCDiskSpillMemPercentThreshold() int { + return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) +} + +// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum +func PeerDBFlowWorkerMaxMemBytes() uint64 { + return getEnvUint[uint64]("GOMEMLIMIT", 0) +}