Skip to content

Commit

Permalink
SF storage integration fixes (#1517)
Browse files Browse the repository at this point in the history
Code is in need of general cleanup

1) CDC was hardcoding to internal stage, make it respect external
stages.
2) fixing 1 leads to nil pointer dereference in CDC because integration
detection accesses a field not set in CDC.
  • Loading branch information
heavycrystal authored Mar 21, 2024
1 parent 61a73d9 commit 472d279
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
5 changes: 4 additions & 1 deletion flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ func (c *SnowflakeConnector) createExternalStage(stageName string, config *proto

cleanURL := fmt.Sprintf("s3://%s/%s/%s", s3o.Bucket, s3o.Prefix, config.FlowJobName)

s3Int := config.DestinationPeer.GetSnowflakeConfig().S3Integration
var s3Int string
if config.DestinationPeer != nil {
s3Int = config.DestinationPeer.GetSnowflakeConfig().S3Integration
}
if s3Int == "" {
credsStr := fmt.Sprintf("CREDENTIALS=(AWS_KEY_ID='%s' AWS_SECRET_KEY='%s')",
awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
}

qrepConfig := &protos.QRepConfig{
StagingPath: "",
StagingPath: req.StagingPath,
FlowJobName: req.FlowJobName,
DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s.%s", c.rawSchema,
rawTableIdentifier)),
Expand Down

0 comments on commit 472d279

Please sign in to comment.