Skip to content

Commit

Permalink
[BUGFIX] add Health Check for Range over gRPC Connection Loop
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Jan 10, 2025
1 parent 6065fd9 commit 65795ab
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
34 changes: 23 additions & 11 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
return ctx.Err()
case <-prTick.C:
if g.enablePoolRebalance {
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 252 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L252

Added line #L252 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -286,7 +286,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
})
}
case <-hcTick.C:
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 289 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L289

Added line #L289 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -415,7 +415,7 @@ func (g *gRPCClient) Range(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 418 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L418

Added line #L418 was not covered by tests
ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (g *gRPCClient) RangeConcurrent(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 481 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L481

Added line #L481 was not covered by tests
eg.Go(safety.RecoverFunc(func() (err error) {
ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr)
defer func() {
Expand Down Expand Up @@ -565,7 +565,7 @@ func (g *gRPCClient) OrderedRange(
return nil
default:
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 568 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L568

Added line #L568 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRange operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
continue
Expand Down Expand Up @@ -634,7 +634,7 @@ func (g *gRPCClient) OrderedRangeConcurrent(
addr := order
eg.Go(safety.RecoverFunc(func() (err error) {
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 637 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L637

Added line #L637 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRangeConcurrent operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
return nil
Expand Down Expand Up @@ -701,7 +701,7 @@ func (g *gRPCClient) RoundRobin(
}

do := func() (data any, err error) {
cerr := g.rangeConns(func(addr string, p pool.Conn) bool {
cerr := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 704 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L704

Added line #L704 was not covered by tests
select {
case <-ctx.Done():
err = ctx.Err()
Expand Down Expand Up @@ -879,7 +879,7 @@ func (g *gRPCClient) connectWithBackoff(
errors.Is(err, context.DeadlineExceeded) {
return nil, false, err
}
return nil, err != nil, err
return nil, err != nil && p.IsHealthy(ctx), err

Check warning on line 882 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L882

Added line #L882 was not covered by tests
}
status.Log(st.Code(), err)
switch st.Code() {
Expand Down Expand Up @@ -1087,8 +1087,9 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {

func (g *gRPCClient) ConnectedAddrs() (addrs []string) {
addrs = make([]string, 0, g.conns.Len())
err := g.rangeConns(func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(context.Background()) {
ctx := context.Background()
err := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(ctx) {

Check warning on line 1092 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1090-L1092

Added lines #L1090 - L1092 were not covered by tests
addrs = append(addrs, addr)
}
return true
Expand All @@ -1113,9 +1114,20 @@ func (g *gRPCClient) Close(ctx context.Context) (err error) {
return err
}

func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error {
func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error {

Check warning on line 1117 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1117

Added line #L1117 was not covered by tests
var cnt int
g.conns.Range(func(addr string, p pool.Conn) bool {
if p == nil || !p.IsHealthy(ctx) {
pc, err := p.Connect(ctx)
if pc == nil || err != nil || !pc.IsHealthy(ctx) {
if pc != nil {
pc.Disconnect()
}
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
return true

Check warning on line 1127 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1120-L1127

Added lines #L1120 - L1127 were not covered by tests
}
p = pc

Check warning on line 1129 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1129

Added line #L1129 was not covered by tests
}
cnt++
return fn(addr, p)
})
Expand Down
1 change: 1 addition & 0 deletions internal/net/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3303,6 +3303,7 @@ package grpc
//
// func Test_gRPCClient_rangeConns(t *testing.T) {
// type args struct {
// ctx context.Context
// fn func(addr string, p pool.Conn) bool
// }
// type fields struct {
Expand Down
4 changes: 2 additions & 2 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,8 @@ func (p *pool) String() (str string) {

func (pc *poolConn) Close(ctx context.Context, delay time.Duration) error {
tdelay := delay / 10
if tdelay < time.Millisecond*200 {
tdelay = time.Millisecond * 200
if tdelay < time.Millisecond*5 {
tdelay = time.Millisecond * 5

Check warning on line 761 in internal/net/grpc/pool/pool.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/pool/pool.go#L760-L761

Added lines #L760 - L761 were not covered by tests
} else if tdelay > time.Minute {
tdelay = time.Second * 5
}
Expand Down

0 comments on commit 65795ab

Please sign in to comment.