Skip to content

Commit

Permalink
For replica identity full don't create pkey on snowflake (#1478)
Browse files Browse the repository at this point in the history
We create primary key indexes on the destination table in Snowflake CDC.
For Replica Identity Full on a table, we assume all columns to be
primary keys and replay that to the destination.
But if one of those columns on the source have null values, our
normalize records step fails saying `NULL result in a non-nullable
column`
This PR fixes this by not creating Primary Keys on the target for
replica identity full

Co-authored-by: Amogh Bharadwaj <[email protected]>
  • Loading branch information
iskakaushik and Amogh-Bharadwaj authored Mar 12, 2024
1 parent d29f06d commit ad65c28
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 2 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func generateCreateTableSQLForNormalizedTable(
}

// add composite primary key to the table
if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull {
primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsQuoted = append(primaryKeyColsQuoted, QuoteIdentifier(primaryKeyCol))
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func generateCreateTableSQLForNormalizedTable(
}

// add composite primary key to the table
if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull {
normalizedPrimaryKeyCols := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
normalizedPrimaryKeyCols = append(normalizedPrimaryKeyCols,
Expand Down

0 comments on commit ad65c28

Please sign in to comment.