Skip to content

Commit

Permalink
[BUGFIX] add Health Check for Range over gRPC Connection Loop
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Jan 29, 2025
1 parent c3b5832 commit 5582529
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 89 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ LDFLAGS = -static -fPIC -pthread -std=gnu++23 -lstdc++ -lm -z relro -z now -flto

NGT_LDFLAGS = -fopenmp -lopenblas -llapack
FAISS_LDFLAGS = $(NGT_LDFLAGS) -lgfortran
HDF5_LDFLAGS = -lhdf5 -lhdf5_hl -lsz -laec -lz -ldl
HDF5_LDFLAGS = -lhdf5 -lhdf5_hl -lsz -laec -lz -ldl -lcurl -lssl -lcrypt
CGO_LDFLAGS = $(FAISS_LDFLAGS) $(HDF5_LDFLAGS)

ifeq ($(GOARCH),amd64)
Expand Down
7 changes: 2 additions & 5 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package backoff

import (
"context"
"maps"
"math"
"strconv"
"time"
Expand Down Expand Up @@ -240,9 +241,5 @@ func Metrics(context.Context) map[string]int64 {
return nil
}

m := make(map[string]int64, len(metrics))
for name, cnt := range metrics {
m[name] = cnt
}
return m
return maps.Clone(metrics)

Check warning on line 244 in internal/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

internal/backoff/backoff.go#L244

Added line #L244 was not covered by tests
}
60 changes: 41 additions & 19 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Client interface {
GetCallOption() []CallOption
GetBackoff() backoff.Backoff
SetDisableResolveDNSAddr(addr string, disabled bool)
ConnectedAddrs() []string
ConnectedAddrs(context.Context) []string
Close(ctx context.Context) error
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
return ctx.Err()
case <-prTick.C:
if g.enablePoolRebalance {
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, true, func(addr string, p pool.Conn) bool {
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -286,7 +286,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
})
}
case <-hcTick.C:
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, true, func(addr string, p pool.Conn) bool {
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -415,7 +415,7 @@ func (g *gRPCClient) Range(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, false, func(addr string, p pool.Conn) bool {
ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (g *gRPCClient) RangeConcurrent(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, false, func(addr string, p pool.Conn) bool {
eg.Go(safety.RecoverFunc(func() (err error) {
ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr)
defer func() {
Expand Down Expand Up @@ -565,7 +565,7 @@ func (g *gRPCClient) OrderedRange(
return nil
default:
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRange operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
continue
Expand Down Expand Up @@ -634,7 +634,7 @@ func (g *gRPCClient) OrderedRangeConcurrent(
addr := order
eg.Go(safety.RecoverFunc(func() (err error) {
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRangeConcurrent operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
return nil
Expand Down Expand Up @@ -701,7 +701,7 @@ func (g *gRPCClient) RoundRobin(
}

do := func() (data any, err error) {
cerr := g.rangeConns(func(addr string, p pool.Conn) bool {
cerr := g.rangeConns(ctx, false, func(addr string, p pool.Conn) bool {
select {
case <-ctx.Done():
err = ctx.Err()
Expand Down Expand Up @@ -879,14 +879,14 @@ func (g *gRPCClient) connectWithBackoff(
errors.Is(err, context.DeadlineExceeded) {
return nil, false, err
}
return nil, err != nil, err
return nil, p.IsHealthy(ctx), err
}
status.Log(st.Code(), err)
switch st.Code() {
case codes.Internal,
codes.Unavailable,
codes.ResourceExhausted:
return nil, err != nil, err
return nil, p.IsHealthy(ctx), err
}
return nil, false, err
}
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
atomic.AddUint64(&g.clientCount, ^uint64(0))
if p != nil {
log.Debugf("gRPC client connection pool addr = %s will disconnect soon...", addr)
return nil, p.Disconnect()
return nil, p.Disconnect(ctx)
}
return nil, nil
})
Expand All @@ -1085,10 +1085,10 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
return nil
}

func (g *gRPCClient) ConnectedAddrs() (addrs []string) {
func (g *gRPCClient) ConnectedAddrs(ctx context.Context) (addrs []string) {
addrs = make([]string, 0, g.conns.Len())
err := g.rangeConns(func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(context.Background()) {
err := g.rangeConns(ctx, false, func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(ctx) {
addrs = append(addrs, addr)
}
return true
Expand All @@ -1104,18 +1104,40 @@ func (g *gRPCClient) Close(ctx context.Context) (err error) {
g.stopMonitor()
}
g.conns.Range(func(addr string, p pool.Conn) bool {
derr := g.Disconnect(ctx, addr)
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) {
err = errors.Join(err, derr)
select {
case <-ctx.Done():
return false
default:
derr := g.Disconnect(ctx, addr)
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) {
err = errors.Join(err, derr)
}
return true
}
return true
})
return err
}

func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error {
func (g *gRPCClient) rangeConns(ctx context.Context, force bool, fn func(addr string, p pool.Conn) bool) error {
var cnt int
// Walk every registered connection pool and hand the usable ones to fn.
// cnt counts the pools that were actually passed to fn.
g.conns.Range(func(addr string, p pool.Conn) bool {
	// force mode: the caller wants to see every entry (including empty
	// addresses and nil pools), so no health filtering happens here.
	if force {
		cnt++
		return fn(addr, p)
	}
	// A nil pool can neither be health-checked nor reconnected. Calling
	// Connect or String on it would dereference nil, so skip the entry and
	// continue with the next address.
	if p == nil {
		log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\tpool is nil", addr)
		return true
	}
	if !p.IsHealthy(ctx) {
		// Try to reconnect the unhealthy pool once before giving up on it.
		pc, err := p.Connect(ctx)
		if pc == nil || err != nil || !pc.IsHealthy(ctx) {
			if pc != nil {
				// Reconnection produced a pool that is still unhealthy:
				// disconnect it; a failure here is only logged.
				if derr := pc.Disconnect(ctx); derr != nil {
					log.Debugf("Failed to disconnect unhealthy connection for %s: %v", addr, derr)
				}
			}
			log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
			// Returning true keeps the iteration going; this address is
			// skipped and not counted.
			return true
		}
		// Use the pool returned by the successful reconnect for fn.
		p = pc
	}
	cnt++
	return fn(addr, p)
})
Expand Down
3 changes: 2 additions & 1 deletion internal/net/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3107,7 +3107,7 @@ package grpc
// stopMonitor: test.fields.stopMonitor,
// }
//
// gotAddrs := g.ConnectedAddrs()
// gotAddrs := g.ConnectedAddrs(context.Background())
// if err := checkFunc(test.want, gotAddrs); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down Expand Up @@ -3303,6 +3303,7 @@ package grpc
//
// func Test_gRPCClient_rangeConns(t *testing.T) {
// type args struct {
// ctx context.Context
// fn func(addr string, p pool.Conn) bool
// }
// type fields struct {
Expand Down
Loading

0 comments on commit 5582529

Please sign in to comment.