Skip to content

Commit

Permalink
split by unchanged toast cols
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 6, 2024
1 parent 4f3eccc commit 5f035af
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
14 changes: 10 additions & 4 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,13 +606,19 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
},
shortColumn: map[string]string{},
}

// normalize anything between last normalized batch id to last sync batchid
mergeStmt := mergeGen.generateMergeStmt()
mergeStmts := mergeGen.generateMergeStmts()

// run the merge statement
_, err = c.client.Query(mergeStmt).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
for i, mergeStmt := range mergeStmts {
c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
i, len(mergeStmts), tableName))
q := c.client.Query(mergeStmt)
_, err = q.Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
}
}
}
// update metadata to make the last normalized batch id to the recent last sync batch id.
Expand Down
38 changes: 34 additions & 4 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,35 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
return fmt.Sprintf(cte, pkeyColsStr)
}

// generateMergeStmt generates a merge statement.
func (m *mergeStmtGenerator) generateMergeStmt() string {
// TODO move to utils.
func partition[T any](slice []T, size int) [][]T {
var partitions [][]T

for size < len(slice) {
slice, partitions = slice[size:], append(partitions, slice[0:size])
}

// Add the last remaining values
partitions = append(partitions, slice)

return partitions
}

// generateMergeStmts generates merge statements, partitioned by unchanged toast columns.
func (m *mergeStmtGenerator) generateMergeStmts() []string {
// partition unchanged toast columns into batches of 8
const batchSize = 8
partitions := partition(m.unchangedToastColumns, batchSize)

mergeStmts := make([]string, 0, len(partitions))
for _, partition := range partitions {
mergeStmts = append(mergeStmts, m.generateMergeStmt(partition))
}

return mergeStmts
}

func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) string {
// comma separated list of column names
columnCount := utils.TableSchemaColumns(m.normalizedTableSchema)
backtickColNames := make([]string, 0, columnCount)
Expand All @@ -139,8 +166,11 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.unchangedToastColumns, m.peerdbCols)
updateStatementsforToastCols := m.generateUpdateStatements(
pureColNames,
unchangedToastColumns,
m.peerdbCols,
)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE"
Expand Down

0 comments on commit 5f035af

Please sign in to comment.