Skip to content

Commit

Permalink
dialPoolPrivate: a dialPool-like solution, but where tmClients are no…
Browse files Browse the repository at this point in the history
…t shared. A real pool

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Mar 21, 2024
1 parent 2bbef59 commit 48485a3
Showing 1 changed file with 79 additions and 4 deletions.
83 changes: 79 additions & 4 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"

"github.com/spf13/pflag"
Expand Down Expand Up @@ -98,8 +99,10 @@ type grpcClient struct {
// CheckThrottler and FullStatus. Note we'll keep the clients open and close them upon Close() only.
// But that's OK because usually the tasks that use them are one-purpose only.
// The map is protected by the mutex.
mu sync.Mutex
rpcClientMap map[string]chan *tmc
mu sync.Mutex
rpcClientMap map[string]chan *tmc
rpcClientPoolMap map[string](chan *tmc)
rpcClientPoolRequests atomic.Int32
}

type dialer interface {
Expand All @@ -109,6 +112,7 @@ type dialer interface {

type poolDialer interface {
dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error)
dialPoolPrivate(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, func(error), error)
}

// Client implements tmclient.TabletManagerClient.
Expand Down Expand Up @@ -151,6 +155,69 @@ func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) (

return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil
}
func (client *grpcClient) dialPoolPrivate(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, func(error), error) {
addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name)
if err != nil {
return nil, nil, err
}
client.mu.Lock()
defer client.mu.Unlock()

if client.rpcClientPoolMap == nil {
client.rpcClientPoolMap = make(map[string](chan *tmc))
}

createNewPooledClient := func() (*tmc, error) {
cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
if err != nil {
return nil, err
}
return &tmc{
cc: cc,
client: tabletmanagerservicepb.NewTabletManagerClient(cc),
}, nil
}
pool, ok := client.rpcClientPoolMap[addr]
if !ok {
// One time population of the pool
client.rpcClientPoolRequests.Add(int32(concurrency))
pool = make(chan *tmc, concurrency)
client.rpcClientPoolMap[addr] = pool
}
for client.rpcClientPoolRequests.Load() > 0 {
tmClient, err := createNewPooledClient()
if err != nil {
return nil, nil, err
}
pool <- tmClient
client.rpcClientPoolRequests.Add(-1)
}
var tmClient *tmc
var recycle func(error)
if len(pool) == 0 {
// By choice, if the pool is empty, we do not block. We instead create a new
// client on the fly. Since this client was not part of the pool, it will also not return to the pool.
var err error
tmClient, err = createNewPooledClient()
if err != nil {
return nil, nil, err
}
} else {
tmClient = <-pool
recycle = func(err error) {
if err == nil {
pool <- tmClient
} else {
// The connection had an error. We will close it and not return it to the pool
tmClient.cc.Close()
// Indicate that the pool needs to be populated with another client:
client.rpcClientPoolRequests.Add(1)
}
}
}
return tmClient.client, recycle, nil
}

func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) {
addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
Expand Down Expand Up @@ -575,9 +642,10 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb.
// and dialing the other tablet every time is not practical.
func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) {
var c tabletmanagerservicepb.TabletManagerClient
var recycle func(error)
var err error
if poolDialer, ok := client.dialer.(poolDialer); ok {
c, err = poolDialer.dialPool(ctx, tablet)
c, recycle, err = poolDialer.dialPoolPrivate(ctx, tablet)
if err != nil {
return nil, err
}
Expand All @@ -593,6 +661,9 @@ func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet)
}

response, err := c.FullStatus(ctx, &tabletmanagerdatapb.FullStatusRequest{})
if recycle != nil {
defer recycle(err)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1065,9 +1136,10 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req
// and dialing the other tablet every time is not practical.
func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
var c tabletmanagerservicepb.TabletManagerClient
var recycle func(error)
var err error
if poolDialer, ok := client.dialer.(poolDialer); ok {
c, err = poolDialer.dialPool(ctx, tablet)
c, recycle, err = poolDialer.dialPoolPrivate(ctx, tablet)
if err != nil {
return nil, err
}
Expand All @@ -1083,6 +1155,9 @@ func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tab
}

response, err := c.CheckThrottler(ctx, req)
if recycle != nil {
defer recycle(err)
}
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 48485a3

Please sign in to comment.