Skip to content

Commit

Permalink
specialized dialPool
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Mar 24, 2024
1 parent 8035926 commit 0a9ba36
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type tmc struct {
}

// rpcClientMap maps an address to a tmc
type rpcClientMap map[string]*tmc
type rpcClientMap map[string](chan *tmc)

// grpcClient implements both dialer and poolDialer.
type grpcClient struct {
Expand Down Expand Up @@ -190,24 +190,31 @@ func (client *grpcClient) dialPool(ctx context.Context, dialPoolGroup DialPoolGr
}
client.mu.Lock()
defer client.mu.Unlock()
m[addr].cc.Close()
for tm := range m[addr] {
tm.cc.Close()
}
close(m[addr])
delete(m, addr)
}

if tm, ok := m[addr]; ok {
return tm.client, invalidator, nil
}

cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
if err != nil {
return nil, nil, err
}
tm := &tmc{
cc: cc,
client: tabletmanagerservicepb.NewTabletManagerClient(cc),
if _, ok := m[addr]; !ok {
c := make(chan *tmc, concurrency)
for range cap(m[addr]) {
cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
if err != nil {
return nil, nil, err
}
tm := &tmc{
cc: cc,
client: tabletmanagerservicepb.NewTabletManagerClient(cc),
}
c <- tm
}
m[addr] = c
}
m[addr] = tm

c := m[addr]
tm := <-c
c <- tm
return tm.client, invalidator, nil
}

Expand All @@ -216,8 +223,11 @@ func (client *grpcClient) Close() {
client.mu.Lock()
defer client.mu.Unlock()
for _, m := range client.rpcClientMaps {
for _, tm := range m {
tm.cc.Close()
for _, c := range m {
for tm := range c {
tm.cc.Close()
}
close(c)
}
}
client.rpcClientMaps = nil
Expand Down

0 comments on commit 0a9ba36

Please sign in to comment.