diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 6c07b319c0..36c2675839 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -629,12 +629,16 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) if err != nil { return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) for _, tableName := range distinctTableNames { + normalizeTableSchema := req.TableNameSchemaMapping[tableName] + if len(normalizeTableSchema.PrimaryKeyColumns) == 0 { + c.logger.Warn(fmt.Sprintf("skipping merge for table %s as it has no primary key", tableName)) + continue + } unchangedToastColumns := tableNametoUnchangedToastCols[tableName] dstDatasetTable, _ := c.convertToDatasetTable(tableName) mergeGen := &mergeStmtGenerator{ @@ -645,7 +649,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) }, dstTableName: tableName, dstDatasetTable: dstDatasetTable, - normalizedTableSchema: req.TableNameSchemaMapping[tableName], + normalizedTableSchema: normalizeTableSchema, syncBatchID: req.SyncBatchID, normalizeBatchID: normBatchID, peerdbCols: &protos.PeerDBColumns{