From 05af1c219a50f4c1139f0ef205d93712edbb5266 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sat, 6 Jan 2024 16:56:10 +0530 Subject: [PATCH] made CTEs and some columns use alias, eliminate whitespace --- .../bigquery/merge_statement_generator.go | 79 +++++++++---------- .../bigquery/merge_stmt_generator_test.go | 62 +++++++-------- 2 files changed, 70 insertions(+), 71 deletions(-) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index b2bc5144f1..0770b7ee28 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -45,19 +45,19 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { switch qvalue.QValueKind(colType) { case qvalue.QValueKindJSON: // if the type is JSON, then just extract JSON - castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`", + castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data,'$.%s'),wide_number_mode=>'round') AS %s) AS `%s`", colName, bqType, colName) // expecting data in BASE64 format case qvalue.QValueKindBytes, qvalue.QValueKindBit: - castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data, '$.%s')) AS `%s`", + castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`", colName, colName) case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString: castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ - "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`", + "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data,'$.%s') AS ARRAY)) AS element) AS `%s`", bqType, colName, colName) case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint: - castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`", + castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data,'$.%s')) AS %s) AS `%s`", colName, bqType, colName) // MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64) // Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true} @@ -75,7 +75,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // " AS int64))) AS %s", // colName, colName) default: - castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`", + castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data,'$.%s') AS %s) AS `%s`", colName, bqType, colName) } flattenedProjs = append(flattenedProjs, castStmt) @@ -83,29 +83,28 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { flattenedProjs = append( flattenedProjs, "_peerdb_timestamp", - "_peerdb_record_type", - "_peerdb_unchanged_toast_columns", + "_peerdb_record_type AS _rt", + "_peerdb_unchanged_toast_columns AS _ut", ) // normalize anything between last normalized batch id to last sync batchid - return fmt.Sprintf(`WITH _peerdb_flattened AS - (SELECT %s FROM %s WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d and + return fmt.Sprintf(`WITH _f AS + (SELECT %s FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d AND _peerdb_destination_table_name='%s')`, - strings.Join(flattenedProjs, ", "), m.rawDatasetTable.string(), m.normalizeBatchID, + strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.normalizeBatchID, m.syncBatchID, m.dstTableName) } // generateDeDupedCTE 
generates a de-duped CTE. func (m *mergeStmtGenerator) generateDeDupedCTE() string { - const cte = `_peerdb_de_duplicated_data_res AS ( - SELECT _peerdb_ranked.* - FROM ( - SELECT RANK() OVER ( + const cte = `_dd AS ( + SELECT _peerdb_ranked.* FROM( + SELECT RANK() OVER( PARTITION BY %s ORDER BY _peerdb_timestamp DESC - ) as _peerdb_rank, * FROM _peerdb_flattened + ) AS _peerdb_rank,* FROM _f ) _peerdb_ranked - WHERE _peerdb_rank = 1 - ) SELECT * FROM _peerdb_de_duplicated_data_res` + WHERE _peerdb_rank=1 + ) SELECT * FROM _dd` pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.normalizedTableSchema.PrimaryKeyColumns, ", '_peerdb_concat_', ")) return fmt.Sprintf(cte, pkeyColsStr) @@ -121,47 +120,47 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName)) pureColNames = append(pureColNames, colName) }) - csep := strings.Join(backtickColNames, ", ") - insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName) + csep := strings.Join(backtickColNames, ",") + insertColumnsSQL := csep + fmt.Sprintf(",`%s`", m.peerdbCols.SyncedAtColName) insertValuesSQL := csep + ",CURRENT_TIMESTAMP" updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.unchangedToastColumns, m.peerdbCols) if m.peerdbCols.SoftDelete { - softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", m.peerdbCols.SoftDeleteColName) - softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE" + softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(",`%s`", m.peerdbCols.SoftDeleteColName) + softDeleteInsertValuesSQL := insertValuesSQL + ",TRUE" updateStatementsforToastCols = append(updateStatementsforToastCols, - fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)", + fmt.Sprintf("WHEN NOT MATCHED AND _d._rt=2 THEN INSERT (%s) VALUES(%s)", softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL)) } updateStringToastCols := strings.Join(updateStatementsforToastCols, " ") pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns { - pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s", + pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_t.%s=_d.%s", pkeyColName, pkeyColName)) } - // _peerdb_target. = _peerdb_deduped. AND _peerdb_target. = _peerdb_deduped. ... + // t. = d. AND t. = d. ... 
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ") deletePart := "DELETE" if m.peerdbCols.SoftDelete { colName := m.peerdbCols.SoftDeleteColName - deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName) + deletePart = fmt.Sprintf("UPDATE SET %s=TRUE", colName) if m.peerdbCols.SyncedAtColName != "" { - deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP", + deletePart = fmt.Sprintf("%s,%s=CURRENT_TIMESTAMP", deletePart, m.peerdbCols.SyncedAtColName) } } return fmt.Sprintf(` - MERGE %s _peerdb_target USING (%s,%s) _peerdb_deduped + MERGE %s _t USING(%s,%s) _d ON %s - WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN - INSERT (%s) VALUES (%s) + WHEN NOT MATCHED AND _d._rt!=2 THEN + INSERT (%s) VALUES(%s) %s - WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN + WHEN MATCHED AND _d._rt=2 THEN %s; `, m.dstDatasetTable.string(), m.generateFlattenedCTE(), m.generateDeDupedCTE(), pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) @@ -194,24 +193,24 @@ func (m *mergeStmtGenerator) generateUpdateStatements( otherCols := utils.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)) for _, colName := range otherCols { - tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName)) + tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, colName)) } // set the synced at column to the current timestamp if peerdbCols.SyncedAtColName != "" { - tmpArray = append(tmpArray, fmt.Sprintf("`%s` = CURRENT_TIMESTAMP", + tmpArray = append(tmpArray, fmt.Sprintf("`%s`=CURRENT_TIMESTAMP", peerdbCols.SyncedAtColName)) } // set soft-deleted to false, tackles insert after soft-delete if peerdbCols.SoftDeleteColName != "" { - tmpArray = append(tmpArray, fmt.Sprintf("`%s` = FALSE", + tmpArray = append(tmpArray, fmt.Sprintf("`%s`=FALSE", peerdbCols.SoftDeleteColName)) } - ssep := strings.Join(tmpArray, ", ") + ssep := strings.Join(tmpArray, ",") updateStmt := fmt.Sprintf(`WHEN MATCHED AND - (_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s' - THEN UPDATE SET %s `, cols, ssep) + _rt!=2 AND _ut='%s' + THEN UPDATE SET %s`, cols, ssep) updateStmts = append(updateStmts, updateStmt) // generates update statements for the case where updates and deletes happen in the same branch @@ -219,11 +218,11 @@ func (m *mergeStmtGenerator) generateUpdateStatements( // and then set soft-delete to true. 
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") { tmpArray = append(tmpArray[:len(tmpArray)-1], - fmt.Sprintf("`%s` = TRUE", peerdbCols.SoftDeleteColName)) - ssep := strings.Join(tmpArray, ", ") + fmt.Sprintf("`%s`=TRUE", peerdbCols.SoftDeleteColName)) + ssep := strings.Join(tmpArray, ",") updateStmt := fmt.Sprintf(`WHEN MATCHED AND - (_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns='%s' - THEN UPDATE SET %s `, cols, ssep) + _rt=2 AND _ut='%s' + THEN UPDATE SET %s`, cols, ssep) updateStmts = append(updateStmts, updateStmt) } } diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go index 37dd3e07ed..005554bd4b 100644 --- a/flow/connectors/bigquery/merge_stmt_generator_test.go +++ b/flow/connectors/bigquery/merge_stmt_generator_test.go @@ -14,31 +14,31 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { unchangedToastCols := []string{"", "col2, col3", "col2", "col3"} expected := []string{ - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns=''" + - " THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2,`col3`=_peerdb_deduped.col3," + + "WHEN MATCHED AND _rt!=2 AND _ut=''" + + " THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2,`col3`=_d.col3," + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) " + - "AND _peerdb_unchanged_toast_columns='' " + - "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2," + - "`col3`=_peerdb_deduped.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col2,col3' " + - "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col2,col3' " + - "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) " + - "AND _peerdb_unchanged_toast_columns='col2' " + - "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," + + "WHEN MATCHED AND _rt=2 " + + "AND _ut='' " + + "THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2," + + "`col3`=_d.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "WHEN MATCHED AND _rt!=2 AND _ut='col2,col3' " + + "THEN UPDATE SET `col1`=_d.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", + "WHEN MATCHED AND _rt=2 AND _ut='col2,col3' " + + "THEN UPDATE SET `col1`=_d.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "WHEN MATCHED AND _rt!=2 " + + "AND _ut='col2' " + + "THEN UPDATE SET `col1`=_d.col1,`col3`=_d.col3," + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE", - "WHEN MATCHED AND(_peerdb_deduped._peerdb_record_type=2) " + - "AND _peerdb_unchanged_toast_columns='col2' " + - "THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," + + "WHEN MATCHED AND _rt=2 " + + "AND _ut='col2' " + + "THEN UPDATE SET `col1`=_d.col1,`col3`=_d.col3," + "`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col3' " + - "THEN UPDATE SET `col1`=_peerdb_deduped.col1," + - "`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND 
_peerdb_unchanged_toast_columns='col3' " + - "THEN UPDATE SET `col1`=_peerdb_deduped.col1," + - "`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", + "WHEN MATCHED AND _rt!=2 AND _ut='col3' " + + "THEN UPDATE SET `col1`=_d.col1," + + "`col2`=_d.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ", + "WHEN MATCHED AND _rt=2 AND _ut='col3' " + + "THEN UPDATE SET `col1`=_d.col1," + + "`col2`=_d.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{ @@ -63,18 +63,18 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) { unchangedToastCols := []string{""} expected := []string{ - "WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2) " + - "AND _peerdb_unchanged_toast_columns=''" + + "WHEN MATCHED AND _rt!=2 " + + "AND _ut=''" + "THEN UPDATE SET " + - "`col1` = _peerdb_deduped.col1," + - " `col2` = _peerdb_deduped.col2," + - " `col3` = _peerdb_deduped.col3," + - " `synced_at`=CURRENT_TIMESTAMP," + + "`col1`=_d.col1," + + "`col2`=_d.col2," + + "`col3`=_d.col3," + + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE", "WHEN MATCHED AND" + - "(_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns=''" + - "THEN UPDATE SET `col1` = _peerdb_deduped.col1, `col2` = _peerdb_deduped.col2, " + - "`col3` = _peerdb_deduped.col3, `synced_at` = CURRENT_TIMESTAMP, `deleted` = TRUE", + "_rt=2 AND _ut=''" + + "THEN UPDATE SET `col1`=_d.col1,`col2`=_d.col2, " + + "`col3`=_d.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE", } result := m.generateUpdateStatements(allCols, unchangedToastCols,
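
Illustrative sketch (not part of the patch): after this change the flattened CTE is aliased as _f, the de-duplicated CTE as _dd, the MERGE target as _t, the deduped source as _d, and _peerdb_record_type / _peerdb_unchanged_toast_columns are projected as _rt / _ut. The standalone Go snippet below only mirrors the shape of the string built in generateFlattenedCTE; the helper name exampleFlattenedCTE, the table names, and the batch IDs are invented for the example and do not exist in the codebase.

package main

import (
	"fmt"
	"strings"
)

// exampleFlattenedCTE is a hypothetical helper that mimics the aliasing scheme
// used by generateFlattenedCTE in this patch: user columns arrive as
// already-cast projections, the PeerDB metadata columns are appended with the
// short aliases _rt and _ut, and the SELECT is wrapped in a CTE named _f.
func exampleFlattenedCTE(rawTable, dstTable string, normalizeBatchID, syncBatchID int64, projections []string) string {
	projections = append(projections,
		"_peerdb_timestamp",
		"_peerdb_record_type AS _rt",
		"_peerdb_unchanged_toast_columns AS _ut",
	)
	return fmt.Sprintf(`WITH _f AS
	(SELECT %s FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d AND
	_peerdb_destination_table_name='%s')`,
		strings.Join(projections, ","), rawTable, normalizeBatchID, syncBatchID, dstTable)
}

func main() {
	// Prints a CTE of the form:
	//   WITH _f AS (SELECT ...,_peerdb_record_type AS _rt,_peerdb_unchanged_toast_columns AS _ut FROM ...)
	// which is what the de-dup CTE (_dd) and the MERGE ... USING(...) _d clause select from.
	fmt.Println(exampleFlattenedCTE(
		"`mydataset._peerdb_raw_mirror`",
		"public.users",
		3, 7,
		[]string{"CAST(JSON_VALUE(_peerdb_data,'$.id') AS INT64) AS `id`"},
	))
}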