diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 2d38cbf898..ff8c8902e0 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -582,7 +582,6 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - stmts := make([]string, 0, len(distinctTableNames)+1) // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) @@ -608,18 +607,20 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) } // normalize anything between last normalized batch id to last sync batchid mergeStmt := mergeGen.generateMergeStmt() - stmts = append(stmts, mergeStmt) + + // run the merge statement + _, err = c.client.Query(mergeStmt).Read(c.ctx) + if err != nil { + return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) + } } // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName) - stmts = append(stmts, updateMetadataStmt) - - query := strings.Join(stmts, "\n") - _, err = c.client.Query(query).Read(c.ctx) + _, err = c.client.Query(updateMetadataStmt).Read(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to execute statements %s: %v", query, err) + return nil, fmt.Errorf("failed to update metadata table: %v", err) } return &model.NormalizeResponse{