Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

fix: mutex deadlocks in PeerClient #223

Merged
merged 23 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,6 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
s.log.WithField("peers", peerInfo).Debug("peers updated")

// Shutdown any old peers we no longer need
ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout)
defer cancel()

var shutdownPeers []*PeerClient
for _, peer := range oldLocalPicker.Peers() {
if peerInfo := s.conf.LocalPicker.GetByPeerInfo(peer.Info()); peerInfo == nil {
Expand All @@ -649,10 +646,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
for _, p := range shutdownPeers {
wg.Run(func(obj interface{}) error {
pc := obj.(*PeerClient)
err := pc.Shutdown(ctx)
if err != nil {
s.log.WithError(err).WithField("peer", pc).Error("while shutting down peer")
}
pc.Shutdown()
return nil
}, p)
}
Expand Down
141 changes: 65 additions & 76 deletions peer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ const (
)

type PeerClient struct {
client PeersV1Client
conn *grpc.ClientConn
conf PeerConfig
queue chan *request
lastErrs *collections.LRUCache

mutex sync.RWMutex // This mutex is for verifying the closing state of the client
status peerStatus // Keep the current status of the peer
wg sync.WaitGroup // This wait group is to monitor the number of in-flight requests
clientMutex sync.RWMutex
client PeersV1Client // GUARDED_BY(clientMutex)
conn *grpc.ClientConn // GUARDED_BY(clientMutex)
conf PeerConfig
queue chan *request
lastErrs *collections.LRUCache

statusMutex sync.RWMutex
status peerStatus // Keep the current status of the peer. GUARDED_BY(statusMutex)
wgMutex sync.RWMutex
wg sync.WaitGroup // Monitor the number of in-flight requests. GUARDED_BY(wgMutex)
}

type response struct {
Expand Down Expand Up @@ -93,7 +95,8 @@ func NewPeerClient(conf PeerConfig) *PeerClient {
}
}

// Connect establishes a GRPC connection to a peer
// Connect tries to establish a GRPC connection to a peer.
// If the peer is shutting down it returns an error.
func (c *PeerClient) connect(ctx context.Context) (err error) {
// NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and
// handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected.
Expand All @@ -105,11 +108,11 @@ func (c *PeerClient) connect(ctx context.Context) (err error) {
defer funcTimer.ObserveDuration()
lockTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.connect_RLock"))

c.mutex.RLock()
c.statusMutex.RLock()
lockTimer.ObserveDuration()

if c.status == peerClosing {
c.mutex.RUnlock()
c.statusMutex.RUnlock()
return &PeerErr{err: errors.New("already disconnecting")}
}

Expand All @@ -118,15 +121,7 @@ func (c *PeerClient) connect(ctx context.Context) (err error) {
// actually need to connect uses a full Lock(), using RLock() most of which should reduce the over head
// of a full lock on every call

// Yield the read lock so we can get the RW lock
c.mutex.RUnlock()
c.mutex.Lock()
defer c.mutex.Unlock()

// Now that we have the RW lock, ensure no else got here ahead of us.
if c.status == peerConnected {
return nil
}
c.statusMutex.RUnlock()

// Setup OpenTelemetry interceptor to propagate spans.
var opts []grpc.DialOption
Expand All @@ -144,19 +139,25 @@ func (c *PeerClient) connect(ctx context.Context) (err error) {
}

var err error
c.clientMutex.Lock()
c.conn, err = grpc.Dial(c.conf.Info.GRPCAddress, opts...)
if err != nil {
c.clientMutex.Unlock()
return c.setLastErr(&PeerErr{err: errors.Wrapf(err, "failed to dial peer %s", c.conf.Info.GRPCAddress)})
}
c.client = NewPeersV1Client(c.conn)
miparnisari marked this conversation as resolved.
Show resolved Hide resolved
c.clientMutex.Unlock()

c.statusMutex.Lock()
c.status = peerConnected
c.statusMutex.Unlock()

if !c.conf.Behavior.DisableBatching {
go c.runBatch()
}
return nil
}
c.mutex.RUnlock()
c.statusMutex.RUnlock()
return nil
}

Expand Down Expand Up @@ -213,17 +214,17 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
return nil, c.setLastErr(err)
}

// NOTE: This must be done within the RLock since calling Wait() in Shutdown() causes
// NOTE: This must be done within the Lock since calling Wait() in Shutdown() causes
// a race condition if called within a separate go routine if the internal wg is `0`
// when Wait() is called then Add(1) is called concurrently.
c.mutex.RLock()
c.wgMutex.Lock()
defer c.wgMutex.Unlock() // unlock at the very end of this function
c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
defer c.wg.Done()
}()
defer c.wg.Done()

c.clientMutex.RLock()
resp, err = c.client.GetPeerRateLimits(ctx, r)
c.clientMutex.RUnlock()
if err != nil {
err = errors.Wrap(err, "Error in client.GetPeerRateLimits")
// metricCheckErrorCounter is updated within client.GetPeerRateLimits().
Expand All @@ -246,14 +247,14 @@ func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals
}

// See NOTE above about RLock and wg.Add(1)
c.mutex.RLock()
c.wgMutex.Lock()
defer c.wgMutex.Unlock() // unlock at the very end of this function
c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
defer c.wg.Done()
}()
defer c.wg.Done()

