Skip to content

Commit

Permalink
account for inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 14, 2023
1 parent 6a892ce commit 9ff1a16
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,13 +1010,37 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
for _, columnName := range columnNames {
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(columnName)))
}

// add soft delete and synced at columns to the list of columns
if normalizeReq.SoftDelete {
colName := normalizeReq.SoftDeleteColName
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(colName)))
}

if normalizeReq.SyncedAtColName != "" {
colName := normalizeReq.SyncedAtColName
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(colName)))
}

insertColumnsSQL := strings.TrimSuffix(strings.Join(quotedUpperColNames, ","), ",")

insertValuesSQLArray := make([]string, 0, len(columnNames))
for _, columnName := range columnNames {
quotedUpperColumnName := fmt.Sprintf(`"%s"`, strings.ToUpper(columnName))
insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s,", quotedUpperColumnName))
}

// add soft delete and synced at columns to the list of columns
if normalizeReq.SoftDelete {
// add false as default value for soft delete column
insertValuesSQLArray = append(insertValuesSQLArray, "FALSE,")
}

if normalizeReq.SyncedAtColName != "" {
// add current timestamp as default value for synced at column
insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP,")
}

insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")

updateStatementsforToastCols := c.generateUpdateStatement(
Expand Down

0 comments on commit 9ff1a16

Please sign in to comment.