diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 22ac244aa9..898f962aec 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -458,7 +458,7 @@ func generateCreateTableSQLForNormalizedTable( } // add composite primary key to the table - if len(sourceTableSchema.PrimaryKeyColumns) > 0 { + if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull { primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns)) for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns { primaryKeyColsQuoted = append(primaryKeyColsQuoted, QuoteIdentifier(primaryKeyCol)) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 775c130665..4ba626a053 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -727,7 +727,7 @@ func generateCreateTableSQLForNormalizedTable( } // add composite primary key to the table - if len(sourceTableSchema.PrimaryKeyColumns) > 0 { + if len(sourceTableSchema.PrimaryKeyColumns) > 0 && !sourceTableSchema.IsReplicaIdentityFull { normalizedPrimaryKeyCols := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns)) for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns { normalizedPrimaryKeyCols = append(normalizedPrimaryKeyCols,