Skip to content

Commit

Permalink
error logs dups as well
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Feb 10, 2024
1 parent 899ad6c commit fec56ab
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
4 changes: 3 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,9 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)

// if there are duplicates in the unchangedToastColumns, error out here.
if len(unchangedToastColumns) != len(utils.Unique(unchangedToastColumns)) {
return nil, fmt.Errorf("duplicates found in unchangedToastColumns for table %s", tableName)
dups := utils.CountMoreThan(unchangedToastColumns, 1)
c.logger.Error(fmt.Sprintf("duplicates found in unchangedToastColumns for table %s: `%v`", tableName, dups))
return nil, fmt.Errorf("duplicates found in unchangedToastColumns for table %s: `%v`", tableName, dups)
}

// normalize anything between last normalized batch id to last sync batchid
Expand Down
21 changes: 21 additions & 0 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,27 @@ func Unique[T comparable](slice []T) []T {
return list
}

// Count returns a map from each distinct element of slice to the number of
// times that element occurs. The returned map is never nil; an empty or nil
// slice yields an empty map.
func Count[T comparable](slice []T) map[T]int {
	tally := map[T]int{}
	for i := range slice {
		tally[slice[i]]++
	}
	return tally
}

// CountMoreThan tallies how many times each distinct element occurs in slice
// and returns only those elements whose occurrence count is strictly greater
// than n, mapped to that count.
//
// The returned map is never nil; when no element qualifies (including for an
// empty or nil slice) it is empty.
func CountMoreThan[T comparable](slice []T, n int) map[T]int {
	counts := make(map[T]int)
	for _, entry := range slice {
		counts[entry]++
	}

	// Collect the qualifying entries into a fresh map instead of deleting the
	// others from counts: delete is a no-op for keys that are not equal to
	// themselves (floating-point NaN), so such entries could never be
	// filtered out in place and would leak into the result.
	frequent := make(map[T]int)
	for k, v := range counts {
		if v > n {
			frequent[k] = v
		}
	}
	return frequent
}

func ArrayChunks[T any](slice []T, size int) [][]T {
var partitions [][]T

Expand Down

0 comments on commit fec56ab

Please sign in to comment.