c.clientMutex.RLock()
resp, err = c.client.UpdatePeerGlobals(ctx, r)
c.clientMutex.RUnlock()
if err != nil {
_ = c.setLastErr(err)
}
Expand Down Expand Up @@ -301,20 +302,17 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
return nil, c.setLastErr(err)
}

// See NOTE above about RLock and wg.Add(1)
c.mutex.RLock()
if c.status == peerClosing {
err := &PeerErr{err: errors.New("already disconnecting")}
return nil, c.setLastErr(err)
}

// Wait for a response or context cancel
req := request{
resp: make(chan *response, 1),
ctx: ctx,
request: r,
}

c.wgMutex.Lock()
defer c.wgMutex.Unlock() // unlock at the very end of this function
c.wg.Add(1)
defer c.wg.Done()

// Enqueue the request to be sent
peerAddr := c.Info().GRPCAddress
metricBatchQueueLength.WithLabelValues(peerAddr).Set(float64(len(c.queue)))
Expand All @@ -326,12 +324,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
return nil, errors.Wrap(ctx.Err(), "Context error while enqueuing request")
}

c.wg.Add(1)
defer func() {
c.mutex.RUnlock()
c.wg.Done()
}()

// Wait for a response or context cancel
select {
case re := <-req.resp:
if re.err != nil {
Expand All @@ -344,7 +337,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
}
}

// run processes batching requests by waiting for requests to be queued. Send
// runBatch processes batching requests by waiting for requests to be queued. Send
// the queue as a batch when either c.batchWait time has elapsed or the queue
// reaches c.batchLimit.
func (c *PeerClient) runBatch() {
Expand Down Expand Up @@ -426,11 +419,12 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
prop.Inject(r.ctx, &MetadataCarrier{Map: r.request.Metadata})
req.Requests = append(req.Requests, r.request)
tracing.EndScope(r.ctx, nil)

}

timeoutCtx, timeoutCancel := context.WithTimeout(ctx, c.conf.Behavior.BatchTimeout)
c.clientMutex.RLock()
resp, err := c.client.GetPeerRateLimits(timeoutCtx, &req)
c.clientMutex.RUnlock()
timeoutCancel()

// An error here indicates the entire request failed
Expand Down Expand Up @@ -470,40 +464,35 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
}
}

