split BQ merge statements and run them individually (#1038)
For tables with many columns in general, and especially many toast
columns, some batches can trigger a MERGE statement so complex that
BigQuery is unable to process it, with errors like:

`The query is too large. The maximum standard SQL query length is
1024.00K characters, including comments and white space characters.`
`Error 400: Resources exceeded during query execution: The query is too
complex., resourcesExceeded`

For now, the fix is to split these complex MERGE statements into
smaller ones that act on different subsets of the raw table [partitioned
on the basis of `_peerdb_unchanged_toast_columns`]. This can lead to
tables needing 10+ MERGE statements in a single batch, but that is a
compromise with our current design. Instead of sending the MERGEs for all
tables in one query, we now run them per table and update metadata at the
end, which keeps us under the SQL query length limits.
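
To make the "10+ MERGE statements" figure concrete, here is a minimal
sketch of the arithmetic (the 90-group count is hypothetical; the batch
size of 8 is the one hardcoded in the new generateMergeStmts below):

    package main

    import "fmt"

    func main() {
        // Hypothetical: a wide table whose batch produced 90 distinct
        // _peerdb_unchanged_toast_columns groups. generateMergeStmts
        // chunks them 8 at a time, emitting one MERGE per chunk.
        const groups, batchSize = 90, 8
        numMerges := (groups + batchSize - 1) / batchSize // ceiling division
        fmt.Println(numMerges)                            // prints 12
    }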

---------

Co-authored-by: Kaushik Iska <[email protected]>
heavycrystal and iskakaushik authored Jan 11, 2024
1 parent 05c32fa commit b5fb6d0
Showing 3 changed files with 39 additions and 8 deletions.
19 changes: 12 additions & 7 deletions flow/connectors/bigquery/bigquery.go
@@ -582,7 +582,6 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 		return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
 	}
 
-	stmts := make([]string, 0, len(distinctTableNames)+1)
 	// append all the statements to one list
 	c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
 		c.datasetID, rawTableName, distinctTableNames))
@@ -608,19 +607,25 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 			shortColumn: map[string]string{},
 		}
 		// normalize anything between last normalized batch id to last sync batchid
-		mergeStmt := mergeGen.generateMergeStmt()
-		stmts = append(stmts, mergeStmt)
+		mergeStmts := mergeGen.generateMergeStmts()
+		for i, mergeStmt := range mergeStmts {
+			c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
+				i+1, len(mergeStmts), tableName))
+			q := c.client.Query(mergeStmt)
+			_, err = q.Read(c.ctx)
+			if err != nil {
+				return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
+			}
+		}
 	}
 	// update metadata to make the last normalized batch id to the recent last sync batch id.
 	updateMetadataStmt := fmt.Sprintf(
 		"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
 		c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)
-	stmts = append(stmts, updateMetadataStmt)
 
-	query := strings.Join(stmts, "\n")
-	_, err = c.client.Query(query).Read(c.ctx)
+	_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to execute statements %s: %v", query, err)
+		return nil, fmt.Errorf("failed to execute update metadata statements %s: %v", updateMetadataStmt, err)
 	}
 
 	return &model.NormalizeResponse{
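
Each statement is now executed with the standard BigQuery Go client
pattern. A minimal self-contained sketch of that pattern, assuming a
placeholder project ID ("my-project"); runStatement is a hypothetical
helper, not part of this diff:

    package main

    import (
        "context"
        "fmt"

        "cloud.google.com/go/bigquery"
    )

    // runStatement mirrors the pattern used in NormalizeRecords:
    // Query(...).Read(ctx) submits the statement and blocks until the
    // job completes, so a MERGE has fully applied (or failed) by the
    // time Read returns.
    func runStatement(ctx context.Context, client *bigquery.Client, stmt string) error {
        if _, err := client.Query(stmt).Read(ctx); err != nil {
            return fmt.Errorf("statement failed: %w", err)
        }
        return nil
    }

    func main() {
        ctx := context.Background()
        client, err := bigquery.NewClient(ctx, "my-project")
        if err != nil {
            panic(err)
        }
        defer client.Close()
        _ = runStatement(ctx, client, "SELECT 1")
    }

Because each MERGE finishes before the next one starts, the metadata
update only runs after every MERGE for the batch has succeeded.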
16 changes: 15 additions & 1 deletion flow/connectors/bigquery/merge_stmt_generator.go
@@ -121,7 +121,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
 }
 
 // generateMergeStmt generates a merge statement.
-func (m *mergeStmtGenerator) generateMergeStmt() string {
+func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) string {
 	// comma separated list of column names
 	columnCount := utils.TableSchemaColumns(m.normalizedTableSchema)
 	backtickColNames := make([]string, 0, columnCount)
@@ -180,6 +180,20 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
 		pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
 }
 
+func (m *mergeStmtGenerator) generateMergeStmts() []string {
+	// TODO (kaushik): This is so that the statement size for individual merge statements
+	// doesn't exceed the limit. We should make this configurable.
+	const batchSize = 8
+	partitions := utils.ArrayChunks(m.unchangedToastColumns, batchSize)
+
+	mergeStmts := make([]string, 0, len(partitions))
+	for _, partition := range partitions {
+		mergeStmts = append(mergeStmts, m.generateMergeStmt(partition))
+	}
+
+	return mergeStmts
+}
+
 /*
 This function takes an array of unique unchanged toast column groups and an array of all column names,
 and returns suitable UPDATE statements as part of a MERGE operation.
12 changes: 12 additions & 0 deletions flow/connectors/utils/array.go
@@ -16,3 +16,15 @@ func ArrayMinus(first []string, second []string) []string {
 	}
 	return result
 }
+
+func ArrayChunks[T any](slice []T, size int) [][]T {
+	var chunks [][]T
+
+	for size < len(slice) {
+		chunks = append(chunks, slice[:size])
+		slice = slice[size:]
+	}
+
+	// Add the last remaining values
+	return append(chunks, slice)
+}
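
For reference, a quick usage sketch of ArrayChunks (a self-contained
copy of the helper above; the group names are invented):

    package main

    import "fmt"

    // ArrayChunks splits a slice into chunks of at most size elements,
    // with the final chunk holding the remainder.
    func ArrayChunks[T any](slice []T, size int) [][]T {
        var chunks [][]T
        for size < len(slice) {
            chunks = append(chunks, slice[:size])
            slice = slice[size:]
        }
        // Add the last remaining values
        return append(chunks, slice)
    }

    func main() {
        groups := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
        for i, chunk := range ArrayChunks(groups, 8) {
            fmt.Printf("MERGE %d covers %d groups: %v\n", i+1, len(chunk), chunk)
        }
        // MERGE 1 covers 8 groups: [a b c d e f g h]
        // MERGE 2 covers 2 groups: [i j]
    }

Note that an empty input still yields a single (empty) chunk, so
generateMergeStmts always emits at least one MERGE statement.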
