Skip to content

Commit

Permalink
run merge statements outside of txn
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 6, 2024
1 parent 081eb40 commit 8dffe83
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,6 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}

stmts := make([]string, 0, len(distinctTableNames)+1)
// append all the statements to one list
c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, distinctTableNames))
Expand All @@ -608,18 +607,20 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
// normalize anything between last normalized batch id to last sync batchid
mergeStmt := mergeGen.generateMergeStmt()
stmts = append(stmts, mergeStmt)

// run the merge statement
_, err = c.client.Query(mergeStmt).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
}
}
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)
stmts = append(stmts, updateMetadataStmt)

query := strings.Join(stmts, "\n")
_, err = c.client.Query(query).Read(c.ctx)
_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements %s: %v", query, err)
return nil, fmt.Errorf("failed to update metadata table: %v", err)
}

return &model.NormalizeResponse{
Expand Down

0 comments on commit 8dffe83

Please sign in to comment.