split BQ merge statements and run them individually #1038

Merged · 4 commits · Jan 11, 2024
Changes from 1 commit
19 changes: 12 additions & 7 deletions flow/connectors/bigquery/bigquery.go
@@ -582,7 +582,6 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}

stmts := make([]string, 0, len(distinctTableNames)+1)
// append all the statements to one list
c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, distinctTableNames))
@@ -608,19 +607,25 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
shortColumn: map[string]string{},
}
// normalize anything between last normalized batch id to last sync batchid
mergeStmt := mergeGen.generateMergeStmt()
stmts = append(stmts, mergeStmt)
mergeStmts := mergeGen.generateMergeStmts()
for i, mergeStmt := range mergeStmts {
c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
i+1, len(mergeStmts), tableName))
q := c.client.Query(mergeStmt)
_, err = q.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
}
}
}
// update metadata to make the last normalized batch id to the recent last sync batch id.
updateMetadataStmt := fmt.Sprintf(
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)
stmts = append(stmts, updateMetadataStmt)

query := strings.Join(stmts, "\n")
_, err = c.client.Query(query).Read(c.ctx)
_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements %s: %v", query, err)
return nil, fmt.Errorf("failed to execute update metadata statements %s: %v", updateMetadataStmt, err)
}

return &model.NormalizeResponse{
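For readability, here is a minimal sketch of the normalize flow after this change (consolidated from the diff above, assuming the surrounding loop over distinctTableNames and the per-table mergeGen shown there): each generated MERGE statement now runs as its own BigQuery query job, and the metadata update runs as a final, separate statement, instead of everything being joined into one multi-statement query.

    for _, tableName := range distinctTableNames {
        // one MERGE statement per batch of unchanged-TOAST column groups
        mergeStmts := mergeGen.generateMergeStmts()
        for i, mergeStmt := range mergeStmts {
            c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
                i+1, len(mergeStmts), tableName))
            if _, err := c.client.Query(mergeStmt).Read(c.ctx); err != nil {
                return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
            }
        }
    }
    // finally, advance normalize_batch_id to the last sync batch id
    if _, err := c.client.Query(updateMetadataStmt).Read(c.ctx); err != nil {
        return nil, fmt.Errorf("failed to execute update metadata statement %s: %v", updateMetadataStmt, err)
    }
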
18 changes: 16 additions & 2 deletions flow/connectors/bigquery/merge_statement_generator.go
@@ -121,7 +121,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
}

// generateMergeStmt generates a merge statement.
func (m *mergeStmtGenerator) generateMergeStmt() string {
func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) string {
// comma separated list of column names
columnCount := utils.TableSchemaColumns(m.normalizedTableSchema)
backtickColNames := make([]string, 0, columnCount)
@@ -140,7 +140,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.unchangedToastColumns, m.peerdbCols)
unchangedToastColumns, m.peerdbCols)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE"
@@ -181,6 +181,20 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
}

func (m *mergeStmtGenerator) generateMergeStmts() []string {
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
partitions := utils.ArrayPartition(m.unchangedToastColumns, batchSize)

mergeStmts := make([]string, 0, len(partitions))
for _, partition := range partitions {
mergeStmts = append(mergeStmts, m.generateMergeStmt(partition))
}

return mergeStmts
}

/*
This function takes an array of unique unchanged toast column groups and an array of all column names,
and returns suitable UPDATE statements as part of a MERGE operation.
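The batch size of 8 is a hard-coded guard so that each generated MERGE statement stays under the statement size limit; the TODO above notes that it should eventually be configurable. A hypothetical sketch of one way to do that (the PEERDB_BQ_MERGE_STMT_BATCH_SIZE variable name and the helper below are illustrative only, not part of this PR):

    // Hypothetical helper: read the merge batch size from the environment,
    // falling back to the current hard-coded default of 8.
    func mergeStmtBatchSize() int {
        if v := os.Getenv("PEERDB_BQ_MERGE_STMT_BATCH_SIZE"); v != "" {
            if n, err := strconv.Atoi(v); err == nil && n > 0 {
                return n
            }
        }
        return 8
    }

generateMergeStmts would then call mergeStmtBatchSize() instead of the batchSize constant; this would need the os and strconv imports in merge_statement_generator.go.
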
13 changes: 13 additions & 0 deletions flow/connectors/utils/array.go
@@ -16,3 +16,16 @@ func ArrayMinus(first []string, second []string) []string {
}
return result
}

func ArrayPartition[T any](slice []T, size int) [][]T {
var partitions [][]T

for size < len(slice) {
slice, partitions = slice[size:], append(partitions, slice[0:size])
}

// Add the last remaining values
partitions = append(partitions, slice)

return partitions
}
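
ArrayPartition splits a slice into consecutive chunks of at most size elements, with any remainder in the last chunk; generateMergeStmts uses it to group the unchanged-TOAST column sets into batches of 8. A small illustrative check (hypothetical test, not part of the diff, assuming a _test.go file in the same package with reflect and testing imported):

    func TestArrayPartition(t *testing.T) {
        got := ArrayPartition([]string{"a", "b", "c", "d", "e"}, 2)
        want := [][]string{{"a", "b"}, {"c", "d"}, {"e"}}
        if !reflect.DeepEqual(got, want) {
            t.Errorf("ArrayPartition: got %v, want %v", got, want)
        }
    }

Note that an empty input yields a single empty partition, so callers always get at least one (possibly empty) group back.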