Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change typeset to typelist for cdc #48

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.18
go-version: 1.22

- name: Install utilities
run: |
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ydb-platform/terraform-provider-ydb

go 1.18
go 1.22.0

require (
github.com/golang/mock v1.6.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM=
github.com/apparentlymart/go-dump v0.0.0-20190214190832-042adf3cf4a0 h1:MzVXffFUye+ZcSR6opIgz9Co7WcDx6ZcY+RjfFHoA0I=
github.com/apparentlymart/go-dump v0.0.0-20190214190832-042adf3cf4a0/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM=
github.com/apparentlymart/go-textseg v1.0.0/go.mod h1:z96Txxhf3xSFMPmb5X/1W05FF/Nj9VFpLOpjS5yuumk=
github.com/apparentlymart/go-textseg/v12 v12.0.0/go.mod h1:S/4uRK2UtaQttw1GenVJEynmyUenKwP++x/+DdGV/Ec=
github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw=
Expand Down Expand Up @@ -120,6 +121,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
Expand All @@ -138,6 +140,7 @@ github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/nsf/jsondiff v0.0.0-20200515183724-f29ed568f4ce h1:RPclfga2SEJmgMmz2k+Mg7cowZ8yv4Trqw9UsJby758=
github.com/nsf/jsondiff v0.0.0-20200515183724-f29ed568f4ce/go.mod h1:uFMI8w+ref4v2r9jz+c9i1IfIttS/OkmLfrk1jne5hs=
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
100 changes: 100 additions & 0 deletions internal/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package helpers
import (
"context"
"fmt"
"sort"
"strings"

"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
"google.golang.org/grpc"

"github.com/ydb-platform/terraform-provider-ydb/internal/helpers/topic"
"github.com/ydb-platform/terraform-provider-ydb/sdk/terraform/auth"
)

Expand Down Expand Up @@ -110,3 +113,100 @@ func GetToken(ctx context.Context, creds auth.YdbCredentials, conn *grpc.ClientC
}
return creds.Token, nil
}

func codecsSort(schCodecs []interface{}, descCodecs []topictypes.Codec) []topictypes.Codec {
// Создаем множество элементов из b
setDescCodecs := make(map[topictypes.Codec]struct{})
for _, codec := range descCodecs {
setDescCodecs[codec] = struct{}{}
}

// Создаем множество элементов из a
setSchCodecs := make(map[topictypes.Codec]struct{})
for _, codecRaw := range schCodecs {
codecStr := topic.YDBTopicCodecNameToCodec[strings.ToLower(codecRaw.(string))]
setSchCodecs[codecStr] = struct{}{}
}

var res []topictypes.Codec

// Добавляем элементы из a, которые есть в b (в порядке a)
for _, codecRaw := range schCodecs {
codecStr := topic.YDBTopicCodecNameToCodec[strings.ToLower(codecRaw.(string))]
if _, ok := setDescCodecs[codecStr]; ok {
res = append(res, codecStr)
}
}

// Добавляем элементы из b, которых нет в a (в порядке b)
for _, codec := range descCodecs {
if _, ok := setSchCodecs[codec]; !ok {
res = append(res, codec)
}
}

return res
}

func ConsumerSort(schRaw interface{}, descRaw []topictypes.Consumer) []topictypes.Consumer {
nameMap := make(map[string]topictypes.Consumer, len(descRaw))
for _, c := range descRaw {
nameMap[c.Name] = c
}

result := make([]topictypes.Consumer, 0, len(descRaw))

for _, raw := range schRaw.([]interface{}) {
schCons := raw.(map[string]interface{})
name := schCons["name"].(string)

if consumer, ok := nameMap[name]; ok {
codecsRaw := schCons["supported_codecs"].([]interface{})
consumer.SupportedCodecs = codecsSort(codecsRaw, consumer.SupportedCodecs)
result = append(result, consumer)
delete(nameMap, name)
}
}

consVal := make([]topictypes.Consumer, 0, len(nameMap))
for _, v := range nameMap {
consVal = append(consVal, v)
}
sort.Slice(consVal, func(i, j int) bool {
return consVal[i].Name < consVal[j].Name
})
result = append(result, consVal...)

return result
}

func AreAllElementsUnique(consumers []topictypes.Consumer) error {
// Используем struct{} вместо bool для экономии памяти
uniqueConsumers := make(map[string]struct{}, len(consumers))
var codecCache map[topictypes.Codec]struct{} // Будем переиспользовать мапу

for _, consumer := range consumers {
// Проверка уникальности имени потребителя
if _, exists := uniqueConsumers[consumer.Name]; exists {
return fmt.Errorf("non unique consumer: %s", consumer.Name)
}
uniqueConsumers[consumer.Name] = struct{}{}

// Переиспользуем мапу с очисткой вместо создания новой
if codecCache == nil {
codecCache = make(map[topictypes.Codec]struct{}, len(consumer.SupportedCodecs))
} else {
clear(codecCache)
}

// Проверка уникальности кодеков
for _, codec := range consumer.SupportedCodecs {
if _, exists := codecCache[codec]; exists {
codecName := topic.YDBTopicCodecToCodecName[codec] // Выносим преобразование
return fmt.Errorf("non unique codec: %s in consumer: %s", codecName, consumer.Name)
}
codecCache[codec] = struct{}{}
}
}
return nil
}
157 changes: 156 additions & 1 deletion internal/helpers/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package helpers