// Shutdown will gracefully shutdown the client connection, until the context is cancelled
func (c *PeerClient) Shutdown(ctx context.Context) error {
// Take the write lock since we're going to modify the closing state
c.mutex.Lock()
// Shutdown waits until outstanding requests have finished
func (c *PeerClient) Shutdown() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shutdown/Close methods should always accept and context.Cancel as there may be some unknown or unforeseen reason the close cannot complete and it then hangs forever. This has happened in production on other services and it's always good to put a limit on how long you will wait for a shutdown to complete. This protects the service from having to be killed by the orchestration system without successfully saving it's internal state. Shutting down a connection is a low priority thing, if for some reason the workgroup Wait() hangs forever, then we can cancel the shutdown wait, and not interfere with normal shutdown of any service that uses gubernator as a library.

Copy link
Contributor Author

@miparnisari miparnisari Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never seen a shutdown function accept a context. E.g. sql.Close, or connection.Close.

The context object is meant to be used when there is the potential chance that the request is cancelled by the user. This is not the case, Shutdown is invoked internally, not by a client. We should not use the context as a crutch to fix deadlock issues.
I can refactor this to avoid mutex altogether, but I would like to know if the work would be useful (given the v3 release)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few things in the works. We are moving development to a new repo which will not be associated with the mailgun GitHub organization. After that, the plan is to bump to a v3 release, drop GRPC support and continue development there.

But first we want to merge all these open PRs.

I had hoped we could finish merging last week, as I'm traveling this week, but hopefully we can get it all done!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And to add, Mailgun plans to abandon this repo in favor of @thrawn01's new repo. I suppose you can continue using the /v2 flavor with the new repo until ready to migrate to /v3.

Either way, it looks like /v2 will be deprecated very soon and gRPC support along with it. I suggest not investing too much time into it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update! I can lend a hand in the new repo 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of interest, what was the biggest factor for the decision to drop gRPC?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of interest, what was the biggest factor for the decision to drop gRPC?

@thrawn01 and I had some big conversations about this. Reduced complexity and improved performance. HTTP can perform more efficiently at scale than gRPC if done right.

This would be a good time for @thrawn01 to mention his work on Duh-RPC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP can perform more efficiently at scale than gRPC if done right.

What's the source for this? My understanding was this this in fact the opposite 😄 https://aws.amazon.com/compare/the-difference-between-grpc-and-rest/

A gRPC API is better for these use cases:
High-performance systems
High data loads
Real-time or streaming applications

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another source: https://github.com/grpc-ecosystem/grpc-gateway#background

gRPC is great -- it generates API clients and server stubs in many programming languages, it is fast, easy-to-use, bandwidth-efficient and its design is combat-proven by Google. However, you might still want to provide a traditional RESTful JSON API as well. Reasons can range from maintaining backward-compatibility, supporting languages or clients that are not well supported by gRPC, to simply maintaining the aesthetics and tooling involved with a RESTful JSON architecture.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intend to publish some more formal information about our GRPC journey at Mailgun. But as a quick overview, we are doing some very high performance and large scale stuff with GRPC and it hasn't lived up to all the hype.

In fact we have discovered that GRPC is slightly slower than using plan old HTTP/1 !!!!!!

As a result, I started working on a project called DUH-RPC, which is still in the works, but you can see the benchmarks comparing HTTP/1 with GRPC here https://github.com/duh-rpc/duh-go-benchmarks

The spec I'm working on is in progress but is here, feed back is welcome.
https://github.com/duh-rpc/duh-go

I'll be traveling for the next 2 weeks, and I'm not always near my laptop, so I don't expect much movement on these projects until after. I still get notifications for these conversations, so I'll try to reply as I can.

c.statusMutex.RLock()
if c.status == peerClosing || c.status == peerNotConnected {
c.mutex.Unlock()
return nil
c.statusMutex.RUnlock()
return
}
defer c.mutex.Unlock()

c.statusMutex.RUnlock()

c.statusMutex.Lock()
// unlock at the very end of this function so that new requests don't try to send through a closed channel
defer c.statusMutex.Unlock()
c.status = peerClosing

defer func() {
if c.conn != nil {
c.conn.Close()
}
}()

// This allows us to wait on the waitgroup, or until the context
// has been cancelled. This doesn't leak goroutines, because
// closing the connection will kill any outstanding requests.
waitChan := make(chan struct{})
go func() {
c.wg.Wait()
close(c.queue)
close(waitChan)
}()
// drain in-flight requests
c.wgMutex.Lock()
defer c.wgMutex.Unlock()
c.wg.Wait()

select {
case <-ctx.Done():
return ctx.Err()
case <-waitChan:
return nil
// no more items will be sent
close(c.queue)

// close the client connection
c.clientMutex.Lock()
if c.conn != nil {
c.conn.Close()
}
c.clientMutex.Unlock()
}

// PeerErr is returned if the peer is not connected or is in a closing state
Expand Down
3 changes: 1 addition & 2 deletions peer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ func TestPeerClientShutdown(t *testing.T) {
// yield the processor that way we allow other goroutines to start their request
runtime.Gosched()

err := client.Shutdown(context.Background())
assert.NoError(t, err)
client.Shutdown()

wg.Wait()
})
Expand Down
Loading