Skip to content

Commit

Permalink
Add job error to report index correction error status (#2231)
Browse files Browse the repository at this point in the history
* Fix to monitor discoverer error for index correction

* Add errors.RemoveDuplicates

* Add jobErrs to report job status of index correction

* Simplify function literal

* Update Corrector interface method name to
StartMonitoring for clearity

* Removed unnecessary error handling

* Update Corrector interface method name to
StartClient

* Fix error handling in corrector.go to catch all the err in defer function

---------

Co-authored-by: Kiichiro YUKAWA <[email protected]>
  • Loading branch information
ykadowak and vankichi authored Nov 9, 2023
1 parent c5cbfc9 commit dfc9c2f
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 42 deletions.
16 changes: 16 additions & 0 deletions internal/errors/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,24 @@ package errors
// ErrIndexReplicaOne represents an error that nothing to correct when index replica is 1.
var ErrIndexReplicaOne = New("nothing to correct when index replica is 1")

// ErrAgentReplicaOne represents an error that nothing to correct when agent replica is 1.
var ErrAgentReplicaOne = New("nothing to correct when agent replica is 1")

// ErrNoAvailableAgentToInsert represents an error that no available agent to insert replica.
var ErrNoAvailableAgentToInsert = New("no available agent to insert replica")

// ErrFailedToCorrectReplicaNum represents an error that failed to correct replica number after correction process.
var ErrFailedToCorrectReplicaNum = New("failed to correct replica number after correction process")

// ErrFailedToReceiveVectorFromStream represents an error that failed to receive vector from stream while index correction process.
var ErrFailedToReceiveVectorFromStream = New("failed to receive vector from stream")

// ErrFailedToCheckConsistency represents an error that failed to check consistency process while index correction process.
var ErrFailedToCheckConsistency = func(err error) error {
return Wrap(err, "failed to check consistency while index correctioin process")
}

// ErrStreamListObjectStreamFinishedUnexpectedly represents an error that StreamListObject finished not because of io.EOF.
var ErrStreamListObjectStreamFinishedUnexpectedly = func(err error) error {
return Wrap(err, "stream list object stream finished unexpectedly")
}
12 changes: 12 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package errors

import (
"cmp"
"errors"
"fmt"
"reflect"
"runtime"
"slices"
"strings"

"github.com/vdaas/vald/internal/sync"
Expand Down Expand Up @@ -279,6 +281,16 @@ func Join(errs ...error) error {
return e
}

func RemoveDuplicates(errs []error) []error {
if len(errs) < 2 {
return errs
}
slices.SortStableFunc(errs, func(l error, r error) int {
return cmp.Compare(l.Error(), r.Error())
})
return slices.CompactFunc(errs, Is)
}

type joinError struct {
errs []error
}
Expand Down
68 changes: 68 additions & 0 deletions internal/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,74 @@ func TestAs(t *testing.T) {
}
}

func TestRemoveDuplicates(t *testing.T) {
type args struct {
errs []error
}
tests := []struct {
name string
args args
want []error
}{
{
name: "succeeds to remove duplicated errors",
args: args{
errs: []error{
New("same error1"),
New("same error1"),
New("same error2"),
New("same error2"),
New("same error2"),
New("same error3"),
},
},
want: []error{
New("same error1"),
New("same error2"),
New("same error3"),
},
},
{
name: "single error remains the same",
args: args{
errs: []error{
New("same error"),
},
},
want: []error{
New("same error"),
},
},
{
name: "empty errs remains the same",
args: args{
errs: []error{},
},
want: []error{},
},
}

equalErrs := func(errs1, errs2 []error) bool {
if len(errs1) != len(errs2) {
return false
}
for i := range errs1 {
if !Is(errs1[i], errs2[i]) {
return false
}
}
return true
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := RemoveDuplicates(tt.args.errs); !equalErrs(got, tt.want) {
t.Errorf("removeDuplicatedErrs() = %v, want %v", got, tt.want)
}
})
}
}

// NOT IMPLEMENTED BELOW
//
// func TestUnwrap(t *testing.T) {
Expand Down
14 changes: 1 addition & 13 deletions internal/net/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package grpc

import (
"cmp"
"context"
"fmt"
"runtime"
"slices"
"sync/atomic"

"github.com/vdaas/vald/internal/errors"
Expand Down Expand Up @@ -75,9 +73,7 @@ func BidirectionalStream[Q any, R any](ctx context.Context, stream ServerStream,
errs = append(errs, err)
emu.Unlock()
}
removeDuplicates(errs, func(left, right error) int {
return cmp.Compare(left.Error(), right.Error())
})
errs := errors.RemoveDuplicates(errs)
emu.Lock()
err = errors.Join(errs...)
emu.Unlock()
Expand Down Expand Up @@ -229,11 +225,3 @@ func BidirectionalStreamClient(stream ClientStream,
}
}()
}

func removeDuplicates[S ~[]E, E comparable](x S, less func(left, right E) int) S {
if len(x) < 2 {
return x
}
slices.SortStableFunc(x, less)
return slices.Compact(x)
}
50 changes: 27 additions & 23 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const (
)

type Corrector interface {
Start(ctx context.Context) (<-chan error, error)
Start(ctx context.Context) error
StartClient(ctx context.Context) (<-chan error, error)
PreStop(ctx context.Context) error
// For metrics
NumberOfCheckedIndex() uint64
Expand Down Expand Up @@ -89,15 +90,14 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) {
}, nil
}

