Skip to content

Commit

Permalink
feat: add function option for serevice layer
Browse files Browse the repository at this point in the history
Signed-off-by: hlts2 <[email protected]>
  • Loading branch information
hlts2 committed Nov 9, 2023
1 parent dbb1a75 commit d23c26a
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 48 deletions.
17 changes: 0 additions & 17 deletions internal/config/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,3 @@ func (c *Corrector) Bind() *Corrector {
}
return c
}

// GetStreamListConcurrency returns the StreamListConcurrency field value if set, otherwise 200 is set,
// since not setting this could use up all the available momory
func (c *Corrector) GetStreamListConcurrency() int {
if c != nil {
return c.StreamListConcurrency
}
return 200 //nolint:gomnd
}

// GetBboltAsyncWriteConcurrency returns 2048 when not specified since not setting this could use up all the available momory
func (c *Corrector) GetBboltAsyncWriteConcurrency() int {
if c != nil {
return c.BboltAsyncWriteConcurrency
}
return 2048 //nolint:gomnd
}
55 changes: 38 additions & 17 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"os"
"reflect"
"slices"
"sync/atomic"
"time"
Expand All @@ -37,7 +38,6 @@ import (
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
"github.com/vdaas/vald/pkg/index/job/correction/config"
)

type contextTimeKey string
Expand All @@ -60,7 +60,6 @@ type Corrector interface {
}

type correct struct {
cfg *config.Data
discoverer discoverer.Client
agentAddrs []string
indexInfos sync.Map[string, *payload.Info_Index_Count]
Expand All @@ -70,24 +69,46 @@ type correct struct {
checkedIndexCount atomic.Uint64
correctedOldIndexCount atomic.Uint64
correctedReplicationCount atomic.Uint64

indexReplica int
streamListConcurrency int
bboltAsyncWriteConcurrency int
}

const filemode = 0o600

func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) {
d := file.Join(os.TempDir(), "bbolt")
file.MkdirAll(d, os.ModePerm)
dbfile := file.Join(d, "checkedid.db")
bolt, err := bbolt.New(dbfile, "", os.FileMode(filemode))
if err != nil {
func New(opts ...Option) (_ Corrector, err error) {
c := new(correct)
for _, opt := range append(defaultOpts, opts...) {
if err := opt(c); err != nil {
oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt))
e := &errors.ErrCriticalOption{}
if errors.As(oerr, &e) {
log.Error(err)
return nil, oerr
}
log.Warn(oerr)
}
}
if err := c.kvsInit(); err != nil {
return nil, err
}
return c, nil
}

func (c *correct) kvsInit() error {
dpath := file.Join(os.TempDir(), "bbolt")
err := file.MkdirAll(dpath, os.ModePerm)
if err != nil {
return err
}

return &correct{
cfg: cfg,
discoverer: discoverer,
checkedID: bolt,
}, nil
dbfile := file.Join(dpath, "checkedid.db")
c.checkedID, err = bbolt.New(dbfile, "", os.FileMode(filemode))
if err != nil {
return err
}
return nil
}

