From 6ce26787db879c85a743934e7254a5d5a0b46d1d Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 23 Oct 2024 02:09:49 +0530 Subject: [PATCH] remove excluded columns from PrimaryKeyColumns during processing (#2170) --- flow/shared/schema_helpers.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flow/shared/schema_helpers.go b/flow/shared/schema_helpers.go index 08babdd4cc..7f48d3cbb5 100644 --- a/flow/shared/schema_helpers.go +++ b/flow/shared/schema_helpers.go @@ -50,14 +50,19 @@ func BuildProcessedSchemaMapping( dstTableName = mapping.DestinationTableIdentifier if len(mapping.Exclude) != 0 { columns := make([]*protos.FieldDescription, 0, len(tableSchema.Columns)) + pkeyColumns := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) for _, column := range tableSchema.Columns { if !slices.Contains(mapping.Exclude, column.Name) { columns = append(columns, column) } + if slices.Contains(tableSchema.PrimaryKeyColumns, column.Name) && + !slices.Contains(mapping.Exclude, column.Name) { + pkeyColumns = append(pkeyColumns, column.Name) + } } tableSchema = &protos.TableSchema{ TableIdentifier: tableSchema.TableIdentifier, - PrimaryKeyColumns: tableSchema.PrimaryKeyColumns, + PrimaryKeyColumns: pkeyColumns, IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull, NullableEnabled: tableSchema.NullableEnabled, System: tableSchema.System,