From d23c26a573f4fbb7ea214d84fa05d0173565d484 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 9 Nov 2023 15:49:02 +0900 Subject: [PATCH] feat: add function option for serevice layer Signed-off-by: hlts2 --- internal/config/corrector.go | 17 ----- pkg/index/job/correction/service/corrector.go | 55 +++++++++----- .../job/correction/service/corrector_test.go | 10 +-- pkg/index/job/correction/service/options.go | 71 +++++++++++++++++++ pkg/index/job/correction/usecase/corrector.go | 12 ++-- 5 files changed, 117 insertions(+), 48 deletions(-) create mode 100644 pkg/index/job/correction/service/options.go diff --git a/internal/config/corrector.go b/internal/config/corrector.go index 2b5b56beec..3e1176c8bb 100644 --- a/internal/config/corrector.go +++ b/internal/config/corrector.go @@ -60,20 +60,3 @@ func (c *Corrector) Bind() *Corrector { } return c } - -// GetStreamListConcurrency returns the StreamListConcurrency field value if set, otherwise 200 is set, -// since not setting this could use up all the available momory -func (c *Corrector) GetStreamListConcurrency() int { - if c != nil { - return c.StreamListConcurrency - } - return 200 //nolint:gomnd -} - -// GetBboltAsyncWriteConcurrency returns 2048 when not specified since not setting this could use up all the available momory -func (c *Corrector) GetBboltAsyncWriteConcurrency() int { - if c != nil { - return c.BboltAsyncWriteConcurrency - } - return 2048 //nolint:gomnd -} diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 44f8b834be..e63eed5c16 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "os" + "reflect" "slices" "sync/atomic" "time" @@ -37,7 +38,6 @@ import ( "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync" "github.com/vdaas/vald/internal/sync/errgroup" - "github.com/vdaas/vald/pkg/index/job/correction/config" ) type contextTimeKey string @@ -60,7 +60,6 @@ type Corrector interface { } type correct struct { - cfg *config.Data discoverer discoverer.Client agentAddrs []string indexInfos sync.Map[string, *payload.Info_Index_Count] @@ -70,24 +69,46 @@ type correct struct { checkedIndexCount atomic.Uint64 correctedOldIndexCount atomic.Uint64 correctedReplicationCount atomic.Uint64 + + indexReplica int + streamListConcurrency int + bboltAsyncWriteConcurrency int } const filemode = 0o600 -func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { - d := file.Join(os.TempDir(), "bbolt") - file.MkdirAll(d, os.ModePerm) - dbfile := file.Join(d, "checkedid.db") - bolt, err := bbolt.New(dbfile, "", os.FileMode(filemode)) - if err != nil { +func New(opts ...Option) (_ Corrector, err error) { + c := new(correct) + for _, opt := range append(defaultOpts, opts...) { + if err := opt(c); err != nil { + oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + e := &errors.ErrCriticalOption{} + if errors.As(oerr, &e) { + log.Error(err) + return nil, oerr + } + log.Warn(oerr) + } + } + if err := c.kvsInit(); err != nil { return nil, err } + return c, nil +} + +func (c *correct) kvsInit() error { + dpath := file.Join(os.TempDir(), "bbolt") + err := file.MkdirAll(dpath, os.ModePerm) + if err != nil { + return err + } - return &correct{ - cfg: cfg, - discoverer: discoverer, - checkedID: bolt, - }, nil + dbfile := file.Join(dpath, "checkedid.db") + c.checkedID, err = bbolt.New(dbfile, "", os.FileMode(filemode)) + if err != nil { + return err + } + return nil } func (c *correct) StartClient(ctx context.Context) (<-chan error, error) { @@ -161,7 +182,7 @@ func (c *correct) correct(ctx context.Context) (err error) { } curTargetAgent := 0 - jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency) + jobErrs := make([]error, 0, c.streamListConcurrency) if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) { // current address is the leftAgentAddrs[0] because this is OrderedRange and @@ -178,12 +199,12 @@ func (c *correct) correct(ctx context.Context) (err error) { sctx, scancel := context.WithCancel(ctx) defer scancel() seg, sctx := errgroup.WithContext(sctx) - sconcurrency := c.cfg.Corrector.GetStreamListConcurrency() + sconcurrency := c.streamListConcurrency seg.SetLimit(sconcurrency) // errgroup for bbolt AsyncSet bolteg, ctx := errgroup.WithContext(ctx) - bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency() + bconcurrency := c.bboltAsyncWriteConcurrency bolteg.SetLimit(bconcurrency) log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency) @@ -419,7 +440,7 @@ func (c *correct) correctReplica( ) error { // diff < 0 means there is less replica than the correct number existReplica := len(foundReplicas) + 1 - diff := existReplica - c.cfg.Corrector.IndexReplica + diff := existReplica - c.indexReplica if diff == 0 { // replica number is correct return nil diff --git a/pkg/index/job/correction/service/corrector_test.go b/pkg/index/job/correction/service/corrector_test.go index 91a6b2fd4c..f63f2d8686 100644 --- a/pkg/index/job/correction/service/corrector_test.go +++ b/pkg/index/job/correction/service/corrector_test.go @@ -20,11 +20,9 @@ import ( tmock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/vdaas/vald/apis/grpc/v1/payload" - iconfig "github.com/vdaas/vald/internal/config" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/test/mock" - "github.com/vdaas/vald/pkg/index/job/correction/config" ) type mockDiscovererClient struct { @@ -430,12 +428,8 @@ func Test_correct_correctReplica(t *testing.T) { for _, tc := range tests { test := tc c := &correct{ - discoverer: &m, - cfg: &config.Data{ - Corrector: &iconfig.Corrector{ - IndexReplica: test.args.indexReplica, - }, - }, + discoverer: &m, + indexReplica: test.args.indexReplica, } // agentAddrs = availableAddrs + target.addr + found.addr diff --git a/pkg/index/job/correction/service/options.go b/pkg/index/job/correction/service/options.go new file mode 100644 index 0000000000..3d1ec633a7 --- /dev/null +++ b/pkg/index/job/correction/service/options.go @@ -0,0 +1,71 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/errors" +) + +// Option represents the functional option for index corrector. +type Option func(*correct) error + +var defaultOpts = []Option{ + WithStreamListConcurrency(200), //nolint:gomnd + WithBboltAsyncWriteConcurrency(2048), //nolint:gomnd +} + +// WithIndexReplica returns Option that sets index replica. +func WithIndexReplica(num int) Option { + return func(c *correct) error { + if num <= 1 { + return errors.NewErrCriticalOption("indexReplica", num, errors.ErrIndexReplicaOne) + } + c.indexReplica = num + return nil + } +} + +// WithDiscoverer returns Option that sets discoverer client. +func WithDiscoverer(client discoverer.Client) Option { + return func(c *correct) error { + if client == nil { + return errors.NewErrCriticalOption("discoverer", client) + } + c.discoverer = client + return nil + } +} + +// WithStreamListConcurrency returns Option that sets concurrency for StreamList field value. +func WithStreamListConcurrency(num int) Option { + return func(c *correct) error { + if num <= 0 { + return errors.NewErrInvalidOption("streamListConcurrency", num) + } + c.streamListConcurrency = num + return nil + } +} + +// WithBboltAsyncWriteConcurrency returns Option that sets concurrency for kvs async write. +func WithBboltAsyncWriteConcurrency(num int) Option { + return func(c *correct) error { + if num <= 0 { + return errors.NewErrInvalidOption("bboltAsyncWriteConcurrency", num) + } + c.bboltAsyncWriteConcurrency = num + return nil + } +} diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go index 08463dbbbf..4a377561ed 100644 --- a/pkg/index/job/correction/usecase/corrector.go +++ b/pkg/index/job/correction/usecase/corrector.go @@ -21,7 +21,6 @@ import ( "github.com/vdaas/vald/internal/client/v1/client/discoverer" iconf "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover" @@ -45,10 +44,6 @@ type run struct { } func New(cfg *config.Data) (r runner.Runner, err error) { - if cfg.Corrector.IndexReplica == 1 { - return nil, errors.ErrIndexReplicaOne - } - eg := errgroup.Get() dOpts, err := cfg.Corrector.Discoverer.Client.Opts() @@ -105,7 +100,12 @@ func New(cfg *config.Data) (r runner.Runner, err error) { return nil, err } - corrector, err := service.New(cfg, discoverer) + corrector, err := service.New( + service.WithDiscoverer(discoverer), + service.WithIndexReplica(cfg.Corrector.IndexReplica), + service.WithBboltAsyncWriteConcurrency(cfg.Corrector.BboltAsyncWriteConcurrency), + service.WithStreamListConcurrency(cfg.Corrector.StreamListConcurrency), + ) if err != nil { return nil, err }