import "testing"
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
)

func TestParseYDBDatabaseEndpoint(t *testing.T) {
testData := []struct {
Expand Down Expand Up @@ -84,3 +89,153 @@ func TestParseYDBDatabaseEndpoint(t *testing.T) {
})
}
}

func TestConsumerSort(t *testing.T) {
sch := []interface{}{
map[string]interface{}{
"name": "cons1",
"supported_codecs": []interface{}{
"gzip", // 2
"raw", // 1
// "zstd", // 4
},
},
map[string]interface{}{
"name": "cons2",
"supported_codecs": []interface{}{
"gzip",
"raw",
},
},
}
consDesc := []topictypes.Consumer{
{
Name: "cons2",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
{
Name: "cons1",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
{
Name: "cons3",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
{
Name: "cons4",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
}

res := ConsumerSort(sch, consDesc)
assert.Equal(t, []topictypes.Consumer{
{
Name: "cons1",
SupportedCodecs: []topictypes.Codec{
2,
1,
4,
},
},
{
Name: "cons2",
SupportedCodecs: []topictypes.Codec{
2,
1,
4,
},
},
{
Name: "cons3",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
{
Name: "cons4",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
}, res)
}

func TestAreAllElementsUnique(t *testing.T) {
consDesc := []topictypes.Consumer{
{
Name: "cons2",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
{
Name: "cons1",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
}
consDescDup := []topictypes.Consumer{
{
Name: "cons2",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
{
Name: "cons2",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
}
consDescCodecDup := []topictypes.Consumer{
{
Name: "cons2",
SupportedCodecs: []topictypes.Codec{
1,
2,
4,
},
},
{
Name: "cons1",
SupportedCodecs: []topictypes.Codec{
1,
2,
2,
},
},
}
assert.NoError(t, AreAllElementsUnique(consDesc))
assert.Error(t, AreAllElementsUnique(consDescDup))
assert.Error(t, AreAllElementsUnique(consDescCodecDup))
}
20 changes: 12 additions & 8 deletions internal/resources/changefeed/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ func expandConsumers(d *schema.ResourceData) []topictypes.Consumer {
return nil
}

pSet := v.(*schema.Set)
result := make([]topictypes.Consumer, 0, len(pSet.List()))
for _, l := range pSet.List() {
pList := v.([]interface{})
result := make([]topictypes.Consumer, 0, len(pList))
for _, l := range pList {
consumer := l.(map[string]interface{})
supportedCodecs, ok := consumer["supported_codecs"].(*schema.Set)
supportedCodecs, ok := consumer["supported_codecs"].([]interface{})
if !ok {
for _, vv := range topic.YDBTopicAllowedCodecs {
supportedCodecs.Add(vv)
supportedCodecs = append(supportedCodecs, vv)
}
}
consumerName := consumer["name"].(string)
startingMessageTS, ok := consumer["starting_message_timestamp_ms"].(int)
if !ok {
startingMessageTS = 0
}
codecs := make([]topictypes.Codec, 0, len(supportedCodecs.List()))
for _, c := range supportedCodecs.List() {
codecs := make([]topictypes.Codec, 0, len(supportedCodecs))
for _, c := range supportedCodecs {
codec := c.(string)
codecs = append(codecs, topic.YDBTopicCodecNameToCodec[strings.ToLower(codec)])
}
Expand Down Expand Up @@ -164,7 +164,11 @@ func flattenCDCDescription(
return
}

return d.Set("consumer", topic.FlattenConsumersDescription(consumers))
curConsRaw := d.Get("consumer")
cons := helpers.ConsumerSort(curConsRaw, consumers)

// Записываем обновленных потребителей обратно в данные
return d.Set("consumer", topic.FlattenConsumersDescription(cons))
}

func parseTablePathFromCDCEntity(entityPath string) string {
Expand Down
5 changes: 5 additions & 0 deletions internal/resources/changefeed/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func (h *handler) Create(ctx context.Context, d *schema.ResourceData, meta inter
},
}
}

if err := helpers.AreAllElementsUnique(cdcResource.Consumers); err != nil {
return diag.FromErr(err)
}

db, err := tbl.CreateDBConnection(ctx, tbl.ClientParams{
DatabaseEndpoint: cdcResource.getConnectionString(),
AuthCreds: h.authCreds,
Expand Down
Loading
Loading