From 56f15d1a614f622f284926144792051453a40a64 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 14 Feb 2024 19:34:02 +0530 Subject: [PATCH] BigQuery: Skip merge if no primary keys present (#1290) Merge statements currently are syntactically incorrect for tables without a primary key. This case is ideally not seen as tables without pkey/replica identity full are filtered out the UI level but in branches where we rely on signals for adding tables, this case can go unchecked and normalize flow fails. This PR skips a table during merge if there are no primary keys for it. --- flow/connectors/bigquery/bigquery.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 4d63a97657..4b9fff6654 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -511,12 +511,16 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor if err != nil { return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) for _, tableName := range distinctTableNames { + normalizeTableSchema := req.TableNameSchemaMapping[tableName] + if len(normalizeTableSchema.PrimaryKeyColumns) == 0 { + c.logger.Warn(fmt.Sprintf("skipping merge for table %s as it has no primary key", tableName)) + continue + } unchangedToastColumns := tableNametoUnchangedToastCols[tableName] dstDatasetTable, _ := c.convertToDatasetTable(tableName) mergeGen := &mergeStmtGenerator{ @@ -527,7 +531,7 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor }, dstTableName: tableName, dstDatasetTable: dstDatasetTable, - normalizedTableSchema: req.TableNameSchemaMapping[tableName], + normalizedTableSchema: normalizeTableSchema, syncBatchID: req.SyncBatchID, normalizeBatchID: normBatchID, peerdbCols: &protos.PeerDBColumns{