From 17b2f825ef3d4dc47adcf218c67e20ff21c4f0a0 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 6 Jan 2024 18:05:01 +0530 Subject: [PATCH 1/3] short aliases for deduped table columns --- flow/connectors/bigquery/bigquery.go | 1 + .../bigquery/merge_statement_generator.go | 42 +++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 2d38cbf898..27d2870290 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -605,6 +605,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) SyncedAtColName: req.SyncedAtColName, SoftDelete: req.SoftDelete, }, + shortColumn: map[string]string{}, } // normalize anything between last normalized batch id to last sync batchid mergeStmt := mergeGen.generateMergeStmt() diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 0770b7ee28..357270a0ea 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -27,6 +27,8 @@ type mergeStmtGenerator struct { unchangedToastColumns []string // _PEERDB_IS_DELETED and _SYNCED_AT columns peerdbCols *protos.PeerDBColumns + // map for shorter columns + shortColumn map[string]string } // generateFlattenedCTE generates a flattened CTE. @@ -34,19 +36,22 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR // statement. flattenedProjs := make([]string, 0, utils.TableSchemaColumns(m.normalizedTableSchema)+3) - utils.IterColumns(m.normalizedTableSchema, func(colName, colType string) { + i := 0 + for colName, colType := range m.normalizedTableSchema.Columns { bqType := qValueKindToBigQueryType(colType) // CAST doesn't work for FLOAT, so rewrite it to FLOAT64. if bqType == bigquery.FloatFieldType { bqType = "FLOAT64" } var castStmt string - + i += 1 + shortCol := fmt.Sprintf("_c%d", i) + m.shortColumn[colName] = shortCol switch qvalue.QValueKind(colType) { case qvalue.QValueKindJSON: // if the type is JSON, then just extract JSON - castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data,'$.%s'),wide_number_mode=>'round') AS %s) AS `%s`", - colName, bqType, colName) + castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`", + colName, bqType, shortCol) // expecting data in BASE64 format case qvalue.QValueKindBytes, qvalue.QValueKindBit: castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`", @@ -54,11 +59,11 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString: castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ - "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data,'$.%s') AS ARRAY)) AS element) AS `%s`", - bqType, colName, colName) + "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`", + bqType, colName, shortCol) case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint: - castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data,'$.%s')) AS %s) AS `%s`", - colName, bqType, colName) + castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`", + colName, bqType, shortCol) // MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64) // Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true} // json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added, @@ -75,11 +80,11 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // " AS int64))) AS %s", // colName, colName) default: - castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data,'$.%s') AS %s) AS `%s`", - colName, bqType, colName) + castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`", + colName, bqType, shortCol) } flattenedProjs = append(flattenedProjs, castStmt) - }) + } flattenedProjs = append( flattenedProjs, "_peerdb_timestamp", @@ -115,14 +120,17 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { // comma separated list of column names columnCount := utils.TableSchemaColumns(m.normalizedTableSchema) backtickColNames := make([]string, 0, columnCount) + shortBacktickColNames := make([]string, 0, columnCount) pureColNames := make([]string, 0, columnCount) - utils.IterColumns(m.normalizedTableSchema, func(colName, _ string) { + for colName := range m.normalizedTableSchema.Columns { backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName)) + shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", m.shortColumn[colName])) pureColNames = append(pureColNames, colName) - }) - csep := strings.Join(backtickColNames, ",") - insertColumnsSQL := csep + fmt.Sprintf(",`%s`", m.peerdbCols.SyncedAtColName) - insertValuesSQL := csep + ",CURRENT_TIMESTAMP" + } + csep := strings.Join(backtickColNames, ", ") + shortCsep := strings.Join(shortBacktickColNames, ", ") + insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName) + insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP" updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.unchangedToastColumns, m.peerdbCols) @@ -193,7 +201,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements( otherCols := utils.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)) for _, colName := range otherCols { - tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, colName)) + tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, m.shortColumn[colName])) } // set the synced at column to the current timestamp From c67e5e1b8f11f8faafce9889f64d3e926230ed0f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 6 Jan 2024 18:43:24 +0530 Subject: [PATCH 2/3] fix-aliasing --- .../bigquery/merge_statement_generator.go | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 357270a0ea..47dd2b3936 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -36,17 +36,16 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR // statement. flattenedProjs := make([]string, 0, utils.TableSchemaColumns(m.normalizedTableSchema)+3) - i := 0 - for colName, colType := range m.normalizedTableSchema.Columns { + + for i, colName := range m.normalizedTableSchema.ColumnNames { + colType := m.normalizedTableSchema.ColumnTypes[i] bqType := qValueKindToBigQueryType(colType) // CAST doesn't work for FLOAT, so rewrite it to FLOAT64. if bqType == bigquery.FloatFieldType { bqType = "FLOAT64" } var castStmt string - i += 1 - shortCol := fmt.Sprintf("_c%d", i) - m.shortColumn[colName] = shortCol + shortCol := m.shortColumn[colName] switch qvalue.QValueKind(colType) { case qvalue.QValueKindJSON: // if the type is JSON, then just extract JSON @@ -55,7 +54,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // expecting data in BASE64 format case qvalue.QValueKindBytes, qvalue.QValueKindBit: castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`", - colName, colName) + colName, shortCol) case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString: castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ @@ -110,7 +109,13 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { ) _peerdb_ranked WHERE _peerdb_rank=1 ) SELECT * FROM _dd` - pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.normalizedTableSchema.PrimaryKeyColumns, + + shortPkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) + for _, pkeyCol := range m.normalizedTableSchema.PrimaryKeyColumns { + shortPkeys = append(shortPkeys, m.shortColumn[pkeyCol]) + } + + pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(shortPkeys, ", '_peerdb_concat_', ")) return fmt.Sprintf(cte, pkeyColsStr) } @@ -122,9 +127,11 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { backtickColNames := make([]string, 0, columnCount) shortBacktickColNames := make([]string, 0, columnCount) pureColNames := make([]string, 0, columnCount) - for colName := range m.normalizedTableSchema.Columns { + for i, colName := range m.normalizedTableSchema.ColumnNames { + shortCol := fmt.Sprintf("_c%d", i) + m.shortColumn[colName] = shortCol backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName)) - shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", m.shortColumn[colName])) + shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", shortCol)) pureColNames = append(pureColNames, colName) } csep := strings.Join(backtickColNames, ", ") @@ -147,7 +154,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns { pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_t.%s=_d.%s", - pkeyColName, pkeyColName)) + pkeyColName, m.shortColumn[pkeyColName])) } // t. = d. AND t. = d. ... pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ") From 89af8f934424fe099c3fed2bc31e1a785e710493 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 6 Jan 2024 18:57:38 +0530 Subject: [PATCH 3/3] fix merge tests --- .../bigquery/merge_stmt_generator_test.go | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go index 005554bd4b..0818caed56 100644 --- a/flow/connectors/bigquery/merge_stmt_generator_test.go +++ b/flow/connectors/bigquery/merge_stmt_generator_test.go @@ -9,36 +9,42 @@ import ( ) func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { - m := &mergeStmtGenerator{} + m := &mergeStmtGenerator{ + shortColumn: map[string]string{ + "col1": "_c0", + "col2": "_c1", + "col3": "_c2", + }, + } allCols := []string{"col1", "col2", "col3"} unchangedToastCols := []string{"", "col2, col3", "col2", "col3"} expected := []string{ "WHEN MATCHED AND _rt!=2 AND _ut=''" + - " THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2,`col3`=_d.col3," + + " THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1,`col3`=_d._c2," + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE", "WHEN MATCHED AND _rt=2 " + "AND _ut='' " + - "THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2," + - "`col3`=_d.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1," + + "`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", "WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " + - "THEN UPDATE SET `col1`=_d.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", + "THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", "WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " + - "THEN UPDATE SET `col1`=_d.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "THEN UPDATE SET `col1`=_d._c0,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", "WHEN MATCHED AND _rt!=2 " + "AND _ut='col2' " + - "THEN UPDATE SET `col1`=_d.col1,`col3`=_d.col3," + + "THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE", "WHEN MATCHED AND _rt=2 " + "AND _ut='col2' " + - "THEN UPDATE SET `col1`=_d.col1,`col3`=_d.col3," + + "THEN UPDATE SET `col1`=_d._c0,`col3`=_d._c2," + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ", "WHEN MATCHED AND _rt!=2 AND _ut='col3' " + - "THEN UPDATE SET `col1`=_d.col1," + - "`col2`=_d.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", + "THEN UPDATE SET `col1`=_d._c0," + + "`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", "WHEN MATCHED AND _rt=2 AND _ut='col3' " + - "THEN UPDATE SET `col1`=_d.col1," + - "`col2`=_d.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "THEN UPDATE SET `col1`=_d._c0," + + "`col2`=_d._c1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{ @@ -58,7 +64,13 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { } func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) { - m := &mergeStmtGenerator{} + m := &mergeStmtGenerator{ + shortColumn: map[string]string{ + "col1": "_c0", + "col2": "_c1", + "col3": "_c2", + }, + } allCols := []string{"col1", "col2", "col3"} unchangedToastCols := []string{""} @@ -66,15 +78,15 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) { "WHEN MATCHED AND _rt!=2 " + "AND _ut=''" + "THEN UPDATE SET " + - "`col1`=_d.col1," + - "`col2`=_d.col2," + - "`col3`=_d.col3," + + "`col1`=_d._c0," + + "`col2`=_d._c1," + + "`col3`=_d._c2," + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", "WHEN MATCHED AND" + "_rt=2 AND _ut=''" + - "THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2, " + - "`col3`=_d.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "THEN UPDATE SET `col1`=_d._c0,`col2`=_d._c1, " + + "`col3`=_d._c2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } result := m.generateUpdateStatements(allCols, unchangedToastCols,