Skip to content

Commit

Permalink
Specialize IterColumns1
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 29, 2023
1 parent 4ff4d1e commit 83eb9f6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 15 deletions.
9 changes: 4 additions & 5 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,12 +830,11 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
columnNames := utils.TableSchemaColumnNames(normalizedTableSchema)

flattenedCastsSQLArray := make([]string, 0, utils.TableSchemaColumns(normalizedTableSchema))
ret, err := utils.IterColumns1(normalizedTableSchema, func(columnName, genericColumnType string) (bool, error) {
err = utils.IterColumnsError(normalizedTableSchema, func(columnName, genericColumnType string) error {
qvKind := qvalue.QValueKind(genericColumnType)
sfType, err := qValueKindToSnowflakeType(qvKind)
if err != nil {
return true, fmt.Errorf("failed to convert column type %s to snowflake type: %w",
genericColumnType, err)
return fmt.Errorf("failed to convert column type %s to snowflake type: %w", genericColumnType, err)
}

targetColumnName := SnowflakeIdentifierNormalize(columnName)
Expand Down Expand Up @@ -866,9 +865,9 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
toVariantColumnName, columnName, sfType, targetColumnName))
}
}
return false, nil
return nil
})
if ret {
if err != nil {
return 0, err
}
flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ""), ",")
Expand Down
18 changes: 8 additions & 10 deletions flow/connectors/utils/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,21 @@ func IterColumns(schema *protos.TableSchema, iter func(k, v string)) {
}
}

func IterColumns1[T any](schema *protos.TableSchema, iter func(k, v string) (bool, T)) (bool, T) {
var zero T
func IterColumnsError(schema *protos.TableSchema, iter func(k, v string) error) error {
if schema.Columns != nil {
for k, v := range schema.Columns {
done, ret := iter(k, v)
if done {
return true, ret
err := iter(k, v)
if err != nil {
return err
}
}
return false, zero
} else {
for i, name := range schema.ColumnNames {
done, ret := iter(name, schema.ColumnTypes[i])
if done {
return true, ret
err := iter(name, schema.ColumnTypes[i])
if err != nil {
return err
}
}
return false, zero
}
return nil
}

0 comments on commit 83eb9f6

Please sign in to comment.