Skip to content

Commit

Permalink
error out if duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Feb 9, 2024
1 parent cf146bd commit c588b71
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
6 changes: 6 additions & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,12 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
},
shortColumn: map[string]string{},
}

// if there are duplicates in the unchangedToastColumns, error out here.
if len(unchangedToastColumns) != len(utils.Unique(unchangedToastColumns)) {
return nil, fmt.Errorf("duplicates found in unchangedToastColumns for table %s", tableName)
}

// normalize anything between last normalized batch id to last sync batchid
mergeStmts := mergeGen.generateMergeStmts(unchangedToastColumns)
for i, mergeStmt := range mergeStmts {
Expand Down
12 changes: 12 additions & 0 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ func ArrayMinus(first []string, second []string) []string {
return result
}

func Unique[T comparable](slice []T) []T {
keys := make(map[T]bool)
list := []T{}
for _, entry := range slice {
if _, value := keys[entry]; !value {
keys[entry] = true
list = append(list, entry)
}
}
return list
}

func ArrayChunks[T any](slice []T, size int) [][]T {
var partitions [][]T

Expand Down

0 comments on commit c588b71

Please sign in to comment.