@@ -249,7 +249,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
249
249
return ctx .Err ()
250
250
case <- prTick .C :
251
251
if g .enablePoolRebalance {
252
- err = g .rangeConns (ctx , func (addr string , p pool.Conn ) bool {
252
+ err = g .rangeConns (ctx , true , func (addr string , p pool.Conn ) bool {
253
253
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
254
254
if addr == "" || p == nil {
255
255
disconnectTargets = append (disconnectTargets , addr )
@@ -286,7 +286,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
286
286
})
287
287
}
288
288
case <- hcTick .C :
289
- err = g .rangeConns (ctx , func (addr string , p pool.Conn ) bool {
289
+ err = g .rangeConns (ctx , true , func (addr string , p pool.Conn ) bool {
290
290
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
291
291
if addr == "" || p == nil {
292
292
disconnectTargets = append (disconnectTargets , addr )
@@ -415,7 +415,7 @@ func (g *gRPCClient) Range(
415
415
if g .conns .Len () == 0 {
416
416
return errors .ErrGRPCClientConnNotFound ("*" )
417
417
}
418
- err = g .rangeConns (ctx , func (addr string , p pool.Conn ) bool {
418
+ err = g .rangeConns (ctx , false , func (addr string , p pool.Conn ) bool {
419
419
ssctx , sspan := trace .StartSpan (sctx , apiName + "/Client.Range/" + addr )
420
420
defer func () {
421
421
if sspan != nil {
@@ -478,7 +478,7 @@ func (g *gRPCClient) RangeConcurrent(
478
478
if g .conns .Len () == 0 {
479
479
return errors .ErrGRPCClientConnNotFound ("*" )
480
480
}
481
- err = g .rangeConns (ctx , func (addr string , p pool.Conn ) bool {
481
+ err = g .rangeConns (ctx , false , func (addr string , p pool.Conn ) bool {
482
482
eg .Go (safety .RecoverFunc (func () (err error ) {
483
483
ssctx , sspan := trace .StartSpan (egctx , apiName + "/Client.RangeConcurrent/" + addr )
484
484
defer func () {
@@ -701,7 +701,7 @@ func (g *gRPCClient) RoundRobin(
701
701
}
702
702
703
703
do := func () (data any , err error ) {
704
- cerr := g .rangeConns (ctx , func (addr string , p pool.Conn ) bool {
704
+ cerr := g .rangeConns (ctx , false , func (addr string , p pool.Conn ) bool {
705
705
select {
706
706
case <- ctx .Done ():
707
707
err = ctx .Err ()
@@ -1087,7 +1087,7 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
1087
1087
1088
1088
func (g * gRPCClient ) ConnectedAddrs (ctx context.Context ) (addrs []string ) {
1089
1089
addrs = make ([]string , 0 , g .conns .Len ())
1090
- err := g .rangeConns (ctx , func (addr string , p pool.Conn ) bool {
1090
+ err := g .rangeConns (ctx , false , func (addr string , p pool.Conn ) bool {
1091
1091
if p != nil && p .IsHealthy (ctx ) {
1092
1092
addrs = append (addrs , addr )
1093
1093
}
@@ -1118,14 +1118,20 @@ func (g *gRPCClient) Close(ctx context.Context) (err error) {
1118
1118
return err
1119
1119
}
1120
1120
1121
- func (g * gRPCClient ) rangeConns (ctx context.Context , fn func (addr string , p pool.Conn ) bool ) error {
1121
+ func (g * gRPCClient ) rangeConns (ctx context.Context , force bool , fn func (addr string , p pool.Conn ) bool ) error {
1122
1122
var cnt int
1123
1123
g .conns .Range (func (addr string , p pool.Conn ) bool {
1124
+ if force {
1125
+ cnt ++
1126
+ return fn (addr , p )
1127
+ }
1124
1128
if p == nil || ! p .IsHealthy (ctx ) {
1125
1129
pc , err := p .Connect (ctx )
1126
1130
if pc == nil || err != nil || ! pc .IsHealthy (ctx ) {
1127
1131
if pc != nil {
1128
- pc .Disconnect (ctx )
1132
+ if derr := pc .Disconnect (ctx ); derr != nil {
1133
+ log .Debugf ("Failed to disconnect unhealthy connection for %s: %v" , addr , derr )
1134
+ }
1129
1135
}
1130
1136
log .Debugf ("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t %s" , addr , p .String ())
1131
1137
return true
0 commit comments