From c1134616cb3f1d0b45792a02f6c89ebfffd8f041 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 15 Mar 2024 17:23:50 +0000 Subject: [PATCH] Add environment variable to control parallelism of snowflake merge. Set to 1 to disable parallelism. Set to -1 to enable unlimited parallelism. --- flow/connectors/snowflake/snowflake.go | 3 ++- flow/peerdbenv/config.go | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4ba626a053..746768bf51 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -26,6 +26,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/numeric" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -550,7 +551,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch( var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(8) // limit parallel merges to 8 + g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism()) for _, tableName := range destinationTableNames { if gCtx.Err() != nil { diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 4903bec091..19f391051a 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -91,6 +91,10 @@ func PeerDBEnableParallelSyncNormalize() bool { return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) } +func PeerDBSnowflakeMergeParallelism() int { + return getEnvInt("PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8) +} + // PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN func PeerDBTelemetryAWSSNSTopicArn() string { return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")