diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 93bedcc337..5b2b8b01f8 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -295,10 +295,10 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(
 		for _, addedColumn := range schemaDelta.AddedColumns {
 			dstDatasetTable, _ := c.convertToDatasetTable(schemaDelta.DstTableName)
+			addedColumnBigQueryType := qValueKindToBigQueryTypeString(addedColumn.ColumnType)
 			query := c.client.Query(fmt.Sprintf(
 				"ALTER TABLE %s ADD COLUMN IF NOT EXISTS `%s` %s",
-				dstDatasetTable.table, addedColumn.ColumnName,
-				qValueKindToBigQueryType(addedColumn.ColumnType)))
+				dstDatasetTable.table, addedColumn.ColumnName, addedColumnBigQueryType))
 			query.DefaultProjectID = c.projectID
 			query.DefaultDatasetID = dstDatasetTable.dataset
 			_, err := query.Read(ctx)
diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go
index 57fd00417e..989e8f7435 100644
--- a/flow/connectors/bigquery/merge_stmt_generator.go
+++ b/flow/connectors/bigquery/merge_stmt_generator.go
@@ -4,8 +4,6 @@ import (
 	"fmt"
 	"strings"
 
-	"cloud.google.com/go/bigquery"
-
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	"github.com/PeerDB-io/peer-flow/shared"
@@ -36,18 +34,14 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 
 	for _, column := range m.normalizedTableSchema.Columns {
 		colType := column.Type
-		bqType := qValueKindToBigQueryType(colType)
-		// CAST doesn't work for FLOAT, so rewrite it to FLOAT64.
-		if bqType == bigquery.FloatFieldType {
-			bqType = "FLOAT64"
-		}
+		bqTypeString := qValueKindToBigQueryTypeString(colType)
 		var castStmt string
 		shortCol := m.shortColumn[column.Name]
 		switch qvalue.QValueKind(colType) {
 		case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
 			// if the type is JSON, then just extract JSON
 			castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
-				column.Name, bqType, shortCol)
+				column.Name, bqTypeString, shortCol)
 		// expecting data in BASE64 format
 		case qvalue.QValueKindBytes, qvalue.QValueKindBit:
 			castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
@@ -58,10 +52,10 @@
 			qvalue.QValueKindArrayDate:
 			castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
 				"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element WHERE element IS NOT null) AS `%s`",
-				bqType, column.Name, shortCol)
+				bqTypeString, column.Name, shortCol)
 		case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
 			castStmt = fmt.Sprintf("CAST(ST_GEOGFROMTEXT(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
-				column.Name, bqType, shortCol)
+				column.Name, bqTypeString, shortCol)
 		// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
 		// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
 		// json.Marshal in SyncRecords for Postgres already does this - once new data-stores are added,
@@ -79,7 +73,7 @@
 		// column.Name, column.Name)
 		default:
 			castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
-				column.Name, bqType, shortCol)
+				column.Name, bqTypeString, shortCol)
 		}
 		flattenedProjs = append(flattenedProjs, castStmt)
 	}
diff --git a/flow/connectors/bigquery/qvalue_convert.go b/flow/connectors/bigquery/qvalue_convert.go
index 2fc179ea25..6a21b217bd 100644
--- a/flow/connectors/bigquery/qvalue_convert.go
+++ b/flow/connectors/bigquery/qvalue_convert.go
@@ -92,3 +92,16 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind,
 		return "", fmt.Errorf("unsupported bigquery field type: %v", fieldType)
 	}
 }
+
+func qValueKindToBigQueryTypeString(colType string) string {
+	bqType := qValueKindToBigQueryType(colType)
+	bqTypeAsString := string(bqType)
+	// string(bigquery.FloatFieldType) is "FLOAT" which is not a BigQuery type.
+	if bqType == bigquery.FloatFieldType {
+		bqTypeAsString = "FLOAT64"
+	}
+	if bqType == bigquery.BooleanFieldType {
+		bqTypeAsString = "BOOL"
+	}
+	return bqTypeAsString
+}
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index 678b3fc15f..c5c4170fbd 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -705,11 +705,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
 
 	// alter source table, add column c3, drop column c2 and insert another row.
 	_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
-	ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
+	ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 FLOAT`, srcTableName))
 	e2e.EnvNoError(s.t, env, err)
 	s.t.Log("Altered source table, dropped column c2 and added column c3")
 
 	_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
-	INSERT INTO %s(c1,c3) VALUES (3,3)`, srcTableName))
+	INSERT INTO %s(c1,c3) VALUES (3,3.5)`, srcTableName))
 	e2e.EnvNoError(s.t, env, err)
 	s.t.Log("Inserted row with added c3 in the source table")