Commit 17b2f82

short aliases for deduped table columns

Amogh-Bharadwaj committed Jan 6, 2024
1 parent 6a263b2
Showing 2 changed files with 26 additions and 17 deletions.
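In short, this commit switches the intermediate CTEs of the generated BigQuery MERGE statement from repeating full column names to compact positional aliases (_c1, _c2, ...), while the target table keeps its real column names. A minimal sketch of the aliasing scheme, using hypothetical column names (the real generator ranges over the normalized table schema and stores the mapping in the new shortColumn field):

package main

import "fmt"

func main() {
    // Hypothetical column names; the real code ranges over the schema's column map.
    cols := []string{"customer id", "payload", "updated at"}
    shortColumn := map[string]string{}
    i := 0
    for _, colName := range cols {
        i += 1
        shortCol := fmt.Sprintf("_c%d", i) // _c1, _c2, _c3, ...
        shortColumn[colName] = shortCol
    }
    fmt.Println(shortColumn["payload"]) // _c2
}

Every later clause of the statement looks its alias up in this map rather than recomputing it, so the CTE projection and the MERGE body stay consistent even though Go map iteration order is unspecified.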
flow/connectors/bigquery/bigquery.go (1 addition, 0 deletions)
@@ -605,6 +605,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
             SyncedAtColName: req.SyncedAtColName,
             SoftDelete:      req.SoftDelete,
         },
+        shortColumn: map[string]string{},
     }
     // normalize anything between last normalized batch id to last sync batchid
     mergeStmt := mergeGen.generateMergeStmt()
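One detail worth noting in this hunk: shortColumn must be initialized to an empty map here because generateFlattenedCTE assigns into it, and assigning into a nil map panics at runtime in Go. A tiny self-contained illustration of the pitfall the initialization avoids:

package main

func main() {
    var aliases map[string]string // zero value is a nil map
    // aliases["x"] = "_c1"       // would panic: assignment to entry in nil map
    aliases = map[string]string{} // safe after explicit initialization
    aliases["x"] = "_c1"
    _ = aliases
}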
flow/connectors/bigquery/merge_statement_generator.go (25 additions, 17 deletions)
@@ -27,38 +27,43 @@ type mergeStmtGenerator struct {
     unchangedToastColumns []string
     // _PEERDB_IS_DELETED and _SYNCED_AT columns
     peerdbCols *protos.PeerDBColumns
+    // map for shorter columns
+    shortColumn map[string]string
 }
 
 // generateFlattenedCTE generates a flattened CTE.
 func (m *mergeStmtGenerator) generateFlattenedCTE() string {
     // for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR
     // statement.
     flattenedProjs := make([]string, 0, utils.TableSchemaColumns(m.normalizedTableSchema)+3)
-    utils.IterColumns(m.normalizedTableSchema, func(colName, colType string) {
+    i := 0
+    for colName, colType := range m.normalizedTableSchema.Columns {
         bqType := qValueKindToBigQueryType(colType)
         // CAST doesn't work for FLOAT, so rewrite it to FLOAT64.
         if bqType == bigquery.FloatFieldType {
             bqType = "FLOAT64"
         }
         var castStmt string
-
+        i += 1
+        shortCol := fmt.Sprintf("_c%d", i)
+        m.shortColumn[colName] = shortCol
         switch qvalue.QValueKind(colType) {
         case qvalue.QValueKindJSON:
             // if the type is JSON, then just extract JSON
-            castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data,'$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
-                colName, bqType, colName)
+            castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
+                colName, bqType, shortCol)
         // expecting data in BASE64 format
         case qvalue.QValueKindBytes, qvalue.QValueKindBit:
             castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
                 colName, colName)
         case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
             qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
             castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
-                "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data,'$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
-                bqType, colName, colName)
+                "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
+                bqType, colName, shortCol)
         case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
-            castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data,'$.%s')) AS %s) AS `%s`",
-                colName, bqType, colName)
+            castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
+                colName, bqType, shortCol)
         // MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
         // Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
         // json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
@@ -75,11 +80,11 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
         // " AS int64))) AS %s",
         // colName, colName)
         default:
-            castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data,'$.%s') AS %s) AS `%s`",
-                colName, bqType, colName)
+            castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
+                colName, bqType, shortCol)
         }
         flattenedProjs = append(flattenedProjs, castStmt)
-    })
+    }
     flattenedProjs = append(
         flattenedProjs,
         "_peerdb_timestamp",
@@ -115,14 +120,17 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
     // comma separated list of column names
     columnCount := utils.TableSchemaColumns(m.normalizedTableSchema)
     backtickColNames := make([]string, 0, columnCount)
+    shortBacktickColNames := make([]string, 0, columnCount)
     pureColNames := make([]string, 0, columnCount)
-    utils.IterColumns(m.normalizedTableSchema, func(colName, _ string) {
+    for colName := range m.normalizedTableSchema.Columns {
         backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName))
+        shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", m.shortColumn[colName]))
         pureColNames = append(pureColNames, colName)
-    })
-    csep := strings.Join(backtickColNames, ",")
-    insertColumnsSQL := csep + fmt.Sprintf(",`%s`", m.peerdbCols.SyncedAtColName)
-    insertValuesSQL := csep + ",CURRENT_TIMESTAMP"
+    }
+    csep := strings.Join(backtickColNames, ", ")
+    shortCsep := strings.Join(shortBacktickColNames, ", ")
+    insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
+    insertValuesSQL := shortCsep + ",CURRENT_TIMESTAMP"
 
     updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
         m.unchangedToastColumns, m.peerdbCols)
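Continuing the hypothetical two-column example, the INSERT arm of the generated MERGE would now come out roughly as:

INSERT (`id`, `name`, `_peerdb_synced_at`) VALUES (`_c1`, `_c2`, CURRENT_TIMESTAMP)

The column list on the target side keeps the real names, while VALUES reads the deduped source row through the short aliases; _peerdb_synced_at stands in for whatever SyncedAtColName is configured as.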
@@ -193,7 +201,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
     otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
     tmpArray := make([]string, 0, len(otherCols))
     for _, colName := range otherCols {
-        tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, colName))
+        tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, m.shortColumn[colName]))
     }
 
     // set the synced at column to the current timestamp
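The UPDATE arm follows the same pattern, reaching into the deduped source row (aliased _d in the generated SQL) through the short names; for the running example a generated SET list would look roughly like:

`id`=_d._c1, `name`=_d._c2

Because both arms resolve aliases through the same shortColumn map populated in generateFlattenedCTE, they always agree on which _cN belongs to which column.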
