Skip to content

Commit

Permalink
account for peerdb cols in sf, better logging, checking
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 5, 2023
1 parent a0be7d4 commit 401a6ad
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
19 changes: 16 additions & 3 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,10 +949,23 @@ func (c *BigQueryConnector) SetupNormalizedTables(
if err == nil { // it exists.
// check if the structure of the table is as needed
existingSchema := metadata.Schema
for i, existingField := range existingSchema {
desiredField := schema[i]
if len(existingSchema) != len(schema) {
return nil, fmt.Errorf("failed to setup normalized table: number of columns on both sides differ")
}
for _, existingField := range existingSchema {
var desiredField *bigquery.FieldSchema
for id := range schema {
if schema[id].Name == existingField.Name {
desiredField = schema[id]
break
}
}
if desiredField == nil {
return nil, fmt.Errorf("destination column %s not found in source: ", existingField.Name)
}
if existingField.Name != desiredField.Name || existingField.Type != desiredField.Type {
return nil, fmt.Errorf("failed to setup normalized table due to incompatible columns")
return nil, fmt.Errorf("failed to setup normalized table due to incompatible columns: %s (destination) and %s (source)",
existingField.Name, desiredField.Name)
}
}
// table exists, go to next table
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab
tableExistsMapping[tableIdentifier] = true
log.Infoln("found existing normalized table, checking if it matches the desired schema")
if len(destinationColumns) != len(sourceColumns) {
return nil, fmt.Errorf("failed to setup normalized table: schemas on both sides differ")
return nil, fmt.Errorf("failed to setup normalized table: number of columns on both sides differ")
}
for id := range destinationColumns {
column := &destinationColumns[id]
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,12 @@ func (c *SnowflakeConnector) SetupNormalizedTables(
if destinationColumns != nil {
sourceColumns := req.TableNameSchemaMapping[tableIdentifier].Columns
log.Infoln("found existing normalized table, checking if it matches the desired schema")
if len(destinationColumns) != len(sourceColumns) {
return nil, fmt.Errorf("failed to setup normalized table: schemas on both sides differ")
}
for id := range destinationColumns {
column := &destinationColumns[id]
existingName := strings.ToLower(column.Name)
if _, found := strings.CutPrefix(existingName, "_peerdb"); found {
continue
}
sourceType, ok := sourceColumns[existingName]
if !ok {
return nil, fmt.Errorf("failed to setup normalized table:"+
Expand Down

0 comments on commit 401a6ad

Please sign in to comment.