Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move some environment variables to be fetched by catalog #1502

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
Expand Down Expand Up @@ -323,8 +324,9 @@ func (a *FlowableActivity) SyncFlow(
}
logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))

bufferSizeForCDC := dynamicconf.PeerDBCDCChannelBufferSize(ctx)
// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
recordBatch := model.NewCDCRecordStream(bufferSizeForCDC)
startTime := time.Now()

errGroup, errCtx := errgroup.WithContext(ctx)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type EventHubConnector struct {
Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false)

ticker := time.NewTicker(peerdbenv.PeerDBEventhubFlushTimeoutSeconds())
ticker := time.NewTicker(dynamicconf.PeerDBEventhubFlushTimeoutSeconds(ctx))
defer ticker.Stop()

lastSeenLSN := int64(0)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -551,7 +551,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch(

var totalRowsAffected int64 = 0
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism())
g.SetLimit(dynamicconf.PeerDBSnowflakeMergeParallelism(ctx))

for _, tableName := range destinationTableNames {
if gCtx.Err() != nil {
Expand Down
71 changes: 65 additions & 6 deletions flow/dynamicconf/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (

"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/exp/constraints"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/logger"
)

func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool {
var exists pgtype.Bool
query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)"
query := "SELECT EXISTS(SELECT 1 FROM dynamic_settings WHERE config_name = $1)"
err := conn.QueryRow(ctx, query, key).Scan(&exists)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to check if key exists: %v", err)
Expand All @@ -24,7 +25,35 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b
return exists.Bool
}

func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 {
func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string, defaultValue T) T {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
}

if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
}

var value pgtype.Text
query := "SELECT config_value FROM alerting_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

result, err := strconv.ParseInt(value.String, 10, 32)
Copy link
Contributor

@serprex serprex Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
result, err := strconv.ParseInt(value.String, 10, 32)
result, err := strconv.ParseInt(value.String, 10, 64)

Should update signed code too

if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err)
return defaultValue
}

return T(result)
}

func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string, defaultValue T) T {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
Expand All @@ -49,21 +78,51 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin
return defaultValue
}

return uint32(result)
return T(result)
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 {
return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
return dynamicConfUnsigned[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
}

// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely
func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration {
why := int64(dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15))
why := dynamicConfUnsigned[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)
return time.Duration(why) * time.Minute
}

// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely
func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 {
return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
return dynamicConfUnsigned[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
}

// PeerDBSnowflakeMergeParallelism returns the value stored under
// PEERDB_SNOWFLAKE_MERGE_PARALLELISM, using 8 when no usable value is found.
func PeerDBSnowflakeMergeParallelism(ctx context.Context) int {
	const settingKey = "PEERDB_SNOWFLAKE_MERGE_PARALLELISM"

	parallelism := dynamicConfSigned[int](ctx, settingKey, 8)
	return parallelism
}

// PeerDBCDCDiskSpillRecordsThreshold returns the value stored under
// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, using 1_000_000 as the default.
func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int64 {
	const (
		settingKey             = "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD"
		defaultRecords   int64 = 1_000_000
	)
	return dynamicConfSigned[int64](ctx, settingKey, defaultRecords)
}

// PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD, negative numbers means memory threshold disabled
//
// PeerDBCDCDiskSpillMemPercentThreshold returns the value stored under that
// key, using -1 (disabled) as the default.
func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int64 {
	return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// PeerDBCDCChannelBufferSize returns the value stored under
// PEERDB_CDC_CHANNEL_BUFFER_SIZE, using 1<<18 as the default.
func PeerDBCDCChannelBufferSize(ctx context.Context) int {
	const settingKey = "PEERDB_CDC_CHANNEL_BUFFER_SIZE"

	size := dynamicConfSigned[int](ctx, settingKey, 1<<18)
	return size
}

// PeerDBEventhubFlushTimeoutSeconds returns the flush interval stored under
// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS, interpreted as whole seconds.
//
// Non-positive values fall back to the 10 second default: the result is handed
// to time.NewTicker, which panics unless the duration is greater than zero.
func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration {
	const defaultSeconds int64 = 10

	seconds := dynamicConfSigned[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", defaultSeconds)
	if seconds <= 0 {
		seconds = defaultSeconds
	}
	return time.Duration(seconds) * time.Second
}

// PeerDBFlowWorkerMaxMemBytes returns the value stored under the GOMEMLIMIT
// key. GOMEMLIMIT is a variable internal to Golang itself, we use this for
// internal targets; 0 means no maximum.
//
// NOTE(review): a uint32 result cannot represent limits of 4 GiB or more,
// while the peerdbenv variant of this getter returns uint64 — confirm the
// narrowing is intended.
func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint32 {
	const (
		settingKey        = "GOMEMLIMIT"
		noMaximum  uint32 = 0
	)
	return dynamicConfUnsigned[uint32](ctx, settingKey, noMaximum)
}
6 changes: 2 additions & 4 deletions flow/model/cdc_record_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"sync/atomic"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type CDCRecordStream struct {
Expand All @@ -20,10 +19,9 @@ type CDCRecordStream struct {
emptySignal chan bool
}

func NewCDCRecordStream() *CDCRecordStream {
channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize()
func NewCDCRecordStream(bufferSizeForCDC int) *CDCRecordStream {
return &CDCRecordStream{
records: make(chan Record, channelBuffer),
records: make(chan Record, bufferSizeForCDC),
SchemaDeltas: make([]*protos.TableSchemaDelta, 0),
emptySignal: make(chan bool, 1),
lastCheckpointSet: false,
Expand Down
64 changes: 24 additions & 40 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ func PeerDBDeploymentUID() string {
return getEnvString("PEERDB_DEPLOYMENT_UID", "")
}

// PeerDBCDCChannelBufferSize looks up PEERDB_CDC_CHANNEL_BUFFER_SIZE through
// getEnvInt, supplying 1<<18 as the default.
func PeerDBCDCChannelBufferSize() int {
	const envName = "PEERDB_CDC_CHANNEL_BUFFER_SIZE"

	size := getEnvInt(envName, 1<<18)
	return size
}

// PeerDBEventhubFlushTimeoutSeconds looks up
// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS through getEnvInt and returns it as a
// duration in whole seconds.
//
// Non-positive values fall back to the 10 second default: the result is handed
// to time.NewTicker, which panics unless the duration is greater than zero.
func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
	const defaultSeconds = 10

	seconds := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", defaultSeconds)
	if seconds <= 0 {
		seconds = defaultSeconds
	}
	return time.Duration(seconds) * time.Second
}

// env variable doesn't exist anymore, but tests appear to depend on this
// in lieu of an actual value of IdleTimeoutSeconds
func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
Expand All @@ -41,26 +30,6 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
return time.Duration(x) * time.Second
}

// PeerDBCDCDiskSpillRecordsThreshold looks up
// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD through getEnvInt, supplying
// 1_000_000 as the default.
func PeerDBCDCDiskSpillRecordsThreshold() int {
	const (
		envName        = "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD"
		defaultRecords = 1_000_000
	)
	return getEnvInt(envName, defaultRecords)
}

// PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD, negative numbers means memory threshold disabled
//
// PeerDBCDCDiskSpillMemPercentThreshold looks that variable up through
// getEnvInt, supplying -1 (disabled) as the default.
func PeerDBCDCDiskSpillMemPercentThreshold() int {
	return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// PeerDBDisableOneSync looks up PEERDB_DISABLE_ONE_SYNC through getEnvBool,
// supplying false as the default.
func PeerDBDisableOneSync() bool {
	const envName = "PEERDB_DISABLE_ONE_SYNC"

	disabled := getEnvBool(envName, false)
	return disabled
}

// PeerDBFlowWorkerMaxMemBytes looks up GOMEMLIMIT through getEnvUint.
// GOMEMLIMIT is a variable internal to Golang itself, we use this for
// internal targets; 0 means no maximum.
func PeerDBFlowWorkerMaxMemBytes() uint64 {
	const (
		envName          = "GOMEMLIMIT"
		noMaximum uint64 = 0
	)
	return getEnvUint[uint64](envName, noMaximum)
}

// PEERDB_CATALOG_HOST
func PeerDBCatalogHost() string {
return getEnvString("PEERDB_CATALOG_HOST", "")
Expand Down Expand Up @@ -91,15 +60,6 @@ func PeerDBEnableWALHeartbeat() bool {
return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false)
}

// PeerDBEnableParallelSyncNormalize looks up
// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE through getEnvBool, supplying false
// as the default.
func PeerDBEnableParallelSyncNormalize() bool {
	const envName = "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE"

	enabled := getEnvBool(envName, false)
	return enabled
}

// PeerDBSnowflakeMergeParallelism looks up PEERDB_SNOWFLAKE_MERGE_PARALLELISM
// through getEnvInt, supplying 8 as the default.
func PeerDBSnowflakeMergeParallelism() int {
	const (
		envName            = "PEERDB_SNOWFLAKE_MERGE_PARALLELISM"
		defaultParallelism = 8
	)
	return getEnvInt(envName, defaultParallelism)
}

// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN
func PeerDBTelemetryAWSSNSTopicArn() string {
return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")
Expand All @@ -121,3 +81,27 @@ func PeerDBAlertingEmailSenderRegion() string {
func PeerDBAlertingEmailSenderReplyToAddresses() string {
	// Looked up through getEnvString with the empty string as the default.
	const envName = "PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES"

	addresses := getEnvString(envName, "")
	return addresses
}

// PeerDBEnableParallelSyncNormalize looks up
// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE through getEnvBool, supplying false
// as the default.
func PeerDBEnableParallelSyncNormalize() bool {
	const (
		envName      = "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE"
		defaultState = false
	)
	return getEnvBool(envName, defaultState)
}

// PeerDBDisableOneSync looks up PEERDB_DISABLE_ONE_SYNC through getEnvBool,
// supplying false as the default.
func PeerDBDisableOneSync() bool {
	const (
		envName      = "PEERDB_DISABLE_ONE_SYNC"
		defaultState = false
	)
	return getEnvBool(envName, defaultState)
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD
//
// PeerDBCDCDiskSpillRecordsThreshold looks that variable up through getEnvInt,
// supplying 1_000_000 as the default.
func PeerDBCDCDiskSpillRecordsThreshold() int {
	return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)
}

// PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD, negative numbers means memory threshold disabled
//
// PeerDBCDCDiskSpillMemPercentThreshold looks that variable up through
// getEnvInt, supplying -1 (disabled) as the default.
func PeerDBCDCDiskSpillMemPercentThreshold() int {
	return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// PeerDBFlowWorkerMaxMemBytes looks up GOMEMLIMIT through getEnvUint.
// GOMEMLIMIT is a variable internal to Golang itself, we use this for
// internal targets; 0 means no maximum.
func PeerDBFlowWorkerMaxMemBytes() uint64 {
	const envName = "GOMEMLIMIT"

	limit := getEnvUint[uint64](envName, 0)
	return limit
}
3 changes: 3 additions & 0 deletions nexus/catalog/migrations/V23__dyncconfigtable.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Repurpose the alerting settings table as the store for all dynamic settings.
ALTER TABLE alerting_settings RENAME TO dynamic_settings;
-- Nullable free-text description of a setting.
ALTER TABLE dynamic_settings ADD COLUMN setting_description TEXT;
-- Nullable flag; presumably marks settings that only take effect after a
-- restart — TODO confirm how readers interpret NULL.
ALTER TABLE dynamic_settings ADD COLUMN needs_restart BOOLEAN;
Loading