Skip to content

Commit

Permalink
cleanup and more moving
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 19, 2024
1 parent 9562dac commit efbe080
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 67 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type EventHubConnector struct {
Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false)

ticker := time.NewTicker(peerdbenv.PeerDBEventhubFlushTimeoutSeconds())
ticker := time.NewTicker(dynamicconf.PeerDBEventhubFlushTimeoutSeconds(ctx))
defer ticker.Stop()

lastSeenLSN := int64(0)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -551,7 +551,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch(

var totalRowsAffected int64 = 0
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism())
g.SetLimit(dynamicconf.PeerDBSnowflakeMergeParallelism(ctx))

for _, tableName := range destinationTableNames {
if gCtx.Err() != nil {
Expand Down
74 changes: 20 additions & 54 deletions flow/dynamicconf/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package dynamicconf

import (
"context"
"strconv"
"time"

"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/exp/constraints"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/logger"
Expand All @@ -23,7 +25,7 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b
return exists.Bool
}

func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 {
func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, defaultValue T) T {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
Expand All @@ -34,47 +36,21 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin
return defaultValue
}

var value pgtype.Uint32
var value pgtype.Text
query := "SELECT config_value FROM alerting_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

if !value.Valid {
logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err)
return defaultValue
}

return value.Uint32
}

func dynamicConfInt64(ctx context.Context, key string, defaultValue int64) int64 {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
result, err := strconv.ParseUint(value.String, 10, 32)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
}

if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
}

var value pgtype.Int8
query := "SELECT config_value FROM alerting_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

if !value.Valid {
logger.LoggerFromCtx(ctx).Error("Failed to parse int64: %v", err)
logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err)
return defaultValue
}

return value.Int64
return T(result)
}

func dynamicConfString(ctx context.Context, key string, defaultValue string) string {
Expand Down Expand Up @@ -121,58 +97,48 @@ func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool {
return value.Bool
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func P(ctx context.Context) uint32 {
return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 {
return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
}

// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely
func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration {
why := int64(dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15))
why := dynamicConfNumber[uint32](ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)
return time.Duration(why) * time.Minute
}

// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely
func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 {
return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
}

// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
func PeerDBEnableParallelSyncNormalize(ctx context.Context) bool {
return dynamicConfBool(ctx, "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
return dynamicConfNumber[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
}

func PeerDBSnowflakeMergeParallelism(ctx context.Context) int {
return int(dynamicConfInt64(ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8))
return int(dynamicConfNumber[int64](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8))

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD
func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int {
return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000))
return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000))

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled
func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int {
return int(dynamicConfInt64(ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1))
}

// PEERDB_DISABLE_ONE_SYNC
func PeerDBDisableOneSync(ctx context.Context) bool {
return dynamicConfBool(ctx, "PEERDB_DISABLE_ONE_SYNC", false)
return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1))

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
}

// PEERDB_CDC_CHANNEL_BUFFER_SIZE
func PeerDBCDCChannelBufferSize(ctx context.Context) int {
return int(dynamicConfInt64(ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18))
return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18))

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 32-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
}

// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS
func PeerDBEventhubFlushTimeoutSeconds(ctx context.Context) time.Duration {
x := dynamicConfInt64(ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
x := dynamicConfNumber[int64](ctx, "PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
return time.Duration(x) * time.Second
}

// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
func PeerDBFlowWorkerMaxMemBytes(ctx context.Context) uint64 {
return dynamicConfNumber[uint64](ctx, "GOMEMLIMIT", 0)
}
6 changes: 2 additions & 4 deletions flow/model/cdc_record_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"sync/atomic"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type CDCRecordStream struct {
Expand All @@ -20,10 +19,9 @@ type CDCRecordStream struct {
emptySignal chan bool
}

func NewCDCRecordStream() *CDCRecordStream {
channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize()
func NewCDCRecordStream(bufferSizeForCDC int) *CDCRecordStream {
return &CDCRecordStream{
records: make(chan Record, channelBuffer),
records: make(chan Record, bufferSizeForCDC),
SchemaDeltas: make([]*protos.TableSchemaDelta, 0),
emptySignal: make(chan bool, 1),
lastCheckpointSet: false,
Expand Down
29 changes: 24 additions & 5 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
return time.Duration(x) * time.Second
}

// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
func PeerDBFlowWorkerMaxMemBytes() uint64 {
return getEnvUint[uint64]("GOMEMLIMIT", 0)
}

// PEERDB_CATALOG_HOST
func PeerDBCatalogHost() string {
return getEnvString("PEERDB_CATALOG_HOST", "")
Expand Down Expand Up @@ -86,3 +81,27 @@ func PeerDBAlertingEmailSenderRegion() string {
func PeerDBAlertingEmailSenderReplyToAddresses() string {
return getEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "")
}

// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
func PeerDBEnableParallelSyncNormalize() bool {
return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
}

// PEERDB_DISABLE_ONE_SYNC
func PeerDBDisableOneSync() bool {
return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false)
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled
func PeerDBCDCDiskSpillRecordsThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)
}

func PeerDBCDCDiskSpillMemPercentThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
func PeerDBFlowWorkerMaxMemBytes() uint64 {
return getEnvUint[uint64]("GOMEMLIMIT", 0)
}

0 comments on commit efbe080

Please sign in to comment.