diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 55108ffb76..d0ff430888 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + "github.com/aws/smithy-go/ptr" "github.com/jackc/pgx/v5/pgtype" "github.com/snowflakedb/gosnowflake" "go.temporal.io/sdk/activity" @@ -173,6 +174,9 @@ func NewSnowflakeConnector( return nil, err } + additionalParams := make(map[string]*string) + additionalParams["CLIENT_SESSION_KEEP_ALIVE"] = ptr.String("true") + snowflakeConfig := gosnowflake.Config{ Account: snowflakeProtoConfig.AccountId, User: snowflakeProtoConfig.Username, @@ -183,7 +187,9 @@ func NewSnowflakeConnector( Role: snowflakeProtoConfig.Role, RequestTimeout: time.Duration(snowflakeProtoConfig.QueryTimeout), DisableTelemetry: true, + Params: additionalParams, } + snowflakeConfigDSN, err := gosnowflake.DSN(&snowflakeConfig) if err != nil { return nil, fmt.Errorf("failed to get DSN from Snowflake config: %w", err) @@ -499,6 +505,7 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No } for batchId := normBatchID + 1; batchId <= req.SyncBatchID; batchId++ { + c.logger.Info(fmt.Sprintf("normalizing records for batch %d [of %d]", batchId, req.SyncBatchID)) mergeErr := c.mergeTablesForBatch(ctx, batchId, req.FlowJobName, req.TableNameSchemaMapping, &protos.PeerDBColumns{ @@ -565,7 +572,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch( } startTime := time.Now() - c.logger.Info("[merge] merging records...", "destTable", tableName) + c.logger.Info("[merge] merging records...", "destTable", tableName, "batchId", batchId) result, err := c.database.ExecContext(gCtx, mergeStatement, tableName) if err != nil { @@ -575,7 +582,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch( endTime := time.Now() c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds", - tableName, endTime.Sub(startTime)/time.Second)) + tableName, endTime.Sub(startTime)/time.Second), "batchId", batchId) rowsAffected, err := result.RowsAffected() if err != nil {