Skip to content

Commit 288aec0

Browse files
committed
fix
Signed-off-by: kpango <[email protected]>
1 parent 82b29ce commit 288aec0

File tree

1 file changed

+38
-23
lines changed

1 file changed

+38
-23
lines changed

internal/net/grpc/pool/pool.go

+38-23
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030
"github.com/vdaas/vald/internal/errors"
3131
"github.com/vdaas/vald/internal/log"
3232
"github.com/vdaas/vald/internal/net"
33+
"github.com/vdaas/vald/internal/net/grpc/codes"
34+
"github.com/vdaas/vald/internal/net/grpc/status"
3335
"github.com/vdaas/vald/internal/safety"
3436
"github.com/vdaas/vald/internal/strings"
3537
"github.com/vdaas/vald/internal/sync"
@@ -131,7 +133,7 @@ func New(ctx context.Context, opts ...Option) (c Conn, err error) {
131133
p.addr = net.JoinHostPort(p.host, p.port)
132134
}
133135

134-
conn, err := grpc.NewClient(p.addr, p.dopts...)
136+
conn, err := p.dial(ctx, p.addr)
135137
if err != nil {
136138
log.Warnf("grpc.New initial Dial check to %s returned error: %v", p.addr, err)
137139
if conn != nil {
@@ -147,7 +149,7 @@ func New(ctx context.Context, opts ...Option) (c Conn, err error) {
147149
}
148150
p.port = port
149151
p.addr = net.JoinHostPort(p.host, p.port)
150-
conn, err = grpc.NewClient(p.addr, p.dopts...)
152+
conn, err := p.dial(ctx, p.addr)
151153
if err != nil {
152154
if conn != nil {
153155
cerr := conn.Close()
@@ -294,9 +296,9 @@ func (p *pool) refreshConn(ctx context.Context, idx int, pc *poolConn, addr stri
294296
return nil
295297
}
296298
if pc != nil {
297-
log.Debugf("connection for %s pool %d/%d is unhealthy trying to establish new pool member connection to %s", pc.addr, idx+1, p.Size(), addr)
299+
log.Debugf("connection for %s pool %d/%d len %d is unhealthy trying to establish new pool member connection to %s", pc.addr, idx+1, p.Size(), p.Len(), addr)
298300
} else {
299-
log.Debugf("connection pool %d/%d is empty, establish new pool member connection to %s", idx+1, p.Size(), addr)
301+
log.Debugf("connection pool %d/%d len %d is empty, establish new pool member connection to %s", idx+1, p.Size(), p.Len(), addr)
300302
}
301303
conn, err := p.dial(ctx, addr)
302304
if err != nil {
@@ -580,20 +582,27 @@ func (p *pool) Do(ctx context.Context, f func(conn *ClientConn) error) (err erro
580582
}
581583
conn := pc.conn
582584
err = f(conn)
583-
if errors.Is(err, grpc.ErrClientConnClosing) {
584-
if conn != nil {
585-
if cerr := conn.Close(); cerr != nil && !errors.Is(cerr, grpc.ErrClientConnClosing) {
586-
log.Warnf("Failed to close connection: %v", cerr)
585+
if err != nil {
586+
st, ok := status.FromError(err)
587+
if ok && st != nil && st.Code() != codes.Canceled { // connection closing or closed
588+
if conn != nil {
589+
cerr := conn.Close()
590+
if cerr != nil {
591+
st, ok := status.FromError(cerr)
592+
if ok && st != nil && st.Code() != codes.Canceled { // connection closing or closed
593+
log.Warnf("Failed to close connection: %v", cerr)
594+
}
595+
}
587596
}
588-
}
589-
rerr := p.refreshConn(ctx, idx, pc, p.addr)
590-
if rerr == nil {
591-
if newErr := f(p.load(idx).conn); newErr != nil {
592-
return errors.Join(err, newErr)
597+
rerr := p.refreshConn(ctx, idx, pc, p.addr)
598+
if rerr == nil {
599+
if newErr := f(p.load(idx).conn); newErr != nil {
600+
return errors.Join(err, newErr)
601+
}
602+
return nil
593603
}
594-
return nil
604+
err = errors.Join(err, rerr)
595605
}
596-
err = errors.Join(err, rerr)
597606
}
598607
return err
599608
}
@@ -769,13 +778,16 @@ func (pc *poolConn) Close(ctx context.Context, delay time.Duration) error {
769778
select {
770779
case <-ctx.Done():
771780
err := pc.conn.Close()
772-
if err != nil && !errors.Is(err, grpc.ErrClientConnClosing) {
773-
if ctx.Err() != nil &&
774-
!errors.Is(ctx.Err(), context.DeadlineExceeded) &&
775-
!errors.Is(ctx.Err(), context.Canceled) {
776-
return errors.Join(err, ctx.Err())
781+
if err != nil {
782+
st, ok := status.FromError(err)
783+
if ok && st != nil && st.Code() != codes.Canceled { // connection closing or closed
784+
if ctx.Err() != nil &&
785+
!errors.Is(ctx.Err(), context.DeadlineExceeded) &&
786+
!errors.Is(ctx.Err(), context.Canceled) {
787+
return errors.Join(err, ctx.Err())
788+
}
789+
return err
777790
}
778-
return err
779791
}
780792
if ctx.Err() != nil &&
781793
!errors.Is(ctx.Err(), context.DeadlineExceeded) &&
@@ -787,8 +799,11 @@ func (pc *poolConn) Close(ctx context.Context, delay time.Duration) error {
787799
switch pc.conn.GetState() {
788800
case connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure:
789801
err := pc.conn.Close()
790-
if err != nil && !errors.Is(err, grpc.ErrClientConnClosing) {
791-
return err
802+
if err != nil {
803+
st, ok := status.FromError(err)
804+
if ok && st != nil && st.Code() != codes.Canceled {
805+
return err
806+
}
792807
}
793808
return nil
794809
case connectivity.Shutdown:

0 commit comments

Comments
 (0)