Skip to content

Commit

Permalink
Unchanged toast split (#1004)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Jan 6, 2024
1 parent 4f3eccc commit 15f3907
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/customer-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
docker-build:
strategy:
matrix:
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runner: [ubuntu-latest]
runs-on: ${{ matrix.runner }}
permissions:
contents: read
Expand Down
14 changes: 10 additions & 4 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,13 +606,19 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
},
shortColumn: map[string]string{},
}

// normalize anything between last normalized batch id to last sync batchid
mergeStmt := mergeGen.generateMergeStmt()
mergeStmts := mergeGen.generateMergeStmts()

// run the merge statement
_, err = c.client.Query(mergeStmt).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
for i, mergeStmt := range mergeStmts {
c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
i+1, len(mergeStmts), tableName))
q := c.client.Query(mergeStmt)
_, err = q.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
}
}
}
// update metadata to make the last normalized batch id to the recent last sync batch id.
Expand Down
25 changes: 21 additions & 4 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,22 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
return fmt.Sprintf(cte, pkeyColsStr)
}

// generateMergeStmt generates a merge statement.
func (m *mergeStmtGenerator) generateMergeStmt() string {
// generateMergeStmts generates merge statements, partitioned by unchanged toast columns.
func (m *mergeStmtGenerator) generateMergeStmts() []string {
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
partitions := utils.ArrayPartition(m.unchangedToastColumns, batchSize)

mergeStmts := make([]string, 0, len(partitions))
for _, partition := range partitions {
mergeStmts = append(mergeStmts, m.generateMergeStmt(partition))
}

return mergeStmts
}

func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) string {
// comma separated list of column names
columnCount := utils.TableSchemaColumns(m.normalizedTableSchema)
backtickColNames := make([]string, 0, columnCount)
Expand All @@ -139,8 +153,11 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.unchangedToastColumns, m.peerdbCols)
updateStatementsforToastCols := m.generateUpdateStatements(
pureColNames,
unchangedToastColumns,
m.peerdbCols,
)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE"
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,16 @@ func ArrayMinus(first []string, second []string) []string {
}
return result
}

func ArrayPartition[T any](slice []T, size int) [][]T {
var partitions [][]T

for size < len(slice) {
slice, partitions = slice[size:], append(partitions, slice[0:size])
}

// Add the last remaining values
partitions = append(partitions, slice)

return partitions
}

0 comments on commit 15f3907

Please sign in to comment.