Skip to content

Commit

Permalink
bigquery check
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 5, 2023
1 parent 5f44454 commit a4a439f
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,16 +930,6 @@ func (c *BigQueryConnector) SetupNormalizedTables(
) (*protos.SetupNormalizedTableBatchOutput, error) {
tableExistsMapping := make(map[string]bool)
for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
table := c.client.Dataset(c.datasetID).Table(tableIdentifier)

// check if the table exists
_, err := table.Metadata(c.ctx)
if err == nil {
// table exists, go to next table
tableExistsMapping[tableIdentifier] = true
continue
}

// convert the column names and types to bigquery types
columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns))
idx := 0
Expand All @@ -952,8 +942,24 @@ func (c *BigQueryConnector) SetupNormalizedTables(
idx++
}

// create the table using the columns
table := c.client.Dataset(c.datasetID).Table(tableIdentifier)
schema := bigquery.Schema(columns)
// check if the table exists
metadata, err := table.Metadata(c.ctx)
if err == nil { // it exists.
// check if the structure of the table is as needed
existingSchema := metadata.Schema
for i, existingField := range existingSchema {
desiredField := schema[i]
if existingField.Name != desiredField.Name || existingField.Type != desiredField.Type {
return nil, fmt.Errorf("failed to setup normalized table due to incompatible columns")
}
}
// table exists, go to next table
tableExistsMapping[tableIdentifier] = true
continue
}

err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})
if err != nil {
return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)
Expand Down

0 comments on commit a4a439f

Please sign in to comment.