From ad65c28496553337182e65de14ab1a0de3b11292 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 12 Mar 2024 15:54:23 -0400 Subject: [PATCH] For replica identity full don't create pkey on snowflake (#1478) We create primary key indexes on the destination table in Snowflake CDC. For Replica Identity Full on a table, we assume all columns to be primary keys and replay that to the destination. But if one of those columns on the source have null values, our normalize records step fails saying `NULL result in a non-nullable column` This PR fixes this by not creating Primary Keys on the target for replica identity full Co-authored-by: Amogh Bharadwaj --- flow/connectors/postgres/client.go | 2 +- flow/connectors/snowflake/snowflake.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 22ac244aa9..898f962aec 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -458,7 +458,7 @@ func generateCreateTableSQLForNormalizedTable( } // add composite primary key to the table - if len(sourceTableSchema.PrimaryKeyColumns) > 0 { + if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull { primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns)) for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns { primaryKeyColsQuoted = append(primaryKeyColsQuoted, QuoteIdentifier(primaryKeyCol)) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 775c130665..4ba626a053 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -727,7 +727,7 @@ func generateCreateTableSQLForNormalizedTable( } // add composite primary key to the table - if len(sourceTableSchema.PrimaryKeyColumns) > 0 { + if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull { normalizedPrimaryKeyCols := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns)) for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns { normalizedPrimaryKeyCols = append(normalizedPrimaryKeyCols,