From a4a439f2b3b3020ef9c48a0038a9f9b6334bbdf3 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 4 Dec 2023 18:24:42 +0530 Subject: [PATCH] bigquery check --- flow/connectors/bigquery/bigquery.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 4d642f402f..7d40c0617e 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -930,16 +930,6 @@ func (c *BigQueryConnector) SetupNormalizedTables( ) (*protos.SetupNormalizedTableBatchOutput, error) { tableExistsMapping := make(map[string]bool) for tableIdentifier, tableSchema := range req.TableNameSchemaMapping { - table := c.client.Dataset(c.datasetID).Table(tableIdentifier) - - // check if the table exists - _, err := table.Metadata(c.ctx) - if err == nil { - // table exists, go to next table - tableExistsMapping[tableIdentifier] = true - continue - } - // convert the column names and types to bigquery types columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns)) idx := 0 @@ -952,8 +942,24 @@ func (c *BigQueryConnector) SetupNormalizedTables( idx++ } - // create the table using the columns + table := c.client.Dataset(c.datasetID).Table(tableIdentifier) schema := bigquery.Schema(columns) + // check if the table exists + metadata, err := table.Metadata(c.ctx) + if err == nil { // it exists. + // check if the structure of the table is as needed + existingSchema := metadata.Schema + for i, existingField := range existingSchema { + desiredField := schema[i] + if existingField.Name != desiredField.Name || existingField.Type != desiredField.Type { + return nil, fmt.Errorf("failed to setup normalized table due to incompatible columns") + } + } + // table exists, go to next table + tableExistsMapping[tableIdentifier] = true + continue + } + err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema}) if err != nil { return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)