diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 68c9755ad8..2cdca4859e 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -615,7 +615,9 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 
 	// if there are duplicates in the unchangedToastColumns, error out here.
 	if len(unchangedToastColumns) != len(utils.Unique(unchangedToastColumns)) {
-		return nil, fmt.Errorf("duplicates found in unchangedToastColumns for table %s", tableName)
+		dups := utils.CountMoreThan(unchangedToastColumns, 1)
+		c.logger.Error(fmt.Sprintf("duplicates found in unchangedToastColumns for table %s: `%v`", tableName, dups))
+		return nil, fmt.Errorf("duplicates found in unchangedToastColumns for table %s: `%v`", tableName, dups)
 	}
 
 	// normalize anything between last normalized batch id to last sync batchid
diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go
index c6745d8643..3d1a06db2a 100644
--- a/flow/connectors/utils/array.go
+++ b/flow/connectors/utils/array.go
@@ -29,6 +29,27 @@ func Unique[T comparable](slice []T) []T {
 	return list
 }
 
+func Count[T comparable](slice []T) map[T]int {
+	counts := make(map[T]int)
+	for _, entry := range slice {
+		counts[entry]++
+	}
+	return counts
+}
+
+func CountMoreThan[T comparable](slice []T, n int) map[T]int {
+	counts := make(map[T]int)
+	for _, entry := range slice {
+		counts[entry]++
+	}
+	for k, v := range counts {
+		if v <= n {
+			delete(counts, k)
+		}
+	}
+	return counts
+}
+
 func ArrayChunks[T any](slice []T, size int) [][]T {
 	var partitions [][]T
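
For illustration, here is a minimal usage sketch of the two helpers added above. It is not part of the diff, and the import path is an assumption based on the repo layout (flow/connectors/utils under the peer-flow module):

```go
package main

import (
	"fmt"

	// Assumed import path for the utils package shown in the diff.
	"github.com/PeerDB-io/peer-flow/connectors/utils"
)

func main() {
	cols := []string{"a", "b", "a", "c", "b", "a"}

	// Count returns the frequency of every element in the slice.
	fmt.Println(utils.Count(cols)) // map[a:3 b:2 c:1]

	// CountMoreThan keeps only elements that appear more than n times;
	// with n=1 this yields exactly the duplicates, which is what
	// NormalizeRecords includes in its log line and error message.
	fmt.Println(utils.CountMoreThan(cols, 1)) // map[a:3 b:2]
}
```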