Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into wait-for
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 6, 2024
2 parents aea6c65 + 320f6f9 commit 9a3d50d
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 81 deletions.
1 change: 1 addition & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
},
shortColumn: map[string]string{},
}
// normalize anything between last normalized batch id to last sync batchid
mergeStmt := mergeGen.generateMergeStmt()
Expand Down
108 changes: 61 additions & 47 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,42 @@ type mergeStmtGenerator struct {
unchangedToastColumns []string
// _PEERDB_IS_DELETED and _SYNCED_AT columns
peerdbCols *protos.PeerDBColumns
// map for shorter columns
shortColumn map[string]string
}

// generateFlattenedCTE generates a flattened CTE.
func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR
// statement.
flattenedProjs := make([]string, 0, utils.TableSchemaColumns(m.normalizedTableSchema)+3)
utils.IterColumns(m.normalizedTableSchema, func(colName, colType string) {

for i, colName := range m.normalizedTableSchema.ColumnNames {
colType := m.normalizedTableSchema.ColumnTypes[i]
bqType := qValueKindToBigQueryType(colType)
// CAST doesn't work for FLOAT, so rewrite it to FLOAT64.
if bqType == bigquery.FloatFieldType {
bqType = "FLOAT64"
}
var castStmt string

shortCol := m.shortColumn[colName]
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
colName, bqType, colName)
colName, bqType, shortCol)
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data, '$.%s')) AS `%s`",
colName, colName)
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
colName, shortCol)
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
bqType, colName, colName)
bqType, colName, shortCol)
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
colName, bqType, colName)
colName, bqType, shortCol)
// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
// json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
Expand All @@ -76,37 +80,42 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// colName, colName)
default:
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
colName, bqType, colName)
colName, bqType, shortCol)
}
flattenedProjs = append(flattenedProjs, castStmt)
})
}
flattenedProjs = append(
flattenedProjs,
"_peerdb_timestamp",
"_peerdb_record_type",
"_peerdb_unchanged_toast_columns",
"_peerdb_record_type AS _rt",
"_peerdb_unchanged_toast_columns AS _ut",
)

