Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Feb 5, 2025
1 parent 551c681 commit 9726362
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
12 changes: 11 additions & 1 deletion internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,14 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
return ctx.Err()
case <-prTick.C:
if g.enablePoolRebalance {
log.Debug("starting to force rebalance pool connection")
err = g.rangeConns(ctx, true, func(addr string, p pool.Conn) bool {
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
return true
}
log.Debugf("rebalancing pool connection for addr: %s, detail: %s", addr, p.String())
var err error
// for rebalancing connection we don't need to check connection health
p, err = p.Connect(ctx)
Expand Down Expand Up @@ -291,12 +293,14 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
})
}
case <-hcTick.C:
log.Debug("starting to checking health of pool connection")
err = g.rangeConns(ctx, true, func(addr string, p pool.Conn) bool {
// if addr or pool is nil or empty the registration of conns is invalid let's disconnect them
if addr == "" || p == nil {
disconnectTargets = append(disconnectTargets, addr)
return true
}
log.Debugf("checking health of pool connection for addr: %s, detail: %s", addr, p.String())
// for health check we don't need to reconnect when connection is healthy
if p.IsHealthy(ctx) {
return true
Expand Down Expand Up @@ -340,6 +344,8 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
}
}
}

log.Debugf("starting to bulk reconnection for addrs %v", g.crl)
clctx, cancel := context.WithTimeout(ctx, reconnLimitDuration)
g.crl.Range(func(addr string, enabled bool) bool {
select {
Expand Down Expand Up @@ -374,6 +380,7 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
port uint16
disconnected = make(map[string]bool, len(disconnectTargets))
)
log.Debugf("starting to bulk disconnection for addrs %v", disconnectTargets)
for _, addr := range disconnectTargets {
host, port, _, isIPv4, isIPv6, err = net.Parse(addr)
disconnectFlag = isIPv4 || isIPv6 // Disconnect only if the connection is a direct IP connection; do not delete connections via DNS due to retry.
Expand Down Expand Up @@ -1138,9 +1145,12 @@ func (g *gRPCClient) rangeConns(
if p == nil || !p.IsHealthy(ctx) {
pc, err := p.Connect(ctx)
if pc == nil || err != nil || !pc.IsHealthy(ctx) {
if pc != nil {
if pc != nil && pc.IsIPConn() {
log.Debugf("Failed to re-connect unhealthy connection for %s: %v, trying to disconnect unhealthy", addr, err)
if derr := pc.Disconnect(ctx); derr != nil {
log.Debugf("Failed to disconnect unhealthy connection for %s: %v", addr, derr)
} else {
g.conns.Delete(addr)
}
}
log.Debugf("Unhealthy connection detected for %s during gRPC Connection Range over Loop:\t%s", addr, p.String())
Expand Down
7 changes: 4 additions & 3 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,18 +795,19 @@ func (p *pool) String() (str string) {
if rh != nil {
hash = *rh
}
return fmt.Sprintf("addr: %s, host: %s, port %d, isIP: %v, resolveDNS: %v, hash: %s, pool_size: %d, current_seek: %d, dopt_len: %d, dial_timeout: %v, roccd: %v, closing: %v",
return fmt.Sprintf("addr: %s, host: %s, port %d, isIP: %t, resolveDNS: %t, hash: %s, pool_len: %d, pool_size: %d, current_seek: %d, dopt_len: %d, dial_timeout: %s, roccd: %s, closing: %t",
p.addr,
p.host,
p.port,
p.isIP,
p.resolveDNS,
hash,
len(p.pool),
p.size.Load(),
p.current.Load(),
len(p.dopts),
p.dialTimeout,
p.roccd,
p.dialTimeout.String(),
p.roccd.String(),
p.closing.Load())
}

Expand Down

0 comments on commit 9726362

Please sign in to comment.