Skip to content

Commit f85a3af

Browse files
committed
[BUGFIX] add Health Check for Range over gRPC Connection Loop
Signed-off-by: kpango <[email protected]>
1 parent 6065fd9 commit f85a3af

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

internal/net/grpc/client.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
249249
return ctx.Err()
250250
case <-prTick.C:
251251
if g.enablePoolRebalance {
252-
err = g.rangeConns(func(addr string, p pool.Conn) bool {
252+
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
253253
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
254254
if addr == "" || p == nil {
255255
disconnectTargets = append(disconnectTargets, addr)
@@ -286,7 +286,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
286286
})
287287
}
288288
case <-hcTick.C:
289-
err = g.rangeConns(func(addr string, p pool.Conn) bool {
289+
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
290290
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
291291
if addr == "" || p == nil {
292292
disconnectTargets = append(disconnectTargets, addr)
@@ -415,7 +415,7 @@ func (g *gRPCClient) Range(
415415
if g.conns.Len() == 0 {
416416
return errors.ErrGRPCClientConnNotFound("*")
417417
}
418-
err = g.rangeConns(func(addr string, p pool.Conn) bool {
418+
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
419419
ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr)
420420
defer func() {
421421
if sspan != nil {
@@ -478,7 +478,7 @@ func (g *gRPCClient) RangeConcurrent(
478478
if g.conns.Len() == 0 {
479479
return errors.ErrGRPCClientConnNotFound("*")
480480
}
481-
err = g.rangeConns(func(addr string, p pool.Conn) bool {
481+
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
482482
eg.Go(safety.RecoverFunc(func() (err error) {
483483
ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr)
484484
defer func() {
@@ -701,7 +701,7 @@ func (g *gRPCClient) RoundRobin(
701701
}
702702

703703
do := func() (data any, err error) {
704-
cerr := g.rangeConns(func(addr string, p pool.Conn) bool {
704+
cerr := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
705705
select {
706706
case <-ctx.Done():
707707
err = ctx.Err()
@@ -1087,8 +1087,9 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
10871087

10881088
func (g *gRPCClient) ConnectedAddrs() (addrs []string) {
10891089
addrs = make([]string, 0, g.conns.Len())
1090-
err := g.rangeConns(func(addr string, p pool.Conn) bool {
1091-
if p != nil && p.IsHealthy(context.Background()) {
1090+
ctx := context.Background()
1091+
err := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
1092+
if p != nil && p.IsHealthy(ctx) {
10921093
addrs = append(addrs, addr)
10931094
}
10941095
return true
@@ -1113,9 +1114,20 @@ func (g *gRPCClient) Close(ctx context.Context) (err error) {
11131114
return err
11141115
}
11151116

1116-
func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error {
1117+
func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error {
11171118
var cnt int
11181119
g.conns.Range(func(addr string, p pool.Conn) bool {
1120+
if p == nil || !p.IsHealthy(ctx) {
1121+
pc, err := p.Connect(ctx)
1122+
if pc == nil || err != nil || !pc.IsHealthy(ctx) {
1123+
if pc != nil {
1124+
pc.Disconnect()
1125+
}
1126+
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
1127+
return true
1128+
}
1129+
p = pc
1130+
}
11191131
cnt++
11201132
return fn(addr, p)
11211133
})

0 commit comments

Comments
 (0)