Skip to content

Commit

Permalink
fix types
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 19, 2024
1 parent efbe080 commit a950262
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 51 deletions.
4 changes: 3 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
Expand Down Expand Up @@ -323,8 +324,9 @@ func (a *FlowableActivity) SyncFlow(
}
logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))

bufferSizeForCDC := dynamicconf.PeerDBCDCChannelBufferSize(ctx)
// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
recordBatch := model.NewCDCRecordStream(bufferSizeForCDC)
startTime := time.Now()

errGroup, errCtx := errgroup.WithContext(ctx)
Expand Down
56 changes: 6 additions & 50 deletions flow/dynamicconf/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,50 +53,6 @@ func dynamicConfNumber[T constraints.Integer](ctx context.Context, key string, d
return T(result)
}

func dynamicConfString(ctx context.Context, key string, defaultValue string) string {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
}

if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
}

var value pgtype.Text
query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

return value.String
}

func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool {
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
}

if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
}

var value pgtype.Bool
query := "SELECT config_value FROM dynamic_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return defaultValue
}

return value.Bool
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 {
return dynamicConfNumber[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
Expand All @@ -114,22 +70,22 @@ func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 {
}

func PeerDBSnowflakeMergeParallelism(ctx context.Context) int {
return int(dynamicConfNumber[int64](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8))
return int(dynamicConfNumber[int32](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8))

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of a signed 64-bit integer from
strconv.ParseInt
to a lower bit size type int without an upper bound check.
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD
func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int {
return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000))
func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) int64 {
return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000)
}

// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled
func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int {
return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1))
func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) int64 {
return dynamicConfNumber[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
}

// PEERDB_CDC_CHANNEL_BUFFER_SIZE
func PeerDBCDCChannelBufferSize(ctx context.Context) int {
return int(dynamicConfNumber[int64](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18))
return int(dynamicConfNumber[int32](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18))

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of a signed 64-bit integer from
strconv.ParseInt
to a lower bit size type int without an upper bound check.
}

// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS
Expand Down

0 comments on commit a950262

Please sign in to comment.