diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4ba626a053..746768bf51 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -26,6 +26,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/numeric" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -550,7 +551,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch( var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(8) // limit parallel merges to 8 + g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism()) for _, tableName := range destinationTableNames { if gCtx.Err() != nil { diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 4903bec091..19f391051a 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -91,6 +91,10 @@ func PeerDBEnableParallelSyncNormalize() bool { return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) } +func PeerDBSnowflakeMergeParallelism() int { + return getEnvInt("PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8) +} + // PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN func PeerDBTelemetryAWSSNSTopicArn() string { return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")