From a49d73a710757bf4e939ce5e1ea43b5adb3b8aca Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 28 Nov 2023 02:14:51 +0530 Subject: [PATCH] fixed fallback statements for mixed case (#727) --- flow/connectors/postgres/client.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 9a66d2a51b..4c5fb6a3c1 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -488,27 +488,33 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie for columnName, genericColumnType := range normalizedTableSchema.Columns { columnNames = append(columnNames, fmt.Sprintf("\"%s\"", columnName)) pgType := qValueKindToPostgresType(genericColumnType) - flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("(_peerdb_data->>'%s')::%s AS \"%s\"", - columnName, pgType, columnName)) + if strings.Contains(genericColumnType, "array") { + flattenedCastsSQLArray = append(flattenedCastsSQLArray, + fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"", + strings.Trim(columnName, "\""), pgType, columnName)) + } else { + flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("(_peerdb_data->>'%s')::%s AS \"%s\"", + strings.Trim(columnName, "\""), pgType, columnName)) + } if slices.Contains(normalizedTableSchema.PrimaryKeyColumns, columnName) { primaryKeyColumnCasts[columnName] = fmt.Sprintf("(_peerdb_data->>'%s')::%s", columnName, pgType) } } flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",") + parsedDstTable, _ := utils.ParseSchemaTable(destinationTableIdentifier) insertColumnsSQL := strings.TrimSuffix(strings.Join(columnNames, ","), ",") updateColumnsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns)) for columnName := range normalizedTableSchema.Columns { - updateColumnsSQLArray = append(updateColumnsSQLArray, fmt.Sprintf("%s=EXCLUDED.%s", columnName, columnName)) + updateColumnsSQLArray = append(updateColumnsSQLArray, fmt.Sprintf(`"%s"=EXCLUDED."%s"`, columnName, columnName)) } updateColumnsSQL := strings.TrimSuffix(strings.Join(updateColumnsSQLArray, ","), ",") deleteWhereClauseArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns)) for columnName, columnCast := range primaryKeyColumnCasts { - deleteWhereClauseArray = append(deleteWhereClauseArray, fmt.Sprintf("%s.%s=%s AND ", - destinationTableIdentifier, columnName, columnCast)) + deleteWhereClauseArray = append(deleteWhereClauseArray, fmt.Sprintf(`%s."%s"=%s AND `, + parsedDstTable.String(), columnName, columnCast)) } deleteWhereClauseSQL := strings.TrimSuffix(strings.Join(deleteWhereClauseArray, ""), "AND ") - parsedDstTable, _ := utils.ParseSchemaTable(destinationTableIdentifier) fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL, strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), c.metadataSchema,