Skip to content

Commit

Permalink
[BUGFIX] add Health Check for Range over gRPC Connection Loop
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Jan 10, 2025
1 parent 6065fd9 commit 770b234
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 31 deletions.
54 changes: 35 additions & 19 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Client interface {
GetCallOption() []CallOption
GetBackoff() backoff.Backoff
SetDisableResolveDNSAddr(addr string, disabled bool)
ConnectedAddrs() []string
ConnectedAddrs(context.Context) []string
Close(ctx context.Context) error
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
return ctx.Err()
case <-prTick.C:
if g.enablePoolRebalance {
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 252 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L252

Added line #L252 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -286,7 +286,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
})
}
case <-hcTick.C:
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 289 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L289

Added line #L289 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -415,7 +415,7 @@ func (g *gRPCClient) Range(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 418 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L418

Added line #L418 was not covered by tests
ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (g *gRPCClient) RangeConcurrent(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 481 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L481

Added line #L481 was not covered by tests
eg.Go(safety.RecoverFunc(func() (err error) {
ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr)
defer func() {
Expand Down Expand Up @@ -565,7 +565,7 @@ func (g *gRPCClient) OrderedRange(
return nil
default:
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 568 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L568

Added line #L568 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRange operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
continue
Expand Down Expand Up @@ -634,7 +634,7 @@ func (g *gRPCClient) OrderedRangeConcurrent(
addr := order
eg.Go(safety.RecoverFunc(func() (err error) {
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 637 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L637

Added line #L637 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRangeConcurrent operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
return nil
Expand Down Expand Up @@ -701,7 +701,7 @@ func (g *gRPCClient) RoundRobin(
}

do := func() (data any, err error) {
cerr := g.rangeConns(func(addr string, p pool.Conn) bool {
cerr := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 704 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L704

Added line #L704 was not covered by tests
select {
case <-ctx.Done():
err = ctx.Err()
Expand Down Expand Up @@ -879,14 +879,14 @@ func (g *gRPCClient) connectWithBackoff(
errors.Is(err, context.DeadlineExceeded) {
return nil, false, err
}
return nil, err != nil, err
return nil, p.IsHealthy(ctx), err

Check warning on line 882 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L882

Added line #L882 was not covered by tests
}
status.Log(st.Code(), err)
switch st.Code() {
case codes.Internal,
codes.Unavailable,
codes.ResourceExhausted:
return nil, err != nil, err
return nil, p.IsHealthy(ctx), err

Check warning on line 889 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L889

Added line #L889 was not covered by tests
}
return nil, false, err
}
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
atomic.AddUint64(&g.clientCount, ^uint64(0))
if p != nil {
log.Debugf("gRPC client connection pool addr = %s will disconnect soon...", addr)
return nil, p.Disconnect()
return nil, p.Disconnect(ctx)

Check warning on line 1069 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1069

Added line #L1069 was not covered by tests
}
return nil, nil
})
Expand All @@ -1085,10 +1085,10 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
return nil
}

func (g *gRPCClient) ConnectedAddrs() (addrs []string) {
func (g *gRPCClient) ConnectedAddrs(ctx context.Context) (addrs []string) {

Check warning on line 1088 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1088

Added line #L1088 was not covered by tests
addrs = make([]string, 0, g.conns.Len())
err := g.rangeConns(func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(context.Background()) {
err := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(ctx) {

Check warning on line 1091 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1090-L1091

Added lines #L1090 - L1091 were not covered by tests
addrs = append(addrs, addr)
}
return true
Expand All @@ -1104,18 +1104,34 @@ func (g *gRPCClient) Close(ctx context.Context) (err error) {
g.stopMonitor()
}
g.conns.Range(func(addr string, p pool.Conn) bool {
derr := g.Disconnect(ctx, addr)
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) {
err = errors.Join(err, derr)
select {
case <-ctx.Done():
return false
default:
derr := g.Disconnect(ctx, addr)
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) {
err = errors.Join(err, derr)
}
return true

Check warning on line 1115 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1107-L1115

Added lines #L1107 - L1115 were not covered by tests
}
return true
})
return err
}

func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error {
func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error {

Check warning on line 1121 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1121

Added line #L1121 was not covered by tests
var cnt int
g.conns.Range(func(addr string, p pool.Conn) bool {
if p == nil || !p.IsHealthy(ctx) {
pc, err := p.Connect(ctx)
if pc == nil || err != nil || !pc.IsHealthy(ctx) {
if pc != nil {
pc.Disconnect(ctx)
}
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
return true

Check warning on line 1131 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1124-L1131

Added lines #L1124 - L1131 were not covered by tests
}
p = pc

Check warning on line 1133 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1133

Added line #L1133 was not covered by tests
}
cnt++
return fn(addr, p)
})
Expand Down
3 changes: 2 additions & 1 deletion internal/net/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3107,7 +3107,7 @@ package grpc
// stopMonitor: test.fields.stopMonitor,
// }
//
// gotAddrs := g.ConnectedAddrs()
// gotAddrs := g.ConnectedAddrs(context.Background)
// if err := checkFunc(test.want, gotAddrs); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down Expand Up @@ -3303,6 +3303,7 @@ package grpc
//
// func Test_gRPCClient_rangeConns(t *testing.T) {
// type args struct {
// ctx context.Context
// fn func(addr string, p pool.Conn) bool
// }
// type fields struct {
Expand Down
13 changes: 6 additions & 7 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type (

type Conn interface {
Connect(context.Context) (Conn, error)
Disconnect() error
Disconnect(context.Context) error
Do(ctx context.Context, f func(*ClientConn) error) error
Get(ctx context.Context) (conn *ClientConn, ok bool)
Get(context.Context) (conn *ClientConn, ok bool)
IsHealthy(context.Context) bool
IsIPConn() bool
Len() uint64
Expand Down Expand Up @@ -437,8 +437,7 @@ func (p *pool) singleTargetConnect(ctx context.Context) (c Conn, err error) {
return p, nil
}

func (p *pool) Disconnect() (err error) {
ctx := context.Background()
func (p *pool) Disconnect(ctx context.Context) (err error) {

Check warning on line 440 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L440

Added line #L440 was not covered by tests
p.closing.Store(true)
defer p.closing.Store(false)
emap := make(map[string]error, p.len())
Expand Down Expand Up @@ -618,7 +617,7 @@ func (p *pool) getHealthyConn(
if retry <= 0 || retry > math.MaxUint64-pl || pl <= 0 {
if p.isIP {
log.Warnf("failed to find gRPC IP connection pool for %s.\tlen(pool): %d,\tretried: %d,\tseems IP %s is unhealthy will going to disconnect...", p.addr, pl, cnt, p.addr)
if err := p.Disconnect(); err != nil {
if err := p.Disconnect(ctx); err != nil {

Check warning on line 620 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L620

Added line #L620 was not covered by tests
log.Debugf("failed to disconnect gRPC IP direct connection for %s,\terr: %v", p.addr, err)
}
return 0, nil, false
Expand Down Expand Up @@ -757,8 +756,8 @@ func (p *pool) String() (str string) {

func (pc *poolConn) Close(ctx context.Context, delay time.Duration) error {
tdelay := delay / 10
if tdelay < time.Millisecond*200 {
tdelay = time.Millisecond * 200
if tdelay < time.Millisecond*5 {
tdelay = time.Millisecond * 5

Check warning on line 760 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L759-L760

Added lines #L759 - L760 were not covered by tests
} else if tdelay > time.Minute {
tdelay = time.Second * 5
}
Expand Down
2 changes: 1 addition & 1 deletion internal/net/grpc/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2344,7 +2344,7 @@ package pool
// reconnectHash: test.fields.reconnectHash,
// }
//
// err := p.Disconnect()
// err := p.Disconnect(context.Background)
// if err := checkFunc(test.want, err); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down
2 changes: 1 addition & 1 deletion internal/test/mock/grpc/grpc_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (gc *GRPCClientMock) OrderedRangeConcurrent(
}

// ConnectedAddrs calls the ConnectedAddrsFunc object.
func (gc *GRPCClientMock) ConnectedAddrs() []string {
func (gc *GRPCClientMock) ConnectedAddrs(_ context.Context) []string {

Check warning on line 54 in internal/test/mock/grpc/grpc_client_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/grpc/grpc_client_mock.go#L54

Added line #L54 was not covered by tests
return gc.ConnectedAddrsFunc()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/test/mock/grpc_testify_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *ClientInternal) GetBackoff() backoff.Backoff {
return v
}

func (c *ClientInternal) ConnectedAddrs() []string {
func (c *ClientInternal) ConnectedAddrs(ctx context.Context) []string {

Check warning on line 202 in internal/test/mock/grpc_testify_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/grpc_testify_mock.go#L202

Added line #L202 was not covered by tests
args := c.Called()
v, ok := args.Get(0).([]string)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions pkg/gateway/lb/handler/grpc/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *server) aggregationSearch(
target + " canceled: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)

Check warning on line 104 in pkg/gateway/lb/handler/grpc/aggregation.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lb/handler/grpc/aggregation.go#L104

Added line #L104 was not covered by tests
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)):
Expand All @@ -112,6 +113,7 @@ func (s *server) aggregationSearch(
target + " deadline_exceeded: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)

Check warning on line 116 in pkg/gateway/lb/handler/grpc/aggregation.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lb/handler/grpc/aggregation.go#L116

Added line #L116 was not covered by tests
return nil
default:
st, msg, err := status.ParseError(err, codes.Unknown, "failed to parse search gRPC error response",
Expand Down Expand Up @@ -168,6 +170,7 @@ func (s *server) aggregationSearch(
target + " canceled: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)

Check warning on line 173 in pkg/gateway/lb/handler/grpc/aggregation.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lb/handler/grpc/aggregation.go#L173

Added line #L173 was not covered by tests
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)):
Expand All @@ -179,6 +182,7 @@ func (s *server) aggregationSearch(
target + " deadline_exceeded: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)

Check warning on line 185 in pkg/gateway/lb/handler/grpc/aggregation.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/lb/handler/grpc/aggregation.go#L185

Added line #L185 was not covered by tests
return nil
default:
st, msg, err := status.ParseError(err, codes.Unknown, "failed to parse search gRPC error response",
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/mirror/service/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (m *mirr) Start(ctx context.Context) <-chan error { // skipcq: GO-R1005
}
}
}
log.Debugf("[mirror]: connected mirror gateway targets: %v", m.gateway.GRPCClient().ConnectedAddrs())
log.Debugf("[mirror]: connected mirror gateway targets: %v", m.gateway.GRPCClient().ConnectedAddrs(ctx))

Check warning on line 163 in pkg/gateway/mirror/service/mirror.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/mirror/service/mirror.go#L163

Added line #L163 was not covered by tests
}
}
})
Expand Down

0 comments on commit 770b234

Please sign in to comment.