Skip to content

Commit

Permalink
BigQuery schema changes: fix added float column (#1495)
Browse files Browse the repository at this point in the history
Currently, when we add a float column to a table during a BigQuery CDC
mirror, it fails with:
```
Type not found: FLOAT
```

So we need to catch that and use FLOAT64 instead

---------

Co-authored-by: Kevin Biju <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and heavycrystal authored Mar 19, 2024
1 parent fdff1a7 commit 2399689
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 15 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(

for _, addedColumn := range schemaDelta.AddedColumns {
dstDatasetTable, _ := c.convertToDatasetTable(schemaDelta.DstTableName)
addedColumnBigQueryType := qValueKindToBigQueryTypeString(addedColumn.ColumnType)
query := c.client.Query(fmt.Sprintf(
"ALTER TABLE %s ADD COLUMN IF NOT EXISTS `%s` %s",
dstDatasetTable.table, addedColumn.ColumnName,
qValueKindToBigQueryType(addedColumn.ColumnType)))
dstDatasetTable.table, addedColumn.ColumnName, addedColumnBigQueryType))
query.DefaultProjectID = c.projectID
query.DefaultDatasetID = dstDatasetTable.dataset
_, err := query.Read(ctx)
Expand Down
16 changes: 5 additions & 11 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"strings"

"cloud.google.com/go/bigquery"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -36,18 +34,14 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {

for _, column := range m.normalizedTableSchema.Columns {
colType := column.Type
bqType := qValueKindToBigQueryType(colType)
// CAST doesn't work for FLOAT, so rewrite it to FLOAT64.
if bqType == bigquery.FloatFieldType {
bqType = "FLOAT64"
}
bqTypeString := qValueKindToBigQueryTypeString(colType)
var castStmt string
shortCol := m.shortColumn[column.Name]
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
column.Name, bqType, shortCol)
column.Name, bqTypeString, shortCol)
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
Expand All @@ -58,10 +52,10 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
qvalue.QValueKindArrayDate:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
bqType, column.Name, shortCol)
bqTypeString, column.Name, shortCol)
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
column.Name, bqType, shortCol)
column.Name, bqTypeString, shortCol)
// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
// json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
Expand All @@ -79,7 +73,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// column.Name, column.Name)
default:
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
column.Name, bqType, shortCol)
column.Name, bqTypeString, shortCol)
}
flattenedProjs = append(flattenedProjs, castStmt)
}
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,16 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind,
return "", fmt.Errorf("unsupported bigquery field type: %v", fieldType)
}
}

// qValueKindToBigQueryTypeString converts a QValueKind column type into the
// type name usable in BigQuery SQL statements (e.g. ALTER TABLE ... ADD COLUMN,
// CAST). The Go client's FieldType constants are not always valid SQL type
// names, so the offending ones are rewritten here.
func qValueKindToBigQueryTypeString(colType string) string {
	switch bqType := qValueKindToBigQueryType(colType); bqType {
	case bigquery.FloatFieldType:
		// string(bigquery.FloatFieldType) is "FLOAT", which BigQuery SQL
		// rejects ("Type not found: FLOAT"); the SQL name is FLOAT64.
		return "FLOAT64"
	case bigquery.BooleanFieldType:
		// NOTE(review): BOOLEAN is likewise normalized to BOOL here.
		return "BOOL"
	default:
		return string(bqType)
	}
}
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,11 +705,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// alter source table, add column c3, drop column c2 and insert another row.
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 FLOAT`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, dropped column c2 and added column c3")
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c3) VALUES (3,3)`, srcTableName))
INSERT INTO %s(c1,c3) VALUES (3,3.5)`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c3 in the source table")

Expand Down

0 comments on commit 2399689

Please sign in to comment.