-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUGFIX] add Health Check for Range over gRPC Connection Loop #2798
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -90,7 +90,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
GetCallOption() []CallOption | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
GetBackoff() backoff.Backoff | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SetDisableResolveDNSAddr(addr string, disabled bool) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ConnectedAddrs() []string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ConnectedAddrs(context.Context) []string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Close(ctx context.Context) error | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -249,7 +249,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return ctx.Err() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case <-prTick.C: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if g.enablePoolRebalance { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = g.rangeConns(func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if addr == "" || p == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
disconnectTargets = append(disconnectTargets, addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -286,7 +286,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case <-hcTick.C: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = g.rangeConns(func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if addr == "" || p == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
disconnectTargets = append(disconnectTargets, addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -415,7 +415,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if g.conns.Len() == 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return errors.ErrGRPCClientConnNotFound("*") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = g.rangeConns(func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if sspan != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -478,7 +478,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if g.conns.Len() == 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return errors.ErrGRPCClientConnNotFound("*") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = g.rangeConns(func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
eg.Go(safety.RecoverFunc(func() (err error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -565,7 +565,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
p, ok := g.conns.Load(addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if !ok || p == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if !ok || p == nil || !p.IsHealthy(sctx) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.crl.Store(addr, true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Warnf("gRPCClient.OrderedRange operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
continue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -634,7 +634,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
addr := order | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
eg.Go(safety.RecoverFunc(func() (err error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
p, ok := g.conns.Load(addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if !ok || p == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if !ok || p == nil || !p.IsHealthy(sctx) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.crl.Store(addr, true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Warnf("gRPCClient.OrderedRangeConcurrent operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -701,7 +701,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
do := func() (data any, err error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cerr := g.rangeConns(func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cerr := g.rangeConns(ctx, func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = ctx.Err() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -879,14 +879,14 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
errors.Is(err, context.DeadlineExceeded) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, false, err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, err != nil, err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, p.IsHealthy(ctx), err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
status.Log(st.Code(), err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
switch st.Code() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case codes.Internal, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
codes.Unavailable, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
codes.ResourceExhausted: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, err != nil, err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, p.IsHealthy(ctx), err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, false, err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1066,7 +1066,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
atomic.AddUint64(&g.clientCount, ^uint64(0)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if p != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Debugf("gRPC client connection pool addr = %s will disconnect soon...", addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, p.Disconnect() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, p.Disconnect(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1085,10 +1085,10 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (g *gRPCClient) ConnectedAddrs() (addrs []string) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (g *gRPCClient) ConnectedAddrs(ctx context.Context) (addrs []string) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
addrs = make([]string, 0, g.conns.Len()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err := g.rangeConns(func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if p != nil && p.IsHealthy(context.Background()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err := g.rangeConns(ctx, func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if p != nil && p.IsHealthy(ctx) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
addrs = append(addrs, addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1104,18 +1104,34 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.stopMonitor() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.conns.Range(func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
derr := g.Disconnect(ctx, addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = errors.Join(err, derr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return false | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
derr := g.Disconnect(ctx, addr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if derr != nil && !errors.Is(derr, errors.ErrGRPCClientConnNotFound(addr)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err = errors.Join(err, derr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var cnt int | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
g.conns.Range(func(addr string, p pool.Conn) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if p == nil || !p.IsHealthy(ctx) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pc, err := p.Connect(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if pc == nil || err != nil || !pc.IsHealthy(ctx) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if pc != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pc.Disconnect(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
p = pc | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+1121
to
+1134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Improve error handling in rangeConns. The error handling in rangeConns could be improved:
Apply this diff to improve error handling: func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error {
var cnt int
g.conns.Range(func(addr string, p pool.Conn) bool {
if p == nil || !p.IsHealthy(ctx) {
pc, err := p.Connect(ctx)
if pc == nil || err != nil || !pc.IsHealthy(ctx) {
if pc != nil {
- pc.Disconnect(ctx)
+ if derr := pc.Disconnect(ctx); derr != nil {
+ log.Debugf("Failed to disconnect unhealthy connection for %s: %v", addr, derr)
+ }
}
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
return true
}
p = pc
}
cnt++
return fn(addr, p)
})
if cnt == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
return nil
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cnt++ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return fn(addr, p) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3107,7 +3107,7 @@ package grpc | |||||
// stopMonitor: test.fields.stopMonitor, | ||||||
// } | ||||||
// | ||||||
// gotAddrs := g.ConnectedAddrs() | ||||||
// gotAddrs := g.ConnectedAddrs(context.Background) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix the context.Background function call. There's a syntax error in the Apply this diff to fix the syntax error: - gotAddrs := g.ConnectedAddrs(context.Background)
+ gotAddrs := g.ConnectedAddrs(context.Background()) 📝 Committable suggestion
Suggested change
|
||||||
// if err := checkFunc(test.want, gotAddrs); err != nil { | ||||||
// tt.Errorf("error = %v", err) | ||||||
// } | ||||||
|
@@ -3303,6 +3303,7 @@ package grpc | |||||
// | ||||||
// func Test_gRPCClient_rangeConns(t *testing.T) { | ||||||
// type args struct { | ||||||
// ctx context.Context | ||||||
// fn func(addr string, p pool.Conn) bool | ||||||
// } | ||||||
// type fields struct { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Test coverage needed: RoundRobin operation tests are defined but not implemented
Found test stubs in
internal/net/grpc/client_test.go
that are currently commented out and marked as "NOT IMPLEMENTED":Test_gRPCClient_RoundRobin
Test_gRPCClient_rangeConns
This confirms that this critical functionality, which is used throughout the codebase for connection management, lacks proper test coverage.
🔗 Analysis chain
Add test coverage for RoundRobin operation.
The RoundRobin implementation properly handles context and integrates with the circuit breaker. However, this critical path lacks test coverage.
Run the following script to check existing test patterns:
Would you like me to help generate test cases for the RoundRobin operation?
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 34
Script:
Length of output: 5654
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 704-704: internal/net/grpc/client.go#L704
Added line #L704 was not covered by tests