Skip to content

Commit

Permalink
fixed fallback statements for mixed case (#727)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Nov 27, 2023
1 parent f302598 commit a49d73a
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,27 +488,33 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
for columnName, genericColumnType := range normalizedTableSchema.Columns {
columnNames = append(columnNames, fmt.Sprintf("\"%s\"", columnName))
pgType := qValueKindToPostgresType(genericColumnType)
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("(_peerdb_data->>'%s')::%s AS \"%s\"",
columnName, pgType, columnName))
if strings.Contains(genericColumnType, "array") {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
} else {
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("(_peerdb_data->>'%s')::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
}
if slices.Contains(normalizedTableSchema.PrimaryKeyColumns, columnName) {
primaryKeyColumnCasts[columnName] = fmt.Sprintf("(_peerdb_data->>'%s')::%s", columnName, pgType)
}
}
flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",")
parsedDstTable, _ := utils.ParseSchemaTable(destinationTableIdentifier)

insertColumnsSQL := strings.TrimSuffix(strings.Join(columnNames, ","), ",")
updateColumnsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns))
for columnName := range normalizedTableSchema.Columns {
updateColumnsSQLArray = append(updateColumnsSQLArray, fmt.Sprintf("%s=EXCLUDED.%s", columnName, columnName))
updateColumnsSQLArray = append(updateColumnsSQLArray, fmt.Sprintf(`"%s"=EXCLUDED."%s"`, columnName, columnName))
}
updateColumnsSQL := strings.TrimSuffix(strings.Join(updateColumnsSQLArray, ","), ",")
deleteWhereClauseArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
for columnName, columnCast := range primaryKeyColumnCasts {
deleteWhereClauseArray = append(deleteWhereClauseArray, fmt.Sprintf("%s.%s=%s AND ",
destinationTableIdentifier, columnName, columnCast))
deleteWhereClauseArray = append(deleteWhereClauseArray, fmt.Sprintf(`%s."%s"=%s AND `,
parsedDstTable.String(), columnName, columnCast))
}
deleteWhereClauseSQL := strings.TrimSuffix(strings.Join(deleteWhereClauseArray, ""), "AND ")
parsedDstTable, _ := utils.ParseSchemaTable(destinationTableIdentifier)

fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL,
strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), c.metadataSchema,
Expand Down

0 comments on commit a49d73a

Please sign in to comment.