From 51f42568eeaba573a93ad264f07fc9489958a044 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 14 Feb 2024 19:34:02 +0530 Subject: [PATCH] BigQuery: Skip merge if no primary keys present (#1290) Merge statements currently are syntactically incorrect for tables without a primary key. This case is ideally not seen as tables without pkey/replica identity full are filtered out the UI level but in branches where we rely on signals for adding tables, this case can go unchecked and normalize flow fails. This PR skips a table during merge if there are no primary keys for it. --- flow/connectors/bigquery/bigquery.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 6c07b319c0..36c2675839 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -629,12 +629,16 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) if err != nil { return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) for _, tableName := range distinctTableNames { + normalizeTableSchema := req.TableNameSchemaMapping[tableName] + if len(normalizeTableSchema.PrimaryKeyColumns) == 0 { + c.logger.Warn(fmt.Sprintf("skipping merge for table %s as it has no primary key", tableName)) + continue + } unchangedToastColumns := tableNametoUnchangedToastCols[tableName] dstDatasetTable, _ := c.convertToDatasetTable(tableName) mergeGen := &mergeStmtGenerator{ @@ -645,7 +649,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) }, dstTableName: tableName, dstDatasetTable: dstDatasetTable, - normalizedTableSchema: req.TableNameSchemaMapping[tableName], + normalizedTableSchema: normalizeTableSchema, syncBatchID: req.SyncBatchID, normalizeBatchID: normBatchID, peerdbCols: &protos.PeerDBColumns{