Skip to content

Commit

Permalink
Add environment variable to control parallelism of snowflake merge (#…
Browse files Browse the repository at this point in the history
…1487)

Set to 1 to disable parallelism
Set to -1 to enable unlimited parallelism
  • Loading branch information
serprex authored Mar 15, 2024
1 parent 578b1ed commit fe4fdef
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
3 changes: 2 additions & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -550,7 +551,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch(

var totalRowsAffected int64 = 0
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(8) // limit parallel merges to 8
g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism())

for _, tableName := range destinationTableNames {
if gCtx.Err() != nil {
Expand Down
4 changes: 4 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func PeerDBEnableParallelSyncNormalize() bool {
return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
}

func PeerDBSnowflakeMergeParallelism() int {
return getEnvInt("PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8)
}

// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN
func PeerDBTelemetryAWSSNSTopicArn() string {
return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")
Expand Down

0 comments on commit fe4fdef

Please sign in to comment.