diff --git a/flow/shared/schema_helpers.go b/flow/shared/schema_helpers.go index 08babdd4cc..7f48d3cbb5 100644 --- a/flow/shared/schema_helpers.go +++ b/flow/shared/schema_helpers.go @@ -50,14 +50,19 @@ func BuildProcessedSchemaMapping( dstTableName = mapping.DestinationTableIdentifier if len(mapping.Exclude) != 0 { columns := make([]*protos.FieldDescription, 0, len(tableSchema.Columns)) + pkeyColumns := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) for _, column := range tableSchema.Columns { if !slices.Contains(mapping.Exclude, column.Name) { columns = append(columns, column) } + if slices.Contains(tableSchema.PrimaryKeyColumns, column.Name) && + !slices.Contains(mapping.Exclude, column.Name) { + pkeyColumns = append(pkeyColumns, column.Name) + } } tableSchema = &protos.TableSchema{ TableIdentifier: tableSchema.TableIdentifier, - PrimaryKeyColumns: tableSchema.PrimaryKeyColumns, + PrimaryKeyColumns: pkeyColumns, IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull, NullableEnabled: tableSchema.NullableEnabled, System: tableSchema.System,