func (c *correct) Start(ctx context.Context) (<-chan error, error) {
func (c *correct) StartClient(ctx context.Context) (<-chan error, error) {
return c.discoverer.Start(ctx)
}

func (c *correct) Start(ctx context.Context) error {
// set current time to context
ctx = embedTime(ctx)

dech, err := c.discoverer.Start(ctx)
if err != nil {
return nil, err
}

// addrs is sorted by the memory usage of each agent(descending order)
// this is decending because it's supposed to be used for index manager to decide
// which pod to make a create index rpc(higher memory, first to commit)
Expand All @@ -106,12 +106,12 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) {

if l := len(c.agentAddrs); l <= 1 {
log.Warn("only %d agent found, there must be more than two agents for correction to happen", l)
return nil, err
return errors.ErrAgentReplicaOne
}

err = c.loadInfos(ctx)
err := c.loadInfos(ctx)
if err != nil {
return nil, err
return err
}

c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool {
Expand All @@ -122,11 +122,11 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) {
log.Info("starting correction with bbolt disk cache...")
if err := c.correct(ctx); err != nil {
log.Errorf("there's some errors while correction: %v", err)
return nil, err
return err
}
log.Info("correction finished successfully")

return dech, nil
return nil
}

func (c *correct) PreStop(_ context.Context) error {
Expand Down Expand Up @@ -161,11 +161,16 @@ func (c *correct) correct(ctx context.Context) (err error) {
}

curTargetAgent := 0
jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency)
if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) {
// current address is the leftAgentAddrs[0] because this is OrderedRange and
// leftAgentAddrs is copied from c.agentAddrs
defer func() {
if err != nil {
// catch the err that happened in this scope using named return err
jobErrs = append(jobErrs, err)
}
curTargetAgent++
}()

Expand All @@ -181,7 +186,6 @@ func (c *correct) correct(ctx context.Context) (err error) {
bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency()
bolteg.SetLimit(bconcurrency)

var mu sync.Mutex
log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency)

vc := vald.NewValdClient(conn)
Expand All @@ -190,6 +194,7 @@ func (c *correct) correct(ctx context.Context) (err error) {
return err
}

var mu sync.Mutex
// The number of items to be received in advance is not known in advance.
// This is because there is a possibility of new items being inserted during processing.
for {
Expand Down Expand Up @@ -230,16 +235,14 @@ func (c *correct) correct(ctx context.Context) (err error) {
return nil
}
if err != nil {
log.Errorf("StreamListObject stream finished unexpectedly: %v", err)
return err
return errors.ErrStreamListObjectStreamFinishedUnexpectedly(err)
}

vec := res.GetVector()
if vec == nil {
st := res.GetStatus()
log.Error(st.GetCode(), st.GetMessage(), st.GetDetails())
// continue
return nil
return errors.ErrFailedToReceiveVectorFromStream
}

// skip if the vector is inserted after correction start
Expand All @@ -256,7 +259,7 @@ func (c *correct) correct(ctx context.Context) (err error) {
id := vec.GetId()
_, ok, err := c.checkedID.Get([]byte(id))
if err != nil {
log.Errorf("failed to perform Get from bbolt: %v", err)
log.Errorf("failed to perform Get from bbolt but still try to finish processing without cache: %v", err)
}
if ok {
// already checked index
Expand All @@ -271,8 +274,7 @@ func (c *correct) correct(ctx context.Context) (err error) {
},
curTargetAgent,
); err != nil {
log.Errorf("failed to check consistency: %v", err)
return nil // continue other processes
return errors.ErrFailedToCheckConsistency(err)
}

// now this id is checked so set it to the disk cache
Expand All @@ -285,11 +287,13 @@ func (c *correct) correct(ctx context.Context) (err error) {
}
},
); err != nil {
log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err)
// This only happnes when ErrGRPCClientConnNotFound is returned.
// In other cases, OrderedRange continues processing, so jobErrrs is used to keep track of the error status of correction.
return err
}

return nil
jobErrs = errors.RemoveDuplicates(jobErrs)
return errors.Join(jobErrs...)
}

type vectorReplica struct {
Expand Down
18 changes: 12 additions & 6 deletions pkg/index/job/correction/usecase/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,19 @@ func (r *run) PreStart(ctx context.Context) error {
func (r *run) Start(ctx context.Context) (<-chan error, error) {
log.Info("starting servers")
ech := make(chan error, 3) //nolint:gomnd
var oech, nech, sech <-chan error
var oech <-chan error
if r.observability != nil {
oech = r.observability.Start(ctx)
}
sech := r.server.ListenAndServe(ctx)
nech, err := r.corrector.StartClient(ctx)
if err != nil {
close(ech)
return nil, err
}

r.eg.Go(safety.RecoverFunc(func() (err error) {
defer close(ech)
if r.observability != nil {
oech = r.observability.Start(ctx)
}
sech = r.server.ListenAndServe(ctx)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -189,7 +195,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) {
}()

start := time.Now()
_, err = r.corrector.Start(ctx)
err = r.corrector.Start(ctx)
if err != nil {
log.Errorf("index correction process failed: %v", err)
return err
Expand Down

0 comments on commit dfc9c2f

Please sign in to comment.