func (c *correct) StartClient(ctx context.Context) (<-chan error, error) {
Expand Down Expand Up @@ -161,7 +182,7 @@ func (c *correct) correct(ctx context.Context) (err error) {
}

curTargetAgent := 0
jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency)
jobErrs := make([]error, 0, c.streamListConcurrency)
if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) {
// current address is the leftAgentAddrs[0] because this is OrderedRange and
Expand All @@ -178,12 +199,12 @@ func (c *correct) correct(ctx context.Context) (err error) {
sctx, scancel := context.WithCancel(ctx)
defer scancel()
seg, sctx := errgroup.WithContext(sctx)
sconcurrency := c.cfg.Corrector.GetStreamListConcurrency()
sconcurrency := c.streamListConcurrency
seg.SetLimit(sconcurrency)

// errgroup for bbolt AsyncSet
bolteg, ctx := errgroup.WithContext(ctx)
bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency()
bconcurrency := c.bboltAsyncWriteConcurrency
bolteg.SetLimit(bconcurrency)

log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency)
Expand Down Expand Up @@ -419,7 +440,7 @@ func (c *correct) correctReplica(
) error {
// diff < 0 means there is less replica than the correct number
existReplica := len(foundReplicas) + 1
diff := existReplica - c.cfg.Corrector.IndexReplica
diff := existReplica - c.indexReplica
if diff == 0 {
// replica number is correct
return nil
Expand Down
10 changes: 2 additions & 8 deletions pkg/index/job/correction/service/corrector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
tmock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/vdaas/vald/apis/grpc/v1/payload"
iconfig "github.com/vdaas/vald/internal/config"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/test/mock"
"github.com/vdaas/vald/pkg/index/job/correction/config"
)

type mockDiscovererClient struct {
Expand Down Expand Up @@ -430,12 +428,8 @@ func Test_correct_correctReplica(t *testing.T) {
for _, tc := range tests {
test := tc
c := &correct{
discoverer: &m,
cfg: &config.Data{
Corrector: &iconfig.Corrector{
IndexReplica: test.args.indexReplica,
},
},
discoverer: &m,
indexReplica: test.args.indexReplica,
}

// agentAddrs = availableAddrs + target.addr + found.addr
Expand Down
71 changes: 71 additions & 0 deletions pkg/index/job/correction/service/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service

import (
"github.com/vdaas/vald/internal/client/v1/client/discoverer"
"github.com/vdaas/vald/internal/errors"
)

// Option represents the functional option for index corrector.
type Option func(*correct) error

var defaultOpts = []Option{
WithStreamListConcurrency(200), //nolint:gomnd
WithBboltAsyncWriteConcurrency(2048), //nolint:gomnd
}

// WithIndexReplica returns Option that sets index replica.
func WithIndexReplica(num int) Option {
return func(c *correct) error {
if num <= 1 {
return errors.NewErrCriticalOption("indexReplica", num, errors.ErrIndexReplicaOne)
}
c.indexReplica = num
return nil
}
}

// WithDiscoverer returns Option that sets discoverer client.
func WithDiscoverer(client discoverer.Client) Option {
return func(c *correct) error {
if client == nil {
return errors.NewErrCriticalOption("discoverer", client)
}
c.discoverer = client
return nil
}
}

// WithStreamListConcurrency returns Option that sets concurrency for StreamList field value.
func WithStreamListConcurrency(num int) Option {
return func(c *correct) error {
if num <= 0 {
return errors.NewErrInvalidOption("streamListConcurrency", num)
}
c.streamListConcurrency = num
return nil
}
}

// WithBboltAsyncWriteConcurrency returns Option that sets concurrency for kvs async write.
func WithBboltAsyncWriteConcurrency(num int) Option {
return func(c *correct) error {
if num <= 0 {
return errors.NewErrInvalidOption("bboltAsyncWriteConcurrency", num)
}
c.bboltAsyncWriteConcurrency = num
return nil
}
}
12 changes: 6 additions & 6 deletions pkg/index/job/correction/usecase/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/vdaas/vald/internal/client/v1/client/discoverer"
iconf "github.com/vdaas/vald/internal/config"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover"
Expand All @@ -45,10 +44,6 @@ type run struct {
}

func New(cfg *config.Data) (r runner.Runner, err error) {
if cfg.Corrector.IndexReplica == 1 {
return nil, errors.ErrIndexReplicaOne
}

eg := errgroup.Get()

dOpts, err := cfg.Corrector.Discoverer.Client.Opts()
Expand Down Expand Up @@ -105,7 +100,12 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
return nil, err
}

corrector, err := service.New(cfg, discoverer)
corrector, err := service.New(
service.WithDiscoverer(discoverer),
service.WithIndexReplica(cfg.Corrector.IndexReplica),
service.WithBboltAsyncWriteConcurrency(cfg.Corrector.BboltAsyncWriteConcurrency),
service.WithStreamListConcurrency(cfg.Corrector.StreamListConcurrency),
)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d23c26a

Please sign in to comment.