Skip to content

Commit

Permalink
[BUGFIX] add Health Check for Range over gRPC Connection Loop
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Jan 23, 2025
1 parent 6065fd9 commit 82b29ce
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 57 deletions.
54 changes: 35 additions & 19 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Client interface {
GetCallOption() []CallOption
GetBackoff() backoff.Backoff
SetDisableResolveDNSAddr(addr string, disabled bool)
ConnectedAddrs() []string
ConnectedAddrs(context.Context) []string
Close(ctx context.Context) error
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
return ctx.Err()
case <-prTick.C:
if g.enablePoolRebalance {
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 252 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L252

Added line #L252 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -286,7 +286,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
})
}
case <-hcTick.C:
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 289 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L289

Added line #L289 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -415,7 +415,7 @@ func (g *gRPCClient) Range(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 418 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L418

Added line #L418 was not covered by tests
ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (g *gRPCClient) RangeConcurrent(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 481 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L481

Added line #L481 was not covered by tests
eg.Go(safety.RecoverFunc(func() (err error) {
ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr)
defer func() {
Expand Down Expand Up @@ -565,7 +565,7 @@ func (g *gRPCClient) OrderedRange(
return nil
default:
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 568 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L568

Added line #L568 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRange operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
continue
Expand Down Expand Up @@ -634,7 +634,7 @@ func (g *gRPCClient) OrderedRangeConcurrent(
addr := order
eg.Go(safety.RecoverFunc(func() (err error) {
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 637 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L637

Added line #L637 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRangeConcurrent operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
return nil
Expand Down Expand Up @@ -701,7 +701,7 @@ func (g *gRPCClient) RoundRobin(
}

do := func() (data any, err error) {
cerr := g.rangeConns(func(addr string, p pool.Conn) bool {
cerr := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 704 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L704

Added line #L704 was not covered by tests
select {
case <-ctx.Done():
err = ctx.Err()
Expand Down Expand Up @@ -879,14 +879,14 @@ func (g *gRPCClient) connectWithBackoff(
errors.Is(err, context.DeadlineExceeded) {
return nil, false, err
}
return nil, err != nil, err
return nil, p.IsHealthy(ctx), err

Check warning on line 882 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L882

Added line #L882 was not covered by tests
}
status.Log(st.Code(), err)
switch st.Code() {
case codes.Internal,
codes.Unavailable,
codes.ResourceExhausted:
return nil, err != nil, err
return nil, p.IsHealthy(ctx), err

Check warning on line 889 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L889

Added line #L889 was not covered by tests
}
return nil, false, err
}
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
atomic.AddUint64(&g.clientCount, ^uint64(0))
if p != nil {
log.Debugf("gRPC client connection pool addr = %s will disconnect soon...", addr)
return nil, p.Disconnect()
return nil, p.Disconnect(ctx)

Check warning on line 1069 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1069

Added line #L1069 was not covered by tests
}
return nil, nil
})
Expand All @@ -1085,10 +1085,10 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
return nil
}

func (g *gRPCClient) ConnectedAddrs() (addrs []string) {
func (g *gRPCClient) ConnectedAddrs(ctx context.Context) (addrs []string) {

Check warning on line 1088 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1088

Added line #L1088 was not covered by tests
addrs = make([]string, 0, g.conns.Len())
err := g.rangeConns(func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(context.Background()) {
err := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(ctx) {

Check warning on line 1091 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1090-L1091

Added lines #L1090 - L1091 were not covered by tests
addrs = append(addrs, addr)
}
return true
Expand All @@ -1104,18 +1104,34 @@ func (g *gRPCClient) Close(ctx context.Context) (err error) {
g.stopMonitor()
}
g.conns.Range(func(addr string, p pool.Conn) bool {
derr := g.Disconnect(ctx, addr)
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) {
err = errors.Join(err, derr)
select {
case <-ctx.Done():
return false
default:
derr := g.Disconnect(ctx, addr)
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) {
err = errors.Join(err, derr)
}
return true

Check warning on line 1115 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1107-L1115

Added lines #L1107 - L1115 were not covered by tests
}
return true
})
return err
}

func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error {
func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error {

Check warning on line 1121 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1121

Added line #L1121 was not covered by tests
var cnt int
g.conns.Range(func(addr string, p pool.Conn) bool {
if p == nil || !p.IsHealthy(ctx) {
pc, err := p.Connect(ctx)
if pc == nil || err != nil || !pc.IsHealthy(ctx) {
if pc != nil {
pc.Disconnect(ctx)
}
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
return true

Check warning on line 1131 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1124-L1131

Added lines #L1124 - L1131 were not covered by tests
}
p = pc

Check warning on line 1133 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1133

Added line #L1133 was not covered by tests
}
cnt++
return fn(addr, p)
})
Expand Down
3 changes: 2 additions & 1 deletion internal/net/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3107,7 +3107,7 @@ package grpc
// stopMonitor: test.fields.stopMonitor,
// }
//
// gotAddrs := g.ConnectedAddrs()
// gotAddrs := g.ConnectedAddrs(context.Background)
// if err := checkFunc(test.want, gotAddrs); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down Expand Up @@ -3303,6 +3303,7 @@ package grpc
//
// func Test_gRPCClient_rangeConns(t *testing.T) {
// type args struct {
// ctx context.Context
// fn func(addr string, p pool.Conn) bool
// }
// type fields struct {
Expand Down
65 changes: 32 additions & 33 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/vdaas/vald/internal/strings"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
"github.com/vdaas/vald/internal/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
Expand All @@ -45,9 +46,9 @@ type (

type Conn interface {
Connect(context.Context) (Conn, error)
Disconnect() error
Disconnect(context.Context) error
Do(ctx context.Context, f func(*ClientConn) error) error
Get(ctx context.Context) (conn *ClientConn, ok bool)
Get(context.Context) (conn *ClientConn, ok bool)
IsHealthy(context.Context) bool
IsIPConn() bool
Len() uint64
Expand All @@ -72,6 +73,7 @@ type pool struct {
current atomic.Uint64
bo backoff.Backoff
eg errgroup.Group
group singleflight.Group[Conn]
dopts []DialOption
dialTimeout time.Duration
roccd time.Duration // reconnection old connection closing duration
Expand All @@ -94,6 +96,8 @@ func New(ctx context.Context, opts ...Option) (c Conn, err error) {
p.init(true)
p.closing.Store(false)

p.group = singleflight.New[Conn]()

Check warning on line 100 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L99-L100

Added lines #L99 - L100 were not covered by tests
var (
isIPv4, isIPv6 bool
port uint16
Expand Down Expand Up @@ -222,11 +226,11 @@ func (p *pool) store(idx int, pc *poolConn) {
return
}
p.init(false)
p.pmu.RLock()
p.pmu.Lock()

Check warning on line 229 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L229

Added line #L229 was not covered by tests
if p.pool != nil && p.Size() > uint64(idx) && len(p.pool) > idx {
p.pool[idx].Store(pc)
}
p.pmu.RUnlock()
p.pmu.Unlock()

Check warning on line 233 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L233

Added line #L233 was not covered by tests
}

func (p *pool) loop(
Expand Down Expand Up @@ -335,7 +339,6 @@ func (p *pool) Connect(ctx context.Context) (c Conn, err error) {
if p == nil || p.closing.Load() {
return p, nil
}

p.init(false)

if p.isIP || !p.resolveDNS {
Expand Down Expand Up @@ -437,8 +440,7 @@ func (p *pool) singleTargetConnect(ctx context.Context) (c Conn, err error) {
return p, nil
}

func (p *pool) Disconnect() (err error) {
ctx := context.Background()
func (p *pool) Disconnect(ctx context.Context) (err error) {

Check warning on line 443 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L443

Added line #L443 was not covered by tests
p.closing.Store(true)
defer p.closing.Store(false)
emap := make(map[string]error, p.len())
Expand Down Expand Up @@ -572,40 +574,41 @@ func (p *pool) Do(ctx context.Context, f func(conn *ClientConn) error) (err erro
if p == nil {
return errors.ErrGRPCClientConnNotFound("*")
}
idx, conn, ok := p.getHealthyConn(ctx, 0, p.Len())
if !ok || conn == nil {
idx, pc, ok := p.getHealthyConn(ctx, 0, p.Len())
if !ok || pc == nil || pc.conn == nil {

Check warning on line 578 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L577-L578

Added lines #L577 - L578 were not covered by tests
return errors.ErrGRPCClientConnNotFound(p.addr)
}
conn := pc.conn

Check warning on line 581 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L581

Added line #L581 was not covered by tests
err = f(conn)
if errors.Is(err, grpc.ErrClientConnClosing) {
if conn != nil {
if cerr := conn.Close(); cerr != nil && !errors.Is(cerr, grpc.ErrClientConnClosing) {
log.Warnf("Failed to close connection: %v", cerr)
}
}
conn, err = p.dial(ctx, p.addr)
if err == nil && conn != nil && isHealthy(ctx, conn) {
p.store(idx, &poolConn{
conn: conn,
addr: p.addr,
})
if newErr := f(conn); newErr != nil {
rerr := p.refreshConn(ctx, idx, pc, p.addr)
if rerr == nil {
if newErr := f(p.load(idx).conn); newErr != nil {

Check warning on line 591 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L589-L591

Added lines #L589 - L591 were not covered by tests
return errors.Join(err, newErr)
}
return nil
}
err = errors.Join(err, rerr)

Check warning on line 596 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L596

Added line #L596 was not covered by tests
}
return err
}

func (p *pool) Get(ctx context.Context) (conn *ClientConn, ok bool) {
_, conn, ok = p.getHealthyConn(ctx, 0, p.Len())
return conn, ok
_, pc, ok := p.getHealthyConn(ctx, 0, p.Len())
if ok && pc != nil {
return pc.conn, true
}
return nil, false

Check warning on line 606 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L602-L606

Added lines #L602 - L606 were not covered by tests
}

func (p *pool) getHealthyConn(
ctx context.Context, cnt, retry uint64,
) (idx int, conn *ClientConn, ok bool) {
) (idx int, pc *poolConn, ok bool) {

Check warning on line 611 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L611

Added line #L611 was not covered by tests
if p == nil || p.closing.Load() {
return 0, nil, false
}
Expand All @@ -618,33 +621,29 @@ func (p *pool) getHealthyConn(
if retry <= 0 || retry > math.MaxUint64-pl || pl <= 0 {
if p.isIP {
log.Warnf("failed to find gRPC IP connection pool for %s.\tlen(pool): %d,\tretried: %d,\tseems IP %s is unhealthy will going to disconnect...", p.addr, pl, cnt, p.addr)
if err := p.Disconnect(); err != nil {
if err := p.Disconnect(ctx); err != nil {

Check warning on line 624 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L624

Added line #L624 was not covered by tests
log.Debugf("failed to disconnect gRPC IP direct connection for %s,\terr: %v", p.addr, err)
}
return 0, nil, false
}
if pl > 0 {
idx = int(p.current.Add(1) % pl)
}
if pc := p.load(idx); pc != nil && isHealthy(ctx, pc.conn) {
return idx, pc.conn, true
if pc = p.load(idx); pc != nil && isHealthy(ctx, pc.conn) {
return idx, pc, true

Check warning on line 633 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L632-L633

Added lines #L632 - L633 were not covered by tests
}
conn, err := p.dial(ctx, p.addr)
if err == nil && conn != nil && isHealthy(ctx, conn) {
p.store(idx, &poolConn{
conn: conn,
addr: p.addr,
})
return idx, conn, true
err := p.refreshConn(ctx, idx, pc, p.addr)
if err == nil {
return idx, p.load(idx), true

Check warning on line 637 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L635-L637

Added lines #L635 - L637 were not covered by tests
}
log.Warnf("failed to find gRPC connection pool for %s.\tlen(pool): %d,\tretried: %d,\terror: %v", p.addr, pl, cnt, err)
return idx, nil, false
}

if pl > 0 {
idx = int(p.current.Add(1) % pl)
if pc := p.load(idx); pc != nil && isHealthy(ctx, pc.conn) {
return idx, pc.conn, true
if pc = p.load(idx); pc != nil && isHealthy(ctx, pc.conn) {
return idx, pc, true

Check warning on line 646 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L645-L646

Added lines #L645 - L646 were not covered by tests
}
}
retry--
Expand Down Expand Up @@ -757,8 +756,8 @@ func (p *pool) String() (str string) {

func (pc *poolConn) Close(ctx context.Context, delay time.Duration) error {
tdelay := delay / 10
if tdelay < time.Millisecond*200 {
tdelay = time.Millisecond * 200
if tdelay < time.Millisecond*5 {
tdelay = time.Millisecond * 5

Check warning on line 760 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L759-L760

Added lines #L759 - L760 were not covered by tests
} else if tdelay > time.Minute {
tdelay = time.Second * 5
}
Expand Down
2 changes: 1 addition & 1 deletion internal/net/grpc/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2344,7 +2344,7 @@ package pool
// reconnectHash: test.fields.reconnectHash,
// }
//
// err := p.Disconnect()
// err := p.Disconnect(context.Background)
// if err := checkFunc(test.want, err); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down
2 changes: 1 addition & 1 deletion internal/test/mock/grpc/grpc_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (gc *GRPCClientMock) OrderedRangeConcurrent(
}

// ConnectedAddrs calls the ConnectedAddrsFunc object.
func (gc *GRPCClientMock) ConnectedAddrs() []string {
func (gc *GRPCClientMock) ConnectedAddrs(_ context.Context) []string {

Check warning on line 54 in internal/test/mock/grpc/grpc_client_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/grpc/grpc_client_mock.go#L54

Added line #L54 was not covered by tests
return gc.ConnectedAddrsFunc()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/test/mock/grpc_testify_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *ClientInternal) GetBackoff() backoff.Backoff {
return v
}

func (c *ClientInternal) ConnectedAddrs() []string {
func (c *ClientInternal) ConnectedAddrs(ctx context.Context) []string {

Check warning on line 202 in internal/test/mock/grpc_testify_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/grpc_testify_mock.go#L202

Added line #L202 was not covered by tests
args := c.Called()
v, ok := args.Get(0).([]string)
if !ok {
Expand Down
Loading

0 comments on commit 82b29ce

Please sign in to comment.