Skip to content

Commit

Permalink
fix added float column
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 16, 2024
1 parent 78a8cdb commit 2ca2636
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
8 changes: 6 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,14 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(

for _, addedColumn := range schemaDelta.AddedColumns {
dstDatasetTable, _ := c.convertToDatasetTable(schemaDelta.DstTableName)
addedColumnBigQueryType := qValueKindToBigQueryType(addedColumn.ColumnType)
// string(bigquery.FloatFieldType) is "FLOAT" which is not a BigQuery type.
if addedColumnBigQueryType == bigquery.FloatFieldType {
addedColumnBigQueryType = "FLOAT64"
}
query := c.client.Query(fmt.Sprintf(
"ALTER TABLE %s ADD COLUMN IF NOT EXISTS `%s` %s",
dstDatasetTable.table, addedColumn.ColumnName,
qValueKindToBigQueryType(addedColumn.ColumnType)))
dstDatasetTable.table, addedColumn.ColumnName, addedColumnBigQueryType))
query.DefaultProjectID = c.projectID
query.DefaultDatasetID = dstDatasetTable.dataset
_, err := query.Read(ctx)
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// alter source table, add column c3, drop column c2 and insert another row.
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 FLOAT`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, dropped column c2 and added column c3")
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c3) VALUES (3,3)`, srcTableName))
INSERT INTO %s(c1,c3) VALUES (3,3.5)`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c3 in the source table")

Expand Down

0 comments on commit 2ca2636

Please sign in to comment.