Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job error to report index correction error status #2231

Merged
merged 11 commits into from
Nov 9, 2023
16 changes: 16 additions & 0 deletions internal/errors/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,24 @@
// ErrIndexReplicaOne represents an error that nothing to correct when index replica is 1.
var ErrIndexReplicaOne = New("nothing to correct when index replica is 1")

// ErrAgentReplicaOne represents an error that nothing to correct when agent replica is 1.
var ErrAgentReplicaOne = New("nothing to correct when agent replica is 1")

// ErrNoAvailableAgentToInsert represents an error that no available agent to insert replica.
var ErrNoAvailableAgentToInsert = New("no available agent to insert replica")

// ErrFailedToCorrectReplicaNum represents an error that failed to correct replica number after correction process.
var ErrFailedToCorrectReplicaNum = New("failed to correct replica number after correction process")

// ErrFailedToReceiveVectorFromStream represents an error that failed to receive vector from stream while index correction process.
var ErrFailedToReceiveVectorFromStream = New("failed to receive vector from stream")

// ErrFailedToCheckConsistency represents an error that failed to check consistency process while index correction process.
var ErrFailedToCheckConsistency = func(err error) error {
return Wrap(err, "failed to check consistency while index correctioin process")

Check warning on line 37 in internal/errors/corrector.go

View check run for this annotation

Codecov / codecov/patch

internal/errors/corrector.go#L36-L37

Added lines #L36 - L37 were not covered by tests
}

// ErrStreamListObjectStreamFinishedUnexpectedly represents an error that StreamListObject finished not because of io.EOF.
var ErrStreamListObjectStreamFinishedUnexpectedly = func(err error) error {
return Wrap(err, "stream list object stream finished unexpectedly")

Check warning on line 42 in internal/errors/corrector.go

View check run for this annotation

Codecov / codecov/patch

internal/errors/corrector.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}
12 changes: 12 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package errors

import (
"cmp"
"errors"
"fmt"
"reflect"
"runtime"
"slices"
"strings"

"github.com/vdaas/vald/internal/sync"
Expand Down Expand Up @@ -279,6 +281,16 @@ func Join(errs ...error) error {
return e
}

func RemoveDuplicates(errs []error) []error {
if len(errs) < 2 {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
return errs
}
slices.SortStableFunc(errs, func(l error, r error) int {
return cmp.Compare(l.Error(), r.Error())
})
return slices.CompactFunc(errs, Is)
}

type joinError struct {
errs []error
}
Expand Down
68 changes: 68 additions & 0 deletions internal/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,74 @@ func TestAs(t *testing.T) {
}
}

func TestRemoveDuplicates(t *testing.T) {
type args struct {
errs []error
}
tests := []struct {
name string
args args
want []error
}{
{
name: "succeeds to remove duplicated errors",
args: args{
errs: []error{
New("same error1"),
New("same error1"),
New("same error2"),
New("same error2"),
New("same error2"),
New("same error3"),
},
},
want: []error{
New("same error1"),
New("same error2"),
New("same error3"),
},
},
{
name: "single error remains the same",
args: args{
errs: []error{
New("same error"),
},
},
want: []error{
New("same error"),
},
},
{
name: "empty errs remains the same",
args: args{
errs: []error{},
},
want: []error{},
},
}

equalErrs := func(errs1, errs2 []error) bool {
if len(errs1) != len(errs2) {
return false
}
for i := range errs1 {
if !Is(errs1[i], errs2[i]) {
return false
}
}
return true
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := RemoveDuplicates(tt.args.errs); !equalErrs(got, tt.want) {
t.Errorf("removeDuplicatedErrs() = %v, want %v", got, tt.want)
}
})
}
}

// NOT IMPLEMENTED BELOW
//
// func TestUnwrap(t *testing.T) {
Expand Down
14 changes: 1 addition & 13 deletions internal/net/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package grpc

import (
"cmp"
"context"
"fmt"
"runtime"
"slices"
"sync/atomic"

"github.com/vdaas/vald/internal/errors"
Expand Down Expand Up @@ -75,9 +73,7 @@ func BidirectionalStream[Q any, R any](ctx context.Context, stream ServerStream,
errs = append(errs, err)
emu.Unlock()
}
removeDuplicates(errs, func(left, right error) int {
return cmp.Compare(left.Error(), right.Error())
})
errs := errors.RemoveDuplicates(errs)
emu.Lock()
err = errors.Join(errs...)
emu.Unlock()
Expand Down Expand Up @@ -229,11 +225,3 @@ func BidirectionalStreamClient(stream ClientStream,
}
}()
}

func removeDuplicates[S ~[]E, E comparable](x S, less func(left, right E) int) S {
if len(x) < 2 {
return x
}
slices.SortStableFunc(x, less)
return slices.Compact(x)
}
50 changes: 30 additions & 20 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
)

type Corrector interface {
Start(ctx context.Context) (<-chan error, error)
Start(ctx context.Context) error
StartMonitoring(ctx context.Context) (<-chan error, error)
PreStop(ctx context.Context) error
// For metrics
NumberOfCheckedIndex() uint64
Expand Down Expand Up @@ -89,14 +90,17 @@
}, nil
}

