Skip to content

Commit

Permalink
[BUGFIX] add Health Check for Range over gRPC Connection Loop
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Jan 9, 2025
1 parent 6065fd9 commit d9e319e
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
return ctx.Err()
case <-prTick.C:
if g.enablePoolRebalance {
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 252 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L252

Added line #L252 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -286,7 +286,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
})
}
case <-hcTick.C:
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 289 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L289

Added line #L289 was not covered by tests
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
Expand Down Expand Up @@ -415,7 +415,7 @@ func (g *gRPCClient) Range(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 418 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L418

Added line #L418 was not covered by tests
ssctx, sspan := trace.StartSpan(sctx, apiName+"/Client.Range/"+addr)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (g *gRPCClient) RangeConcurrent(
if g.conns.Len() == 0 {
return errors.ErrGRPCClientConnNotFound("*")
}
err = g.rangeConns(func(addr string, p pool.Conn) bool {
err = g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 481 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L481

Added line #L481 was not covered by tests
eg.Go(safety.RecoverFunc(func() (err error) {
ssctx, sspan := trace.StartSpan(egctx, apiName+"/Client.RangeConcurrent/"+addr)
defer func() {
Expand Down Expand Up @@ -565,7 +565,7 @@ func (g *gRPCClient) OrderedRange(
return nil
default:
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 568 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L568

Added line #L568 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRange operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
continue
Expand Down Expand Up @@ -634,7 +634,7 @@ func (g *gRPCClient) OrderedRangeConcurrent(
addr := order
eg.Go(safety.RecoverFunc(func() (err error) {
p, ok := g.conns.Load(addr)
if !ok || p == nil {
if !ok || p == nil || !p.IsHealthy(sctx) {

Check warning on line 637 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L637

Added line #L637 was not covered by tests
g.crl.Store(addr, true)
log.Warnf("gRPCClient.OrderedRangeConcurrent operation failed, gRPC connection pool for %s is invalid,\terror: %v", addr, errors.ErrGRPCClientConnNotFound(addr))
return nil
Expand Down Expand Up @@ -701,7 +701,7 @@ func (g *gRPCClient) RoundRobin(
}

do := func() (data any, err error) {
cerr := g.rangeConns(func(addr string, p pool.Conn) bool {
cerr := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {

Check warning on line 704 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L704

Added line #L704 was not covered by tests
select {
case <-ctx.Done():
err = ctx.Err()
Expand Down Expand Up @@ -1087,8 +1087,9 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {

func (g *gRPCClient) ConnectedAddrs() (addrs []string) {
addrs = make([]string, 0, g.conns.Len())
err := g.rangeConns(func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(context.Background()) {
ctx := context.Background()
err := g.rangeConns(ctx, func(addr string, p pool.Conn) bool {
if p != nil && p.IsHealthy(ctx) {

Check warning on line 1092 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1090-L1092

Added lines #L1090 - L1092 were not covered by tests
addrs = append(addrs, addr)
}
return true
Expand All @@ -1113,9 +1114,20 @@ func (g *gRPCClient) Close(ctx context.Context) (err error) {
return err
}

func (g *gRPCClient) rangeConns(fn func(addr string, p pool.Conn) bool) error {
func (g *gRPCClient) rangeConns(ctx context.Context, fn func(addr string, p pool.Conn) bool) error {

Check warning on line 1117 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1117

Added line #L1117 was not covered by tests
var cnt int
g.conns.Range(func(addr string, p pool.Conn) bool {
if p == nil || !p.IsHealthy(ctx) {
pc, err := p.Connect(ctx)
if pc == nil || err != nil || !pc.IsHealthy(ctx) {
if pc != nil {
pc.Disconnect()
}
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
return true

Check warning on line 1127 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1120-L1127

Added lines #L1120 - L1127 were not covered by tests
}
p = pc

Check warning on line 1129 in internal/net/grpc/client.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/client.go#L1129

Added line #L1129 was not covered by tests
}
cnt++
return fn(addr, p)
})
Expand Down

0 comments on commit d9e319e

Please sign in to comment.