diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 68f764a890..f8d9afda1a 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -24,6 +24,7 @@ import (
 	connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
+	"github.com/PeerDB-io/peer-flow/dynamicconf"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/peerdbenv"
@@ -323,8 +324,9 @@ func (a *FlowableActivity) SyncFlow(
 	}
 
 	logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
+	bufferSizeForCDC := dynamicconf.PeerDBCDCChannelBufferSize(ctx)
 	// start a goroutine to pull records from the source
-	recordBatch := model.NewCDCRecordStream()
+	recordBatch := model.NewCDCRecordStream(bufferSizeForCDC)
 
 	startTime := time.Now()
 	errGroup, errCtx := errgroup.WithContext(ctx)
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index d34c1d9274..db56857691 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -12,10 +12,10 @@ import (
 
 	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
+	"github.com/PeerDB-io/peer-flow/dynamicconf"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/logger"
 	"github.com/PeerDB-io/peer-flow/model"
-	"github.com/PeerDB-io/peer-flow/peerdbenv"
 )
 
 type EventHubConnector struct {
@@ -106,7 +106,7 @@ func (c *EventHubConnector) processBatch(
 	batchPerTopic := NewHubBatches(c.hubManager)
 	toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false)
 
-	ticker := time.NewTicker(peerdbenv.PeerDBEventhubFlushTimeoutSeconds())
+	ticker := time.NewTicker(dynamicconf.PeerDBEventhubFlushTimeoutSeconds(ctx))
 	defer ticker.Stop()
 
 	lastSeenLSN := int64(0)
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 746768bf51..7685f4fece 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -21,12 +21,12 @@ import (
 
 	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
+	"github.com/PeerDB-io/peer-flow/dynamicconf"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/logger"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/model/numeric"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
-	"github.com/PeerDB-io/peer-flow/peerdbenv"
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
@@ -551,7 +551,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch(
 
 	var totalRowsAffected int64 = 0
 	g, gCtx := errgroup.WithContext(ctx)
-	g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism())
+	g.SetLimit(dynamicconf.PeerDBSnowflakeMergeParallelism(ctx))
 
 	for _, tableName := range destinationTableNames {
 		if gCtx.Err() != nil {
diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go
index 5225c400b1..b98771d5d6 100644
--- a/flow/dynamicconf/dynamicconf.go
+++ b/flow/dynamicconf/dynamicconf.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
+	"golang.org/x/exp/constraints"
 
 	utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
 	"github.com/PeerDB-io/peer-flow/logger"
@@ -14,7 +15,7 @@
 
 func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool {
 	var exists pgtype.Bool
-	query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)"
+	query := "SELECT EXISTS(SELECT 1 FROM dynamic_settings WHERE config_name = $1)"
 	err := conn.QueryRow(ctx, query, key).Scan(&exists)
 	if err != nil {
 		logger.LoggerFromCtx(ctx).Error("Failed to check if key exists: %v", err)
@@ -24,7 +25,35 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b
 	return exists.Bool
 }
 
-func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 {
+func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string, defaultValue T) T {
+	conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
+	if err != nil {
+		logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
+		return defaultValue
+	}
+
+	if !dynamicConfKeyExists(ctx, conn, key) {
+		return defaultValue
+	}
+
+	var value pgtype.Text
+	query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1"
+	err = conn.QueryRow(ctx, query, key).Scan(&value)
+	if err != nil {
+		logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
+		return defaultValue
+	}
+
+	result, err := strconv.ParseInt(value.String, 10, 32)
+	if err != nil {
+		logger.LoggerFromCtx(ctx).Error("Failed to parse integer: %v", err)
+		return defaultValue
+	}
+
+	return T(result)
+}
+
+func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string, defaultValue T) T {
 	conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
 	if err != nil {
 		logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
@@ -49,21 +78,51 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin
 		return defaultValue
 	}
 
-	return uint32(result)
+	return T(result)
 }
 
 // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
 func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 {
-	return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
+	return dynamicConfUnsigned[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
 }
 
 // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely
 func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration {
-	why := int64(dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15))
+	why := dynamicConfUnsigned[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)
 	return time.Duration(why) * time.Minute
 }
 
 // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely
 func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 {
-	return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
+	return dynamicConfUnsigned[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
+}
+
+func PeerDBSnowflakeMergeParallelism(ctx context.Context) int {
+	return dynamicConfSigned(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)
+}
+
+// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD
+func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int64 {
+	return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)
+}
+
+// PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD, negative values disable the memory threshold
+func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int64 {
+	return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
+}
+
+// PEERDB_CDC_CHANNEL_BUFFER_SIZE
+func PeerDBCDCChannelBufferSize(ctx context.Context) int {
+	return dynamicConfSigned(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)
"PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) +} + +// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS +func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration { + x := dynamicConfSigned[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) + return time.Duration(x) * time.Second +} + +// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum +func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint32 { + return dynamicConfUnsigned[uint32](ctx, "GOMEMLIMIT", 0) } diff --git a/flow/model/cdc_record_stream.go b/flow/model/cdc_record_stream.go index dcdadfbb67..a9a23d131b 100644 --- a/flow/model/cdc_record_stream.go +++ b/flow/model/cdc_record_stream.go @@ -4,7 +4,6 @@ import ( "sync/atomic" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) type CDCRecordStream struct { @@ -20,10 +19,9 @@ type CDCRecordStream struct { emptySignal chan bool } -func NewCDCRecordStream() *CDCRecordStream { - channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize() +func NewCDCRecordStream(bufferSizeForCDC int) *CDCRecordStream { return &CDCRecordStream{ - records: make(chan Record, channelBuffer), + records: make(chan Record, bufferSizeForCDC), SchemaDeltas: make([]*protos.TableSchemaDelta, 0), emptySignal: make(chan bool, 1), lastCheckpointSet: false, diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 1d0eacc01a..fb250ca7a8 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -18,17 +18,6 @@ func PeerDBDeploymentUID() string { return getEnvString("PEERDB_DEPLOYMENT_UID", "") } -// PEERDB_CDC_CHANNEL_BUFFER_SIZE -func PeerDBCDCChannelBufferSize() int { - return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) -} - -// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS -func PeerDBEventhubFlushTimeoutSeconds() time.Duration { - x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) - return time.Duration(x) * time.Second -} - // env variable doesn't exist anymore, but tests appear to depend on this // in lieu of an actual value of IdleTimeoutSeconds func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { @@ -41,26 +30,6 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { return time.Duration(x) * time.Second } -// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD -func PeerDBCDCDiskSpillRecordsThreshold() int { - return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) -} - -// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled -func PeerDBCDCDiskSpillMemPercentThreshold() int { - return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) -} - -// PEERDB_DISABLE_ONE_SYNC -func PeerDBDisableOneSync() bool { - return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false) -} - -// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum -func PeerDBFlowWorkerMaxMemBytes() uint64 { - return getEnvUint[uint64]("GOMEMLIMIT", 0) -} - // PEERDB_CATALOG_HOST func PeerDBCatalogHost() string { return getEnvString("PEERDB_CATALOG_HOST", "") @@ -91,15 +60,6 @@ func PeerDBEnableWALHeartbeat() bool { return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) } -// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE -func PeerDBEnableParallelSyncNormalize() bool { - return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) -} - -func PeerDBSnowflakeMergeParallelism() int { - return getEnvInt("PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8) -} - // PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN func 
 	return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")
@@ -121,3 +81,27 @@ func PeerDBAlertingEmailSenderRegion() string {
 func PeerDBAlertingEmailSenderReplyToAddresses() string {
 	return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "")
 }
+
+// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
+func PeerDBEnableParallelSyncNormalize() bool {
+	return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
+}
+
+// PEERDB_DISABLE_ONE_SYNC
+func PeerDBDisableOneSync() bool {
+	return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false)
+}
+
+// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD
+func PeerDBCDCDiskSpillRecordsThreshold() int {
+	return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)
+}
+
+func PeerDBCDCDiskSpillMemPercentThreshold() int {
+	return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
+}
+
+// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
+func PeerDBFlowWorkerMaxMemBytes() uint64 {
+	return getEnvUint[uint64]("GOMEMLIMIT", 0)
+}
diff --git a/nexus/catalog/migrations/V23__dyncconfigtable.sql b/nexus/catalog/migrations/V23__dyncconfigtable.sql
new file mode 100644
index 0000000000..946dd7211a
--- /dev/null
+++ b/nexus/catalog/migrations/V23__dyncconfigtable.sql
@@ -0,0 +1,3 @@
+ALTER TABLE alerting_settings RENAME TO dynamic_settings;
+ALTER TABLE dynamic_settings ADD COLUMN setting_description TEXT;
+ALTER TABLE dynamic_settings ADD COLUMN needs_restart BOOLEAN;
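
For reference: once V23__dyncconfigtable.sql has run, the getters above read overrides from the renamed dynamic_settings table and fall back to their hard-coded defaults when no matching row exists. A minimal sketch of seeding an override, assuming config_name and config_value (the columns used by the queries above) are the only required columns and that the description text is purely illustrative:

-- Hypothetical override rows. Column names come from the queries above plus the two
-- columns added by V23__dyncconfigtable.sql; values are stored as text and re-read
-- on every call to dynamicConfSigned/dynamicConfUnsigned, so no restart is assumed
-- here unless needs_restart is meant to signal otherwise.
INSERT INTO dynamic_settings (config_name, config_value, setting_description, needs_restart)
VALUES
  ('PEERDB_SNOWFLAKE_MERGE_PARALLELISM', '4',
   'Maximum concurrent MERGE statements per Snowflake batch (illustrative)', false),
  ('PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS', '60',
   'Event Hub flush ticker interval in seconds (illustrative)', false);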