diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 4d642f402f..7d40c0617e 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -930,16 +930,6 @@ func (c *BigQueryConnector) SetupNormalizedTables( ) (*protos.SetupNormalizedTableBatchOutput, error) { tableExistsMapping := make(map[string]bool) for tableIdentifier, tableSchema := range req.TableNameSchemaMapping { - table := c.client.Dataset(c.datasetID).Table(tableIdentifier) - - // check if the table exists - _, err := table.Metadata(c.ctx) - if err == nil { - // table exists, go to next table - tableExistsMapping[tableIdentifier] = true - continue - } - // convert the column names and types to bigquery types columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns)) idx := 0 @@ -952,8 +942,24 @@ func (c *BigQueryConnector) SetupNormalizedTables( idx++ } - // create the table using the columns + table := c.client.Dataset(c.datasetID).Table(tableIdentifier) schema := bigquery.Schema(columns) + // check if the table exists + metadata, err := table.Metadata(c.ctx) + if err == nil { // it exists. + // check if the structure of the table is as needed + existingSchema := metadata.Schema + for i, existingField := range existingSchema { + desiredField := schema[i] + if existingField.Name != desiredField.Name || existingField.Type != desiredField.Type { + return nil, fmt.Errorf("failed to setup normalized table due to incompatible columns") + } + } + // table exists, go to next table + tableExistsMapping[tableIdentifier] = true + continue + } + err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema}) if err != nil { return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)