// normalize anything between last normalized batch id to last sync batchid
return fmt.Sprintf(`WITH _peerdb_flattened AS
(SELECT %s FROM %s WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d and
return fmt.Sprintf(`WITH _f AS
(SELECT %s FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d AND
_peerdb_destination_table_name='%s')`,
strings.Join(flattenedProjs, ", "), m.rawDatasetTable.string(), m.normalizeBatchID,
strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.normalizeBatchID,
m.syncBatchID, m.dstTableName)
}

// generateDeDupedCTE generates a de-duped CTE.
func (m *mergeStmtGenerator) generateDeDupedCTE() string {
const cte = `_peerdb_de_duplicated_data_res AS (
SELECT _peerdb_ranked.*
FROM (
SELECT RANK() OVER (
const cte = `_dd AS (
SELECT _peerdb_ranked.* FROM(
SELECT RANK() OVER(
PARTITION BY %s ORDER BY _peerdb_timestamp DESC
) as _peerdb_rank, * FROM _peerdb_flattened
) AS _peerdb_rank,* FROM _f
) _peerdb_ranked
WHERE _peerdb_rank = 1
) SELECT * FROM _peerdb_de_duplicated_data_res`
pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.normalizedTableSchema.PrimaryKeyColumns,
WHERE _peerdb_rank=1
) SELECT * FROM _dd`

shortPkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
for _, pkeyCol := range m.normalizedTableSchema.PrimaryKeyColumns {
shortPkeys = append(shortPkeys, m.shortColumn[pkeyCol])
}

pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(shortPkeys,
", '_peerdb_concat_', "))
return fmt.Sprintf(cte, pkeyColsStr)
}
Expand All @@ -116,52 +125,57 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
// comma separated list of column names
columnCount := utils.TableSchemaColumns(m.normalizedTableSchema)
backtickColNames := make([]string, 0, columnCount)
shortBacktickColNames := make([]string, 0, columnCount)
pureColNames := make([]string, 0, columnCount)
utils.IterColumns(m.normalizedTableSchema, func(colName, _ string) {
for i, colName := range m.normalizedTableSchema.ColumnNames {
shortCol := fmt.Sprintf("_c%d", i)
m.shortColumn[colName] = shortCol
backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName))
shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", shortCol))
pureColNames = append(pureColNames, colName)
})
}
csep := strings.Join(backtickColNames, ", ")
shortCsep := strings.Join(shortBacktickColNames, ", ")
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
insertValuesSQL := csep + ",CURRENT_TIMESTAMP"
insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.unchangedToastColumns, m.peerdbCols)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE"

updateStatementsforToastCols = append(updateStatementsforToastCols,
fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
fmt.Sprintf("WHEN NOT MATCHED AND _d._rt=2 THEN INSERT (%s) VALUES(%s)",
softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL))
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns {
pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s",
pkeyColName, pkeyColName))
pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_t.%s=_d.%s",
pkeyColName, m.shortColumn[pkeyColName]))
}
// _peerdb_target.<pkey1> = _peerdb_deduped.<pkey1> AND _peerdb_target.<pkey2> = _peerdb_deduped.<pkey2> ...
// t.<pkey1> = d.<pkey1> AND t.<pkey2> = d.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

deletePart := "DELETE"
if m.peerdbCols.SoftDelete {
colName := m.peerdbCols.SoftDeleteColName
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
deletePart = fmt.Sprintf("UPDATE SET %s=TRUE", colName)
if m.peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP",
deletePart = fmt.Sprintf("%s,%s=CURRENT_TIMESTAMP",
deletePart, m.peerdbCols.SyncedAtColName)
}
}

return fmt.Sprintf(`
MERGE %s _peerdb_target USING (%s,%s) _peerdb_deduped
MERGE %s _t USING(%s,%s) _d
ON %s
WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
INSERT (%s) VALUES (%s)
WHEN NOT MATCHED AND _d._rt!=2 THEN
INSERT (%s) VALUES(%s)
%s
WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
WHEN MATCHED AND _d._rt=2 THEN
%s;
`, m.dstDatasetTable.string(), m.generateFlattenedCTE(), m.generateDeDupedCTE(),
pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
Expand Down Expand Up @@ -194,36 +208,36 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName))
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, m.shortColumn[colName]))
}

// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = CURRENT_TIMESTAMP",
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=CURRENT_TIMESTAMP",
peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = FALSE",
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=FALSE",
peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ", ")
ssep := strings.Join(tmpArray, ",")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
_rt!=2 AND _ut='%s'
THEN UPDATE SET %s`, cols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf("`%s` = TRUE", peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ", ")
fmt.Sprintf("`%s`=TRUE", peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ",")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
_rt=2 AND _ut='%s'
THEN UPDATE SET %s`, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
}
Expand Down
78 changes: 45 additions & 33 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,42 @@ import (
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
m := &mergeStmtGenerator{}
m := &mergeStmtGenerator{
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2, col3", "col2", "col3"}

expected := []string{
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns=''" +
" THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2,`col3`=_peerdb_deduped.col3," +
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) " +
"AND _peerdb_unchanged_toast_columns='' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2," +
"`col3`=_peerdb_deduped.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col2,col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col2,col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) " +
"AND _peerdb_unchanged_toast_columns='col2' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," +
"WHEN MATCHED AND _rt=2 " +
"AND _ut='' " +
"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," +
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " +
"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND(_peerdb_deduped._peerdb_record_type=2) " +
"AND _peerdb_unchanged_toast_columns='col2' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," +
"WHEN MATCHED AND _rt=2 " +
"AND _ut='col2' " +
"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1," +
"`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1," +
"`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND _rt=2 AND _ut='col3' " +
"THEN UPDATE SET `col1`=_d._c0," +
"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
Expand All @@ -58,23 +64,29 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
}

func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
m := &mergeStmtGenerator{}
m := &mergeStmtGenerator{
shortColumn: map[string]string{
"col1": "_c0",
"col2": "_c1",
"col3": "_c2",
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{""}

expected := []string{
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2) " +
"AND _peerdb_unchanged_toast_columns=''" +
"WHEN MATCHED AND _rt!=2 " +
"AND _ut=''" +
"THEN UPDATE SET " +
"`col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3," +
" `synced_at`=CURRENT_TIMESTAMP," +
"`col1`=_d._c0," +
"`col2`=_d._c1," +
"`col3`=_d._c2," +
"`synced_at`=CURRENT_TIMESTAMP," +
"`deleted`=FALSE",
"WHEN MATCHED AND" +
"(_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns=''" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1, `col2` = _peerdb_deduped.col2, " +
"`col3` = _peerdb_deduped.col3, `synced_at` = CURRENT_TIMESTAMP, `deleted` = TRUE",
"_rt=2 AND _ut=''" +
"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1, " +
"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols,
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/utils/map.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils

import (
"slices"
"strings"

"golang.org/x/exp/maps"
Expand All @@ -11,5 +12,7 @@ func KeysToString(m map[string]struct{}) string {
return ""
}

return strings.Join(maps.Keys(m), ",")
sm := maps.Keys(m)
slices.Sort[[]string](sm)
return strings.Join(sm, ",")
}

0 comments on commit 9a3d50d

Please sign in to comment.