diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 9c117824ad..252a2b2ae7 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -295,10 +295,14 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas( for _, addedColumn := range schemaDelta.AddedColumns { dstDatasetTable, _ := c.convertToDatasetTable(schemaDelta.DstTableName) + addedColumnBigQueryType := qValueKindToBigQueryType(addedColumn.ColumnType) + // string(bigquery.FloatFieldType) is "FLOAT" which is not a BigQuery type. + if addedColumnBigQueryType == bigquery.FloatFieldType { + addedColumnBigQueryType = "FLOAT64" + } query := c.client.Query(fmt.Sprintf( "ALTER TABLE %s ADD COLUMN IF NOT EXISTS `%s` %s", - dstDatasetTable.table, addedColumn.ColumnName, - qValueKindToBigQueryType(addedColumn.ColumnType))) + dstDatasetTable.table, addedColumn.ColumnName, addedColumnBigQueryType)) query.DefaultProjectID = c.projectID query.DefaultDatasetID = dstDatasetTable.dataset _, err := query.Read(ctx) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 3dff6e310e..310bbca408 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -696,11 +696,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) + ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 FLOAT`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c3) VALUES (3,3)`, srcTableName)) + INSERT INTO %s(c1,c3) VALUES (3,3.5)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table")