diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 4d63a97657..4b9fff6654 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -511,12 +511,16 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor if err != nil { return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) for _, tableName := range distinctTableNames { + normalizeTableSchema := req.TableNameSchemaMapping[tableName] + if len(normalizeTableSchema.PrimaryKeyColumns) == 0 { + c.logger.Warn(fmt.Sprintf("skipping merge for table %s as it has no primary key", tableName)) + continue + } unchangedToastColumns := tableNametoUnchangedToastCols[tableName] dstDatasetTable, _ := c.convertToDatasetTable(tableName) mergeGen := &mergeStmtGenerator{ @@ -527,7 +531,7 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor }, dstTableName: tableName, dstDatasetTable: dstDatasetTable, - normalizedTableSchema: req.TableNameSchemaMapping[tableName], + normalizedTableSchema: normalizeTableSchema, syncBatchID: req.SyncBatchID, normalizeBatchID: normBatchID, peerdbCols: &protos.PeerDBColumns{