This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

fix race issues in PeerClient
miparnisari committed Feb 20, 2024
1 parent 6f1e32a commit 498acea
Showing 3 changed files with 65 additions and 76 deletions.
8 changes: 1 addition & 7 deletions gubernator.go
@@ -627,9 +627,6 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
s.log.WithField("peers", peerInfo).Debug("peers updated")

// Shutdown any old peers we no longer need
ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout)
defer cancel()

var shutdownPeers []*PeerClient
for _, peer := range oldLocalPicker.Peers() {
if peerInfo := s.conf.LocalPicker.GetByPeerInfo(peer.Info()); peerInfo == nil {
@@ -649,10 +646,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
for _, p := range shutdownPeers {
wg.Run(func(obj interface{}) error {
pc := obj.(*PeerClient)
err := pc.Shutdown(ctx)
if err != nil {
s.log.WithError(err).WithField("peer", pc).Error("while shutting down peer")
}
pc.Shutdown()
return nil
}, p)
}
130 changes: 63 additions & 67 deletions peer_client.go
@@ -54,15 +54,17 @@ const (
)

type PeerClient struct {
client PeersV1Client
conn *grpc.ClientConn
conf PeerConfig
queue chan *request
lastErrs *collections.LRUCache

mutex sync.RWMutex // This mutex is for verifying the closing state of the client
status peerStatus // Keep the current status of the peer
wg sync.WaitGroup // This wait group is to monitor the number of in-flight requests
clientMutex sync.RWMutex
client PeersV1Client // GUARDED_BY(clientMutex)
conn *grpc.ClientConn // GUARDED_BY(clientMutex)
conf PeerConfig
queue chan *request
lastErrs *collections.LRUCache

statusMutex sync.RWMutex
status peerStatus // Keep the current status of the peer. GUARDED_BY(statusMutex)
wgMutex sync.RWMutex
wg sync.WaitGroup // Monitor the number of in-flight requests. GUARDED_BY(wgMutex)
}

type response struct {
@@ -95,8 +97,8 @@ func NewPeerClient(conf PeerConfig) *PeerClient {

// Connect establishes a GRPC connection to a peer
func (c *PeerClient) connect(ctx context.Context) (err error) {
// NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and
// handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected.
// NOTE: To future self, this statusMutex is used here because we need to know if the peer is disconnecting and
// handle ErrClosing. Since this statusMutex MUST be here we take this opportunity to also see if we are connected.
// Doing this here encapsulates managing the connected state to the PeerClient struct. Previously a PeerClient
// was connected when `NewPeerClient()` was called however, when adding support for multi data centers having a
// PeerClient connected to every Peer in every data center continuously is not desirable.
@@ -105,28 +107,20 @@ func (c *PeerClient) connect(ctx context.Context) (err error) {
defer funcTimer.ObserveDuration()
lockTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.connect_RLock"))

c.mutex.RLock()
c.statusMutex.RLock()
lockTimer.ObserveDuration()

if c.status == peerClosing {
c.mutex.RUnlock()
c.statusMutex.RUnlock()
return &PeerErr{err: errors.New("already disconnecting")}
}

if c.status == peerNotConnected {
// This mutex stuff looks wonky, but it allows us to use RLock() 99% of the time, while the 1% where we
// This statusMutex stuff looks wonky, but it allows us to use RLock() 99% of the time, while the 1% where we
// actually need to connect uses a full Lock(); using RLock() most of the time should reduce the overhead
// of taking a full lock on every call

// Yield the read lock so we can get the RW lock
c.mutex.RUnlock()
c.mutex.Lock()
defer c.mutex.Unlock()

// Now that we have the RW lock, ensure no else got here ahead of us.
if c.status == peerConnected {
return nil
}
c.statusMutex.RUnlock()

// Setup OpenTelemetry interceptor to propagate spans.
var opts []grpc.DialOption
@@ -144,19 +138,25 @@
}

var err error
c.clientMutex.Lock()
c.conn, err = grpc.Dial(c.conf.Info.GRPCAddress, opts...)
if err != nil {
c.clientMutex.Unlock()
return c.setLastErr(&PeerErr{err: errors.Wrapf(err, "failed to dial peer %s", c.conf.Info.GRPCAddress)})
}
c.client = NewPeersV1Client(c.conn)
c.clientMutex.Unlock()

c.statusMutex.Lock()
c.status = peerConnected
c.statusMutex.Unlock()

if !c.conf.Behavior.DisableBatching {
go c.runBatch()
}
return nil
}
c.mutex.RUnlock()
c.statusMutex.RUnlock()
return nil
}
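
The hunk above replaces the single mutex with per-field locks: the status check stays on a cheap read lock while the connection fields are mutated under their own lock. A minimal sketch of that GUARDED_BY layout, with hypothetical names (not the project's code):

package sketch

import "sync"

// Each group of fields gets its own lock, and callers take only the lock
// that guards the fields they touch. A slow operation under connMu (such
// as dialing) never blocks goroutines that only need to read the status.
type client struct {
    statusMu sync.RWMutex
    closing  bool // GUARDED_BY(statusMu)

    connMu sync.RWMutex
    conn   string // GUARDED_BY(connMu); stands in for the real *grpc.ClientConn
}

func (c *client) isClosing() bool {
    c.statusMu.RLock()
    defer c.statusMu.RUnlock()
    return c.closing
}

func (c *client) setConn(conn string) {
    c.connMu.Lock()
    defer c.connMu.Unlock()
    c.conn = conn
}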

@@ -213,17 +213,17 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
return nil, c.setLastErr(err)
}

// NOTE: This must be done within the RLock since calling Wait() in Shutdown() causes
// NOTE: This must be done within the Lock since calling Wait() in Shutdown() causes
// a race condition when called from a separate goroutine: if the internal wg is `0`
// when Wait() is called and Add(1) runs concurrently, the two calls race.
c.mutex.RLock()
c.wgMutex.Lock()
c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
defer c.wg.Done()
}()
c.wgMutex.Unlock()
defer c.wg.Done()

c.clientMutex.RLock()
resp, err = c.client.GetPeerRateLimits(ctx, r)
c.clientMutex.RUnlock()
if err != nil {
err = errors.Wrap(err, "Error in client.GetPeerRateLimits")
// metricCheckErrorCounter is updated within client.GetPeerRateLimits().
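
The note above reflects a documented constraint of sync.WaitGroup: an Add with a positive delta must not run concurrently with a Wait that starts while the counter is zero. This change serializes the two by taking the new wgMutex write lock around Add and the read lock around Wait. A minimal sketch of that pattern, with hypothetical names:

package sketch

import "sync"

type worker struct {
    wgMu sync.RWMutex   // serializes Add against Wait
    wg   sync.WaitGroup // counts in-flight requests
}

// Request path: take the write lock so Add never overlaps a Wait in progress.
func (w *worker) doRequest(work func()) {
    w.wgMu.Lock()
    w.wg.Add(1)
    w.wgMu.Unlock()
    defer w.wg.Done()
    work()
}

// Shutdown path: hold the read lock while draining in-flight requests, so
// concurrent Add calls block until the drain finishes.
func (w *worker) shutdown() {
    w.wgMu.RLock()
    w.wg.Wait()
    w.wgMu.RUnlock()
}
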
@@ -246,14 +246,14 @@ func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals
}

// See NOTE above about RLock and wg.Add(1)
c.mutex.RLock()
c.wgMutex.Lock()
c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
defer c.wg.Done()
}()
c.wgMutex.Unlock()
defer c.wg.Done()

c.clientMutex.RLock()
resp, err = c.client.UpdatePeerGlobals(ctx, r)
c.clientMutex.RUnlock()
if err != nil {
_ = c.setLastErr(err)
}
@@ -301,12 +301,13 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
return nil, c.setLastErr(err)
}

// See NOTE above about RLock and wg.Add(1)
c.mutex.RLock()
c.statusMutex.RLock()
if c.status == peerClosing {
c.statusMutex.RUnlock()
err := &PeerErr{err: errors.New("already disconnecting")}
return nil, c.setLastErr(err)
}
c.statusMutex.RUnlock()

// Wait for a response or context cancel
req := request{
@@ -315,6 +316,11 @@
request: r,
}

c.wgMutex.Lock()
c.wg.Add(1)
c.wgMutex.Unlock()
defer c.wg.Done()

// Enqueue the request to be sent
peerAddr := c.Info().GRPCAddress
metricBatchQueueLength.WithLabelValues(peerAddr).Set(float64(len(c.queue)))
@@ -326,12 +332,6 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
return nil, errors.Wrap(ctx.Err(), "Context error while enqueuing request")
}

c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
c.wg.Done()
}()

select {
case re := <-req.resp:
if re.err != nil {
@@ -344,7 +344,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
}
}

// run processes batching requests by waiting for requests to be queued. Send
// runBatch processes batching requests by waiting for requests to be queued. Send
// the queue as a batch when either c.batchWait time has elapsed or the queue
// reaches c.batchLimit.
func (c *PeerClient) runBatch() {
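
The body of runBatch is collapsed in this hunk. As a rough illustration of the loop the comment describes, flushing when the wait window elapses or the batch reaches its size limit, and exiting once the queue is closed by Shutdown; the names batchWait, batchLimit, and send are assumptions for the sketch, not the project's exact identifiers:

package sketch

import (
    "context"
    "time"
)

type request struct{} // placeholder for the real request type

// runBatch accumulates queued requests and flushes them as one batch when
// either the wait window expires or the batch hits its size limit. The loop
// returns when the queue channel is closed.
func runBatch(queue <-chan *request, batchWait time.Duration, batchLimit int,
    send func(context.Context, []*request)) {

    var batch []*request
    timer := time.NewTimer(batchWait)
    defer timer.Stop()

    for {
        select {
        case r, ok := <-queue:
            if !ok { // closed by Shutdown: flush what is left and exit
                if len(batch) > 0 {
                    send(context.Background(), batch)
                }
                return
            }
            batch = append(batch, r)
            if len(batch) >= batchLimit {
                send(context.Background(), batch)
                batch = nil
            }
        case <-timer.C:
            if len(batch) > 0 {
                send(context.Background(), batch)
                batch = nil
            }
            timer.Reset(batchWait)
        }
    }
}
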
@@ -426,11 +426,12 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
prop.Inject(r.ctx, &MetadataCarrier{Map: r.request.Metadata})
req.Requests = append(req.Requests, r.request)
tracing.EndScope(r.ctx, nil)

}

timeoutCtx, timeoutCancel := context.WithTimeout(ctx, c.conf.Behavior.BatchTimeout)
c.clientMutex.RLock()
resp, err := c.client.GetPeerRateLimits(timeoutCtx, &req)
c.clientMutex.RUnlock()
timeoutCancel()

// An error here indicates the entire request failed
@@ -470,40 +471,35 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
}
}

// Shutdown will gracefully shutdown the client connection, until the context is cancelled
func (c *PeerClient) Shutdown(ctx context.Context) error {
// Take the write lock since we're going to modify the closing state
c.mutex.Lock()
// Shutdown waits until outstanding requests have finished
func (c *PeerClient) Shutdown() {
c.statusMutex.RLock()
if c.status == peerClosing || c.status == peerNotConnected {
c.mutex.Unlock()
return nil
c.statusMutex.RUnlock()
return
}
defer c.mutex.Unlock()

c.statusMutex.RUnlock()

c.statusMutex.Lock()
c.status = peerClosing
c.statusMutex.Unlock()

defer func() {
c.clientMutex.Lock()
if c.conn != nil {
c.conn.Close()
}
c.clientMutex.Unlock()
}()

// This allows us to wait on the waitgroup, or until the context
// has been cancelled. This doesn't leak goroutines, because
// closing the connection will kill any outstanding requests.
waitChan := make(chan struct{})
go func() {
c.wg.Wait()
close(c.queue)
close(waitChan)
}()
// drain in-flight requests
c.wgMutex.RLock()
c.wg.Wait()
c.wgMutex.RUnlock()

select {
case <-ctx.Done():
return ctx.Err()
case <-waitChan:
return nil
}
// no more items will be sent
close(c.queue)
}
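
The ordering in the new Shutdown is what makes close(c.queue) safe: every request goroutine is counted in the wait group before it enqueues, so once wg.Wait() returns no sender can still be about to write to the channel, and closing it cannot trigger a send-on-closed-channel panic; the batch loop can then observe the close and stop. A minimal sketch of that hand-off, with hypothetical names and assuming a receiver is running:

package sketch

import "sync"

type pipe struct {
    wgMu  sync.RWMutex
    wg    sync.WaitGroup
    queue chan int
}

// Sender: register with the wait group before touching the queue, so
// shutdown cannot close the channel while this send is pending.
func (p *pipe) send(v int) {
    p.wgMu.Lock()
    p.wg.Add(1)
    p.wgMu.Unlock()
    defer p.wg.Done()
    p.queue <- v
}

// Receiver: the range loop terminates once the queue is closed.
func (p *pipe) receive(handle func(int)) {
    for v := range p.queue {
        handle(v)
    }
}

// Shutdown: drain counted senders first, then close the queue.
func (p *pipe) shutdown() {
    p.wgMu.RLock()
    p.wg.Wait()
    p.wgMu.RUnlock()
    close(p.queue)
}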

// PeerErr is returned if the peer is not connected or is in a closing state
3 changes: 1 addition & 2 deletions peer_client_test.go
@@ -91,8 +91,7 @@ func TestPeerClientShutdown(t *testing.T) {
// yield the processor so that other goroutines can start their requests
runtime.Gosched()

err := client.Shutdown(context.Background())
assert.NoError(t, err)
client.Shutdown()

wg.Wait()
})
