Skip to content

Commit

Permalink
fix a mistake in generating merge statements (#1066)
Browse files Browse the repository at this point in the history
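Partitioning based on unchanged TOAST columns wasn't being done correctly: every generated merge statement still built its update clauses from the full list of unchanged TOAST column groups instead of only the groups in its own partition.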
  • Loading branch information
iskakaushik authored Jan 12, 2024
1 parent 53da969 commit 20940fd
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 19 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
c.datasetID, rawTableName, distinctTableNames))

for _, tableName := range distinctTableNames {
unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
dstDatasetTable, _ := c.convertToDatasetTable(tableName)
mergeGen := &mergeStmtGenerator{
rawDatasetTable: &datasetTable{
Expand All @@ -598,7 +599,6 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
normalizedTableSchema: c.tableNameSchemaMapping[tableName],
syncBatchID: batchIDs.SyncBatchID,
normalizeBatchID: batchIDs.NormalizeBatchID,
unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
Expand All @@ -607,7 +607,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
shortColumn: map[string]string{},
}
// normalize everything between the last normalized batch ID and the last synced batch ID
mergeStmts := mergeGen.generateMergeStmts()
mergeStmts := mergeGen.generateMergeStmts(unchangedToastColumns)
for i, mergeStmt := range mergeStmts {
c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
i+1, len(mergeStmts), tableName))
Expand Down
16 changes: 7 additions & 9 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type mergeStmtGenerator struct {
normalizeBatchID int64
// the schema of the table to merge into
normalizedTableSchema *protos.TableSchema
// array of toast column combinations that are unchanged
unchangedToastColumns []string
// _PEERDB_IS_DELETED and _SYNCED_AT columns
peerdbCols *protos.PeerDBColumns
// map for shorter columns
Expand Down Expand Up @@ -139,7 +137,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames)
updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, unchangedToastColumns)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE"
Expand Down Expand Up @@ -180,11 +178,11 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s
pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
}

func (m *mergeStmtGenerator) generateMergeStmts() []string {
func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastColas []string) []string {
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
partitions := utils.ArrayChunks(m.unchangedToastColumns, batchSize)
partitions := utils.ArrayChunks(allUnchangedToastColas, batchSize)

mergeStmts := make([]string, 0, len(partitions))
for _, partition := range partitions {
Expand All @@ -209,17 +207,17 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string {
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastColumns []string) []string {
handleSoftDelete := m.peerdbCols.SoftDelete && (m.peerdbCols.SoftDeleteColName != "")
// weird way of doing it but avoids prealloc lint
updateStmts := make([]string, 0, func() int {
if handleSoftDelete {
return 2 * len(m.unchangedToastColumns)
return 2 * len(unchangedToastColumns)
}
return len(m.unchangedToastColumns)
return len(unchangedToastColumns)
}())

for _, cols := range m.unchangedToastColumns {
for _, cols := range unchangedToastColumns {
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols))
Expand Down
12 changes: 4 additions & 8 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ func TestGenerateUpdateStatement(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
unchangedToastColumns: unchangedToastCols,
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
Expand All @@ -35,7 +34,7 @@ func TestGenerateUpdateStatement(t *testing.T) {
"`synced_at`=CURRENT_TIMESTAMP",
}

result := m.generateUpdateStatements(allCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols)

for i := range expected {
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
Expand All @@ -51,7 +50,6 @@ func TestGenerateUpdateStatement_WithSoftDelete(t *testing.T) {
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}
m := &mergeStmtGenerator{
unchangedToastColumns: unchangedToastCols,
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
Expand Down Expand Up @@ -79,7 +77,7 @@ func TestGenerateUpdateStatement_WithSoftDelete(t *testing.T) {
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols)

for i := range expected {
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
Expand All @@ -100,7 +98,6 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
"col2": "_c1",
"col3": "_c2",
},
unchangedToastColumns: unchangedToastCols,
peerdbCols: &protos.PeerDBColumns{
SoftDelete: false,
SoftDeleteColName: "deleted",
Expand All @@ -123,7 +120,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP",
}

result := m.generateUpdateStatements(allCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols)

for i := range expected {
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
Expand All @@ -144,7 +141,6 @@ func TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete(t *testing.
"col2": "_c1",
"col3": "_c2",
},
unchangedToastColumns: unchangedToastCols,
peerdbCols: &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
Expand Down Expand Up @@ -180,7 +176,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastColsAndSoftDelete(t *testing.
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols)

for i := range expected {
expected[i] = utils.RemoveSpacesTabsNewlines(expected[i])
Expand Down

0 comments on commit 20940fd

Please sign in to comment.