From 401a6ad2f44b3ec818d685d1aebf278ff44be3ef Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 5 Dec 2023 19:07:12 +0530 Subject: [PATCH] account for peerdb cols in sf, better logging, checking --- flow/connectors/bigquery/bigquery.go | 19 ++++++++++++++++--- flow/connectors/postgres/postgres.go | 2 +- flow/connectors/snowflake/snowflake.go | 6 +++--- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 7d40c0617e..6b7f7a4044 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -949,10 +949,23 @@ func (c *BigQueryConnector) SetupNormalizedTables( if err == nil { // it exists. // check if the structure of the table is as needed existingSchema := metadata.Schema - for i, existingField := range existingSchema { - desiredField := schema[i] + if len(existingSchema) != len(schema) { + return nil, fmt.Errorf("failed to setup normalized table: number of columns on both sides differ") + } + for _, existingField := range existingSchema { + var desiredField *bigquery.FieldSchema + for id := range schema { + if schema[id].Name == existingField.Name { + desiredField = schema[id] + break + } + } + if desiredField == nil { + return nil, fmt.Errorf("destination column %s not found in source: ", existingField.Name) + } if existingField.Name != desiredField.Name || existingField.Type != desiredField.Type { - return nil, fmt.Errorf("failed to setup normalized table due to incompatible columns") + return nil, fmt.Errorf("failed to setup normalized table due to incompatible columns: %s (destination) and %s (source)", + existingField.Name, desiredField.Name) } } // table exists, go to next table diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 34247812a6..4dbe4cc9bf 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -650,7 +650,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab tableExistsMapping[tableIdentifier] = true log.Infoln("found existing normalized table, checking if it matches the desired schema") if len(destinationColumns) != len(sourceColumns) { - return nil, fmt.Errorf("failed to setup normalized table: schemas on both sides differ") + return nil, fmt.Errorf("failed to setup normalized table: number of columns on both sides differ") } for id := range destinationColumns { column := &destinationColumns[id] diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 00f5cb4476..2371aa7ce0 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -412,12 +412,12 @@ func (c *SnowflakeConnector) SetupNormalizedTables( if destinationColumns != nil { sourceColumns := req.TableNameSchemaMapping[tableIdentifier].Columns log.Infoln("found existing normalized table, checking if it matches the desired schema") - if len(destinationColumns) != len(sourceColumns) { - return nil, fmt.Errorf("failed to setup normalized table: schemas on both sides differ") - } for id := range destinationColumns { column := &destinationColumns[id] existingName := strings.ToLower(column.Name) + if _, found := strings.CutPrefix(existingName, "_peerdb"); found { + continue + } sourceType, ok := sourceColumns[existingName] if !ok { return nil, fmt.Errorf("failed to setup normalized table:"+