From 472d279b1fbeecb617fb6358a6641e50bc9dd41f Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 21 Mar 2024 08:30:29 +0530 Subject: [PATCH] SF storage integration fixes (#1517) Code is in need of general cleanup 1) CDC was hardcoding to internal stage, make it respect external stages. 2) fixing 1 leads to nil pointer dereference in CDC because integration detection accesses a field not set in CDC. --- flow/connectors/snowflake/qrep.go | 5 ++++- flow/connectors/snowflake/snowflake.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 96e337a3f1..7ba84741a9 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -135,7 +135,10 @@ func (c *SnowflakeConnector) createExternalStage(stageName string, config *proto cleanURL := fmt.Sprintf("s3://%s/%s/%s", s3o.Bucket, s3o.Prefix, config.FlowJobName) - s3Int := config.DestinationPeer.GetSnowflakeConfig().S3Integration + var s3Int string + if config.DestinationPeer != nil { + s3Int = config.DestinationPeer.GetSnowflakeConfig().S3Integration + } if s3Int == "" { credsStr := fmt.Sprintf("CREDENTIALS=(AWS_KEY_ID='%s' AWS_SECRET_KEY='%s')", awsCreds.AccessKeyID, awsCreds.SecretAccessKey) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 746768bf51..b1877320e1 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -459,7 +459,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( } qrepConfig := &protos.QRepConfig{ - StagingPath: "", + StagingPath: req.StagingPath, FlowJobName: req.FlowJobName, DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s.%s", c.rawSchema, rawTableIdentifier)),