caLHNqi{>~JQ96RI(GT}=JCVgoYML54+c>A7sZ(;|jFH{uOm~FO&g%HP
zOSD=C7H6uSX2f=9h9GGW$%Rw%fhr0wE+5iPdUw*@#t8mZbjj3%F||YJJMyCt5}72z
zPf2>gmabI^E&rglty{ji+iJ=k%0H+p~&e^|^1nHwWLex;D{h|XKYtpZ=zh;NE6k`4pxNfw6Q~uO5t=l*Iop%q))xCM0nS_ty7AG=3+m;us>DT$CC)0ydmalQsrFgR28nbP9MM%pgff^n~JO9lDZ
zX1r?61}7VJneD^X!wr7yt^V?{2f8D@n)>)t-TN)uKKLu}RRuo*juNghDNzm!JMs)vR$LfMRqw)GI8{VYV8@#22+Y16BFjQm#XTe+s0{z`>|3>D|>jR
zL?ea_Jlz5gr`rs~02P5>+cA9Ov#D*L!r!L^#?;tfiK6M=f~~)>!@l(z=G_(gi4w!A
z5tn{DIFZ7z=g@>|cT}H<`iL#;e_ZU)>U;)rXV(#Xp`Rewrh27#A4RhCPfDgyB1EUM
z@O7N)>VLDsO5NLp4DNHS>T)^#rHn5@bN)fbtj-@inMMxOjVti`hq
gdtzcD@ksF5cbVOS_U*<~zaAj>SxKr&0uu8700L0;!vFvP
literal 0
HcmV?d00001
From ad65c28496553337182e65de14ab1a0de3b11292 Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Tue, 12 Mar 2024 15:54:23 -0400
Subject: [PATCH 2/2] For replica identity full don't create pkey on snowflake
(#1478)
We create primary key indexes on the destination table in Snowflake CDC.
For Replica Identity Full on a table, we assume all columns to be
primary keys and replay that to the destination.
But if one of those columns on the source have null values, our
normalize records step fails saying `NULL result in a non-nullable
column`
This PR fixes this by not creating Primary Keys on the target for
replica identity full
Co-authored-by: Amogh Bharadwaj
---
flow/connectors/postgres/client.go | 2 +-
flow/connectors/snowflake/snowflake.go | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go
index 22ac244aa9..898f962aec 100644
--- a/flow/connectors/postgres/client.go
+++ b/flow/connectors/postgres/client.go
@@ -458,7 +458,7 @@ func generateCreateTableSQLForNormalizedTable(
}
// add composite primary key to the table
- if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
+ if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull {
primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsQuoted = append(primaryKeyColsQuoted, QuoteIdentifier(primaryKeyCol))
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 775c130665..4ba626a053 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -727,7 +727,7 @@ func generateCreateTableSQLForNormalizedTable(
}
// add composite primary key to the table
- if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
+ if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull {
normalizedPrimaryKeyCols := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
normalizedPrimaryKeyCols = append(normalizedPrimaryKeyCols,