func (c *correct) Start(ctx context.Context) (<-chan error, error) {
// set current time to context
ctx = embedTime(ctx)

func (c *correct) StartMonitoring(ctx context.Context) (<-chan error, error) {

Check warning on line 93 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L93

Added line #L93 was not covered by tests
dech, err := c.discoverer.Start(ctx)
if err != nil {
return nil, err
}
return dech, nil

Check warning on line 98 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L98

Added line #L98 was not covered by tests
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *correct) Start(ctx context.Context) error {
// set current time to context
ctx = embedTime(ctx)

Check warning on line 103 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L101-L103

Added lines #L101 - L103 were not covered by tests

// addrs is sorted by the memory usage of each agent(descending order)
// this is decending because it's supposed to be used for index manager to decide
Expand All @@ -106,12 +110,12 @@

if l := len(c.agentAddrs); l <= 1 {
log.Warn("only %d agent found, there must be more than two agents for correction to happen", l)
return nil, err
return errors.ErrAgentReplicaOne

Check warning on line 113 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L113

Added line #L113 was not covered by tests
}

err = c.loadInfos(ctx)
err := c.loadInfos(ctx)

Check warning on line 116 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L116

Added line #L116 was not covered by tests
if err != nil {
return nil, err
return err

Check warning on line 118 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L118

Added line #L118 was not covered by tests
}

c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool {
Expand All @@ -122,11 +126,11 @@
log.Info("starting correction with bbolt disk cache...")
if err := c.correct(ctx); err != nil {
log.Errorf("there's some errors while correction: %v", err)
return nil, err
return err

Check warning on line 129 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L129

Added line #L129 was not covered by tests
}
log.Info("correction finished successfully")

return dech, nil
return nil

Check warning on line 133 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L133

Added line #L133 was not covered by tests
}

func (c *correct) PreStop(_ context.Context) error {
Expand Down Expand Up @@ -161,6 +165,7 @@
}

curTargetAgent := 0
jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency)

Check warning on line 168 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L168

Added line #L168 was not covered by tests
if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
// current address is the leftAgentAddrs[0] because this is OrderedRange and
Expand All @@ -181,15 +186,16 @@
bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency()
bolteg.SetLimit(bconcurrency)

var mu sync.Mutex
log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency)

vc := vald.NewValdClient(conn)
stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{})
if err != nil {
jobErrs = append(jobErrs, err)

Check warning on line 194 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L194

Added line #L194 was not covered by tests
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
return err
}

var mu sync.Mutex

Check warning on line 198 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L198

Added line #L198 was not covered by tests
// The number of items to be received in advance is not known in advance.
// This is because there is a possibility of new items being inserted during processing.
for {
Expand All @@ -213,6 +219,11 @@
log.Info("bbolt all batch finished")
}

// Aggregate errors for job status
if err != nil {
jobErrs = append(jobErrs, err)
}

Check warning on line 225 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L223-L225

Added lines #L223 - L225 were not covered by tests

log.Infof("correction finished for agent %s", addr)
return err

Expand All @@ -230,16 +241,14 @@
return nil
}
if err != nil {
log.Errorf("StreamListObject stream finished unexpectedly: %v", err)
return err
return errors.ErrStreamListObjectStreamFinishedUnexpectedly(err)

Check warning on line 244 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L244

Added line #L244 was not covered by tests
}

vec := res.GetVector()
if vec == nil {
st := res.GetStatus()
log.Error(st.GetCode(), st.GetMessage(), st.GetDetails())
// continue
return nil
return errors.ErrFailedToReceiveVectorFromStream

Check warning on line 251 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L251

Added line #L251 was not covered by tests
}

// skip if the vector is inserted after correction start
Expand All @@ -256,7 +265,7 @@
id := vec.GetId()
_, ok, err := c.checkedID.Get([]byte(id))
if err != nil {
log.Errorf("failed to perform Get from bbolt: %v", err)
log.Errorf("failed to perform Get from bbolt but still try to finish processing without cache: %v", err)

Check warning on line 268 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L268

Added line #L268 was not covered by tests
}
if ok {
// already checked index
Expand All @@ -271,8 +280,7 @@
},
curTargetAgent,
); err != nil {
log.Errorf("failed to check consistency: %v", err)
return nil // continue other processes
return errors.ErrFailedToCheckConsistency(err)

Check warning on line 283 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L283

Added line #L283 was not covered by tests
}

// now this id is checked so set it to the disk cache
Expand All @@ -285,11 +293,13 @@
}
},
); err != nil {
log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err)
// This only happnes when ErrGRPCClientConnNotFound is returned.
// In other cases, OrderedRange continues processing, so jobErrrs is used to keep track of the error status of correction.

Check warning on line 297 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L296-L297

Added lines #L296 - L297 were not covered by tests
return err
}

return nil
jobErrs = errors.RemoveDuplicates(jobErrs)
return errors.Join(jobErrs...)

Check warning on line 302 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L301-L302

Added lines #L301 - L302 were not covered by tests
}

type vectorReplica struct {
Expand Down
18 changes: 12 additions & 6 deletions pkg/index/job/correction/usecase/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,19 @@
func (r *run) Start(ctx context.Context) (<-chan error, error) {
log.Info("starting servers")
ech := make(chan error, 3) //nolint:gomnd
var oech, nech, sech <-chan error
var oech <-chan error
if r.observability != nil {
oech = r.observability.Start(ctx)
}
sech := r.server.ListenAndServe(ctx)
nech, err := r.corrector.StartMonitoring(ctx)
if err != nil {
close(ech)
return nil, err
}

Check warning on line 157 in pkg/index/job/correction/usecase/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/usecase/corrector.go#L148-L157

Added lines #L148 - L157 were not covered by tests

r.eg.Go(safety.RecoverFunc(func() (err error) {
defer close(ech)
if r.observability != nil {
oech = r.observability.Start(ctx)
}
sech = r.server.ListenAndServe(ctx)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -189,7 +195,7 @@
}()

start := time.Now()
_, err = r.corrector.Start(ctx)
err = r.corrector.Start(ctx)

Check warning on line 198 in pkg/index/job/correction/usecase/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/usecase/corrector.go#L198

Added line #L198 was not covered by tests
if err != nil {
log.Errorf("index correction process failed: %v", err)
return err
Expand Down
Loading