Alias deduped cols to be shorter #1001

Merged · 3 commits · Jan 6, 2024
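
Context for the change (editor's sketch, not taken from the PR description): the BigQuery merge generator flattens each CDC record into a CTE, and it previously projected every column under its original name, so long identifiers were repeated throughout the generated MERGE statement. With this PR, generateMergeStmt assigns every column a positional alias (_c0, _c1, ...) via a new shortColumn map, and later references on the source (_d) side use the alias. A minimal, runnable Go sketch of the aliasing idea, with invented column names:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Hypothetical columns; the real generator reads them from the table schema.
        cols := []string{"customer_identifier", "order_total_amount"}
        shortColumn := make(map[string]string, len(cols))
        projs := make([]string, 0, len(cols))
        for i, c := range cols {
            shortColumn[c] = fmt.Sprintf("_c%d", i)
            projs = append(projs, fmt.Sprintf(
                "CAST(JSON_VALUE(_peerdb_data, '$.%s') AS STRING) AS `%s`", c, shortColumn[c]))
        }
        fmt.Println("SELECT " + strings.Join(projs, ", "))
        // SELECT ... AS `_c0`, ... AS `_c1`  -- short aliases replace the long names
    }
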
1 change: 1 addition & 0 deletions flow/connectors/bigquery/bigquery.go
@@ -605,6 +605,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 			SyncedAtColName: req.SyncedAtColName,
 			SoftDelete: req.SoftDelete,
 		},
+		shortColumn: map[string]string{},
 	}
 	// normalize anything between last normalized batch id to last sync batchid
 	mergeStmt := mergeGen.generateMergeStmt()
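
Why this one-line bigquery.go change is needed (editor's note, not from the PR thread): generateMergeStmt assigns into m.shortColumn, and in Go assigning into a nil map panics at runtime, so the struct literal must initialize the map. A minimal illustration:

    package main

    func main() {
        var m map[string]string // zero value: nil map; reads are fine
        m["col1"] = "_c0"       // panic: assignment to entry in nil map
    }
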
55 changes: 35 additions & 20 deletions flow/connectors/bigquery/merge_statement_generator.go
@@ -27,38 +27,42 @@ type mergeStmtGenerator struct {
 	unchangedToastColumns []string
 	// _PEERDB_IS_DELETED and _SYNCED_AT columns
 	peerdbCols *protos.PeerDBColumns
+	// map for shorter columns
+	shortColumn map[string]string
 }
 
 // generateFlattenedCTE generates a flattened CTE.
 func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 	// for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR
 	// statement.
 	flattenedProjs := make([]string, 0, utils.TableSchemaColumns(m.normalizedTableSchema)+3)
-	utils.IterColumns(m.normalizedTableSchema, func(colName, colType string) {
+
+	for i, colName := range m.normalizedTableSchema.ColumnNames {
+		colType := m.normalizedTableSchema.ColumnTypes[i]
 		bqType := qValueKindToBigQueryType(colType)
 		// CAST doesn't work for FLOAT, so rewrite it to FLOAT64.
 		if bqType == bigquery.FloatFieldType {
 			bqType = "FLOAT64"
 		}
 		var castStmt string
-
+		shortCol := m.shortColumn[colName]
 		switch qvalue.QValueKind(colType) {
 		case qvalue.QValueKindJSON:
 			// if the type is JSON, then just extract JSON
-			castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data,'$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
-				colName, bqType, colName)
+			castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
+				colName, bqType, shortCol)
 		// expecting data in BASE64 format
 		case qvalue.QValueKindBytes, qvalue.QValueKindBit:
 			castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
-				colName, colName)
+				colName, shortCol)
 		case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
 			qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
 			castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
-				"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data,'$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
-				bqType, colName, colName)
+				"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
+				bqType, colName, shortCol)
 		case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
-			castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data,'$.%s')) AS %s) AS `%s`",
-				colName, bqType, colName)
+			castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
+				colName, bqType, shortCol)
 		// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
 		// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
 		// json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
@@ -75,11 +79,11 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// " AS int64))) AS %s",
// colName, colName)
default:
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data,'$.%s') AS %s) AS `%s`",
colName, bqType, colName)
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
colName, bqType, shortCol)
}
flattenedProjs = append(flattenedProjs, castStmt)
})
}
flattenedProjs = append(
flattenedProjs,
"_peerdb_timestamp",
@@ -105,7 +109,13 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
 	) _peerdb_ranked
 	WHERE _peerdb_rank=1
 	) SELECT * FROM _dd`
-	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.normalizedTableSchema.PrimaryKeyColumns,
+
+	shortPkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
+	for _, pkeyCol := range m.normalizedTableSchema.PrimaryKeyColumns {
+		shortPkeys = append(shortPkeys, m.shortColumn[pkeyCol])
+	}
+
+	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(shortPkeys,
 		", '_peerdb_concat_', "))
 	return fmt.Sprintf(cte, pkeyColsStr)
 }
@@ -115,14 +125,19 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
 	// comma separated list of column names
 	columnCount := utils.TableSchemaColumns(m.normalizedTableSchema)
 	backtickColNames := make([]string, 0, columnCount)
+	shortBacktickColNames := make([]string, 0, columnCount)
 	pureColNames := make([]string, 0, columnCount)
-	utils.IterColumns(m.normalizedTableSchema, func(colName, _ string) {
+	for i, colName := range m.normalizedTableSchema.ColumnNames {
+		shortCol := fmt.Sprintf("_c%d", i)
+		m.shortColumn[colName] = shortCol
 		backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName))
+		shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", shortCol))
 		pureColNames = append(pureColNames, colName)
-	})
-	csep := strings.Join(backtickColNames, ",")
-	insertColumnsSQL := csep + fmt.Sprintf(",`%s`", m.peerdbCols.SyncedAtColName)
-	insertValuesSQL := csep + ",CURRENT_TIMESTAMP"
+	}
+	csep := strings.Join(backtickColNames, ", ")
+	shortCsep := strings.Join(shortBacktickColNames, ", ")
+	insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
+	insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"
 
 	updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
 		m.unchangedToastColumns, m.peerdbCols)
@@ -139,7 +154,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
 	pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
 	for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns {
 		pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_t.%s=_d.%s",
-			pkeyColName, pkeyColName))
+			pkeyColName, m.shortColumn[pkeyColName]))
 	}
 	// t.<pkey1> = d.<pkey1> AND t.<pkey2> = d.<pkey2> ...
 	pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")
@@ -193,7 +208,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
 	otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
 	tmpArray := make([]string, 0, len(otherCols))
 	for _, colName := range otherCols {
-		tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, colName))
+		tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, m.shortColumn[colName]))
 	}
 
 	// set the synced at column to the current timestamp
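
Net effect (editor's illustration with assumed table and column names, not verbatim PR output): the target (_t) side of the MERGE keeps the original column names, while the deduped source (_d) side now exposes only the short aliases, so the join and update clauses mix the two:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        shortColumn := map[string]string{"id": "_c0", "val": "_c1"}
        pkeys := []string{"id"}

        // ON clause: original name on the target, alias on the deduped source.
        conds := make([]string, 0, len(pkeys))
        for _, p := range pkeys {
            conds = append(conds, fmt.Sprintf("_t.%s=_d.%s", p, shortColumn[p]))
        }
        fmt.Println("ON " + strings.Join(conds, " AND ")) // ON _t.id=_d._c0

        // INSERT lists original names; VALUES reads the aliased projections.
        fmt.Println("INSERT (`id`, `val`, `synced_at`) VALUES (`_c0`, `_c1`,CURRENT_TIMESTAMP)")
    }

Only the generated SQL changes; the normalized table's schema is untouched, which is why the test expectations below merely swap _d.<col> for _d._cN.
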
48 changes: 30 additions & 18 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
@@ -9,36 +9,42 @@ import (
 )
 
 func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
-	m := &mergeStmtGenerator{}
+	m := &mergeStmtGenerator{
+		shortColumn: map[string]string{
+			"col1": "_c0",
+			"col2": "_c1",
+			"col3": "_c2",
+		},
+	}
 	allCols := []string{"col1", "col2", "col3"}
 	unchangedToastCols := []string{"", "col2, col3", "col2", "col3"}
 
 	expected := []string{
 		"WHEN MATCHED AND _rt!=2 AND _ut=''" +
-			" THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2,`col3`=_d.col3," +
+			" THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," +
 			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
 		"WHEN MATCHED AND _rt=2 " +
 			"AND _ut='' " +
-			"THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2," +
-			"`col3`=_d.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
+			"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," +
+			"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
 		"WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " +
-			"THEN UPDATE SET `col1`=_d.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
+			"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
 		"WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " +
-			"THEN UPDATE SET `col1`=_d.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
+			"THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
 		"WHEN MATCHED AND _rt!=2 " +
 			"AND _ut='col2' " +
-			"THEN UPDATE SET `col1`=_d.col1,`col3`=_d.col3," +
+			"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
 			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
 		"WHEN MATCHED AND _rt=2 " +
 			"AND _ut='col2' " +
-			"THEN UPDATE SET `col1`=_d.col1,`col3`=_d.col3," +
+			"THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," +
 			"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
 		"WHEN MATCHED AND _rt!=2 AND _ut='col3' " +
-			"THEN UPDATE SET `col1`=_d.col1," +
-			"`col2`=_d.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
+			"THEN UPDATE SET `col1`=_d._c0," +
+			"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
 		"WHEN MATCHED AND _rt=2 AND _ut='col3' " +
-			"THEN UPDATE SET `col1`=_d.col1," +
-			"`col2`=_d.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
+			"THEN UPDATE SET `col1`=_d._c0," +
+			"`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
 	}
 
 	result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
@@ -58,23 +64,29 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
 }
 
 func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
-	m := &mergeStmtGenerator{}
+	m := &mergeStmtGenerator{
+		shortColumn: map[string]string{
+			"col1": "_c0",
+			"col2": "_c1",
+			"col3": "_c2",
+		},
+	}
 	allCols := []string{"col1", "col2", "col3"}
 	unchangedToastCols := []string{""}
 
 	expected := []string{
 		"WHEN MATCHED AND _rt!=2 " +
 			"AND _ut=''" +
 			"THEN UPDATE SET " +
-			"`col1`=_d.col1," +
-			"`col2`=_d.col2," +
-			"`col3`=_d.col3," +
+			"`col1`=_d._c0," +
+			"`col2`=_d._c1," +
+			"`col3`=_d._c2," +
 			"`synced_at`=CURRENT_TIMESTAMP," +
 			"`deleted`=FALSE",
 		"WHEN MATCHED AND" +
 			"_rt=2 AND _ut=''" +
-			"THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2, " +
-			"`col3`=_d.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
+			"THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1, " +
+			"`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
 	}
 
 	result := m.generateUpdateStatements(allCols, unchangedToastCols,