From 498acea568f7c80d37c3a64ee20e3e0e36d1cc60 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Tue, 20 Feb 2024 00:40:17 -0300 Subject: [PATCH 01/21] fix race issues in PeerClient --- gubernator.go | 8 +-- peer_client.go | 130 +++++++++++++++++++++----------------- peer_client_test.go | 3 +- 3 files changed, 65 insertions(+), 76 deletions(-) diff --git a/gubernator.go b/gubernator.go index 59c26eca..9b892b92 100644 --- a/gubernator.go +++ b/gubernator.go @@ -627,9 +627,6 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { s.log.WithField("peers", peerInfo).Debug("peers updated") // Shutdown any old peers we no longer need - ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout) - defer cancel() - var shutdownPeers []*PeerClient for _, peer := range oldLocalPicker.Peers() { if peerInfo := s.conf.LocalPicker.GetByPeerInfo(peer.Info()); peerInfo == nil { @@ -649,10 +646,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { for _, p := range shutdownPeers { wg.Run(func(obj interface{}) error { pc := obj.(*PeerClient) - err := pc.Shutdown(ctx) - if err != nil { - s.log.WithError(err).WithField("peer", pc).Error("while shutting down peer") - } + pc.Shutdown() return nil }, p) } diff --git a/peer_client.go b/peer_client.go index a39d9f02..db96c4a9 100644 --- a/peer_client.go +++ b/peer_client.go @@ -54,15 +54,17 @@ const ( ) type PeerClient struct { - client PeersV1Client - conn *grpc.ClientConn - conf PeerConfig - queue chan *request - lastErrs *collections.LRUCache - - mutex sync.RWMutex // This mutex is for verifying the closing state of the client - status peerStatus // Keep the current status of the peer - wg sync.WaitGroup // This wait group is to monitor the number of in-flight requests + clientMutex sync.RWMutex + client PeersV1Client // GUARDED_BY(clientMutex) + conn *grpc.ClientConn // GUARDED_BY(clientMutex) + conf PeerConfig + queue chan *request + lastErrs *collections.LRUCache + + statusMutex sync.RWMutex + status peerStatus // Keep the current status of the peer. GUARDED_BY(statusMutex) + wgMutex sync.RWMutex + wg sync.WaitGroup // Monitor the number of in-flight requests. GUARDED_BY(wgMutex) } type response struct { @@ -95,8 +97,8 @@ func NewPeerClient(conf PeerConfig) *PeerClient { // Connect establishes a GRPC connection to a peer func (c *PeerClient) connect(ctx context.Context) (err error) { - // NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and - // handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected. + // NOTE: To future self, this statusMutex is used here because we need to know if the peer is disconnecting and + // handle ErrClosing. Since this statusMutex MUST be here we take this opportunity to also see if we are connected. // Doing this here encapsulates managing the connected state to the PeerClient struct. Previously a PeerClient // was connected when `NewPeerClient()` was called however, when adding support for multi data centers having a // PeerClient connected to every Peer in every data center continuously is not desirable. 
@@ -105,28 +107,20 @@ func (c *PeerClient) connect(ctx context.Context) (err error) { defer funcTimer.ObserveDuration() lockTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.connect_RLock")) - c.mutex.RLock() + c.statusMutex.RLock() lockTimer.ObserveDuration() if c.status == peerClosing { - c.mutex.RUnlock() + c.statusMutex.RUnlock() return &PeerErr{err: errors.New("already disconnecting")} } if c.status == peerNotConnected { - // This mutex stuff looks wonky, but it allows us to use RLock() 99% of the time, while the 1% where we + // This statusMutex stuff looks wonky, but it allows us to use RLock() 99% of the time, while the 1% where we // actually need to connect uses a full Lock(), using RLock() most of which should reduce the over head // of a full lock on every call - // Yield the read lock so we can get the RW lock - c.mutex.RUnlock() - c.mutex.Lock() - defer c.mutex.Unlock() - - // Now that we have the RW lock, ensure no else got here ahead of us. - if c.status == peerConnected { - return nil - } + c.statusMutex.RUnlock() // Setup OpenTelemetry interceptor to propagate spans. var opts []grpc.DialOption @@ -144,19 +138,25 @@ func (c *PeerClient) connect(ctx context.Context) (err error) { } var err error + c.clientMutex.Lock() c.conn, err = grpc.Dial(c.conf.Info.GRPCAddress, opts...) if err != nil { + c.clientMutex.Unlock() return c.setLastErr(&PeerErr{err: errors.Wrapf(err, "failed to dial peer %s", c.conf.Info.GRPCAddress)}) } c.client = NewPeersV1Client(c.conn) + c.clientMutex.Unlock() + + c.statusMutex.Lock() c.status = peerConnected + c.statusMutex.Unlock() if !c.conf.Behavior.DisableBatching { go c.runBatch() } return nil } - c.mutex.RUnlock() + c.statusMutex.RUnlock() return nil } @@ -213,17 +213,17 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits return nil, c.setLastErr(err) } - // NOTE: This must be done within the RLock since calling Wait() in Shutdown() causes + // NOTE: This must be done within the Lock since calling Wait() in Shutdown() causes // a race condition if called within a separate go routine if the internal wg is `0` // when Wait() is called then Add(1) is called concurrently. - c.mutex.RLock() + c.wgMutex.Lock() c.wg.Add(1) - defer func() { - c.mutex.RUnlock() - defer c.wg.Done() - }() + c.wgMutex.Unlock() + defer c.wg.Done() + c.clientMutex.RLock() resp, err = c.client.GetPeerRateLimits(ctx, r) + c.clientMutex.RUnlock() if err != nil { err = errors.Wrap(err, "Error in client.GetPeerRateLimits") // metricCheckErrorCounter is updated within client.GetPeerRateLimits(). 
@@ -246,14 +246,14 @@ func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals } // See NOTE above about RLock and wg.Add(1) - c.mutex.RLock() + c.wgMutex.Lock() c.wg.Add(1) - defer func() { - c.mutex.RUnlock() - defer c.wg.Done() - }() + c.wgMutex.Unlock() + defer c.wg.Done() + c.clientMutex.RLock() resp, err = c.client.UpdatePeerGlobals(ctx, r) + c.clientMutex.RUnlock() if err != nil { _ = c.setLastErr(err) } @@ -301,12 +301,13 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq return nil, c.setLastErr(err) } - // See NOTE above about RLock and wg.Add(1) - c.mutex.RLock() + c.statusMutex.RLock() if c.status == peerClosing { + c.statusMutex.RUnlock() err := &PeerErr{err: errors.New("already disconnecting")} return nil, c.setLastErr(err) } + c.statusMutex.RUnlock() // Wait for a response or context cancel req := request{ @@ -315,6 +316,11 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq request: r, } + c.wgMutex.Lock() + c.wg.Add(1) + c.wgMutex.Unlock() + defer c.wg.Done() + // Enqueue the request to be sent peerAddr := c.Info().GRPCAddress metricBatchQueueLength.WithLabelValues(peerAddr).Set(float64(len(c.queue))) @@ -326,12 +332,6 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq return nil, errors.Wrap(ctx.Err(), "Context error while enqueuing request") } - c.wg.Add(1) - defer func() { - c.mutex.RUnlock() - c.wg.Done() - }() - select { case re := <-req.resp: if re.err != nil { @@ -344,7 +344,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq } } -// run processes batching requests by waiting for requests to be queued. Send +// runBatch processes batching requests by waiting for requests to be queued. Send // the queue as a batch when either c.batchWait time has elapsed or the queue // reaches c.batchLimit. func (c *PeerClient) runBatch() { @@ -426,11 +426,12 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { prop.Inject(r.ctx, &MetadataCarrier{Map: r.request.Metadata}) req.Requests = append(req.Requests, r.request) tracing.EndScope(r.ctx, nil) - } timeoutCtx, timeoutCancel := context.WithTimeout(ctx, c.conf.Behavior.BatchTimeout) + c.clientMutex.RLock() resp, err := c.client.GetPeerRateLimits(timeoutCtx, &req) + c.clientMutex.RUnlock() timeoutCancel() // An error here indicates the entire request failed @@ -470,40 +471,35 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { } } -// Shutdown will gracefully shutdown the client connection, until the context is cancelled -func (c *PeerClient) Shutdown(ctx context.Context) error { - // Take the write lock since we're going to modify the closing state - c.mutex.Lock() +// Shutdown waits until outstanding requests have finished +func (c *PeerClient) Shutdown() { + c.statusMutex.RLock() if c.status == peerClosing || c.status == peerNotConnected { - c.mutex.Unlock() - return nil + c.statusMutex.RUnlock() + return } - defer c.mutex.Unlock() + c.statusMutex.RUnlock() + + c.statusMutex.Lock() c.status = peerClosing + c.statusMutex.Unlock() defer func() { + c.clientMutex.Lock() if c.conn != nil { c.conn.Close() } + c.clientMutex.Unlock() }() - // This allows us to wait on the waitgroup, or until the context - // has been cancelled. This doesn't leak goroutines, because - // closing the connection will kill any outstanding requests. 
- waitChan := make(chan struct{}) - go func() { - c.wg.Wait() - close(c.queue) - close(waitChan) - }() + // drain in-flight requests + c.wgMutex.RLock() + c.wg.Wait() + c.wgMutex.RUnlock() - select { - case <-ctx.Done(): - return ctx.Err() - case <-waitChan: - return nil - } + // no more items will be sent + close(c.queue) } // PeerErr is returned if the peer is not connected or is in a closing state diff --git a/peer_client_test.go b/peer_client_test.go index 99924bed..5ad80f99 100644 --- a/peer_client_test.go +++ b/peer_client_test.go @@ -91,8 +91,7 @@ func TestPeerClientShutdown(t *testing.T) { // yield the processor that way we allow other goroutines to start their request runtime.Gosched() - err := client.Shutdown(context.Background()) - assert.NoError(t, err) + client.Shutdown() wg.Wait() }) From 5c0e54efb5c7e4179d4d2da38878ac8a3d9e5700 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Tue, 20 Feb 2024 10:32:30 -0300 Subject: [PATCH 02/21] undo changes in comments --- peer_client.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/peer_client.go b/peer_client.go index db96c4a9..574a54c1 100644 --- a/peer_client.go +++ b/peer_client.go @@ -97,8 +97,8 @@ func NewPeerClient(conf PeerConfig) *PeerClient { // Connect establishes a GRPC connection to a peer func (c *PeerClient) connect(ctx context.Context) (err error) { - // NOTE: To future self, this statusMutex is used here because we need to know if the peer is disconnecting and - // handle ErrClosing. Since this statusMutex MUST be here we take this opportunity to also see if we are connected. + // NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and + // handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected. // Doing this here encapsulates managing the connected state to the PeerClient struct. Previously a PeerClient // was connected when `NewPeerClient()` was called however, when adding support for multi data centers having a // PeerClient connected to every Peer in every data center continuously is not desirable. 
@@ -116,7 +116,7 @@ func (c *PeerClient) connect(ctx context.Context) (err error) { } if c.status == peerNotConnected { - // This statusMutex stuff looks wonky, but it allows us to use RLock() 99% of the time, while the 1% where we + // This mutex stuff looks wonky, but it allows us to use RLock() 99% of the time, while the 1% where we // actually need to connect uses a full Lock(), using RLock() most of which should reduce the over head // of a full lock on every call @@ -485,14 +485,6 @@ func (c *PeerClient) Shutdown() { c.status = peerClosing c.statusMutex.Unlock() - defer func() { - c.clientMutex.Lock() - if c.conn != nil { - c.conn.Close() - } - c.clientMutex.Unlock() - }() - // drain in-flight requests c.wgMutex.RLock() c.wg.Wait() @@ -500,6 +492,13 @@ func (c *PeerClient) Shutdown() { // no more items will be sent close(c.queue) + + // close the client connection + c.clientMutex.Lock() + if c.conn != nil { + c.conn.Close() + } + c.clientMutex.Unlock() } // PeerErr is returned if the peer is not connected or is in a closing state From 2b940a86c32d514b97302c01459a2b6e4ac71091 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Tue, 20 Feb 2024 10:56:13 -0300 Subject: [PATCH 03/21] more fixes --- peer_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/peer_client.go b/peer_client.go index 574a54c1..d6bfc54b 100644 --- a/peer_client.go +++ b/peer_client.go @@ -217,8 +217,8 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits // a race condition if called within a separate go routine if the internal wg is `0` // when Wait() is called then Add(1) is called concurrently. c.wgMutex.Lock() + defer c.wgMutex.Unlock() // unlock at the very end of this function c.wg.Add(1) - c.wgMutex.Unlock() defer c.wg.Done() c.clientMutex.RLock() @@ -247,8 +247,8 @@ func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals // See NOTE above about RLock and wg.Add(1) c.wgMutex.Lock() + defer c.wgMutex.Unlock() // unlock at the very end of this function c.wg.Add(1) - c.wgMutex.Unlock() defer c.wg.Done() c.clientMutex.RLock() @@ -317,8 +317,8 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq } c.wgMutex.Lock() + defer c.wgMutex.Unlock() // unlock at the very end of this function c.wg.Add(1) - c.wgMutex.Unlock() defer c.wg.Done() // Enqueue the request to be sent @@ -486,9 +486,9 @@ func (c *PeerClient) Shutdown() { c.statusMutex.Unlock() // drain in-flight requests - c.wgMutex.RLock() + c.wgMutex.Lock() + defer c.wgMutex.Unlock() c.wg.Wait() - c.wgMutex.RUnlock() // no more items will be sent close(c.queue) From 747d3cdadcbcec83606713caf854e64b81c3b468 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Tue, 20 Feb 2024 11:26:50 -0300 Subject: [PATCH 04/21] final fixes --- peer_client.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/peer_client.go b/peer_client.go index d6bfc54b..b00f7f2b 100644 --- a/peer_client.go +++ b/peer_client.go @@ -95,7 +95,8 @@ func NewPeerClient(conf PeerConfig) *PeerClient { } } -// Connect establishes a GRPC connection to a peer +// Connect tries to establish a GRPC connection to a peer. +// If the peer is shutting down it returns an error. func (c *PeerClient) connect(ctx context.Context) (err error) { // NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and // handle ErrClosing. 
Since this mutex MUST be here we take this opportunity to also see if we are connected. @@ -301,15 +302,6 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq return nil, c.setLastErr(err) } - c.statusMutex.RLock() - if c.status == peerClosing { - c.statusMutex.RUnlock() - err := &PeerErr{err: errors.New("already disconnecting")} - return nil, c.setLastErr(err) - } - c.statusMutex.RUnlock() - - // Wait for a response or context cancel req := request{ resp: make(chan *response, 1), ctx: ctx, @@ -332,6 +324,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq return nil, errors.Wrap(ctx.Err(), "Context error while enqueuing request") } + // Wait for a response or context cancel select { case re := <-req.resp: if re.err != nil { @@ -482,8 +475,9 @@ func (c *PeerClient) Shutdown() { c.statusMutex.RUnlock() c.statusMutex.Lock() + // unlock at the very end of this function so that new requests don't try to send through a closed channel + defer c.statusMutex.Unlock() c.status = peerClosing - c.statusMutex.Unlock() // drain in-flight requests c.wgMutex.Lock() From 53a1627171db94edfd43691af089605219b17a83 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 18:15:05 -0300 Subject: [PATCH 05/21] do not create 1 connection every time --- benchmark_test.go | 5 +- functional_test.go | 3 +- global.go | 5 +- go.mod | 1 + go.sum | 1 + gubernator.go | 14 +++- peer_client.go | 191 ++++++++++---------------------------------- peer_client_test.go | 39 ++++----- 8 files changed, 83 insertions(+), 176 deletions(-) diff --git a/benchmark_test.go b/benchmark_test.go index 56d0fe57..5a383761 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -33,10 +33,13 @@ func BenchmarkServer(b *testing.B) { require.NoError(b, err, "Error in conf.SetDefaults") b.Run("GetPeerRateLimit() with no batching", func(b *testing.B) { - client := guber.NewPeerClient(guber.PeerConfig{ + client, err := guber.NewPeerClient(guber.PeerConfig{ Info: cluster.GetRandomPeer(cluster.DataCenterNone), Behavior: conf.Behaviors, }) + if err != nil { + b.Errorf("Error building client: %s", err) + } b.ResetTimer() diff --git a/functional_test.go b/functional_test.go index 246e837e..c0c32485 100644 --- a/functional_test.go +++ b/functional_test.go @@ -1720,9 +1720,10 @@ func TestGRPCGateway(t *testing.T) { func TestGetPeerRateLimits(t *testing.T) { ctx := context.Background() - peerClient := guber.NewPeerClient(guber.PeerConfig{ + peerClient, err := guber.NewPeerClient(guber.PeerConfig{ Info: cluster.GetRandomPeer(cluster.DataCenterNone), }) + require.NoError(t, err) t.Run("Stable rate check request order", func(t *testing.T) { // Ensure response order matches rate check request order. 
diff --git a/global.go b/global.go index adbd8e44..3c3817c0 100644 --- a/global.go +++ b/global.go @@ -20,6 +20,7 @@ import ( "context" "github.com/mailgun/holster/v4/syncutil" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -247,8 +248,8 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] cancel() if err != nil { - // Skip peers that are not in a ready state - if !IsNotReady(err) { + // Only log if it's an unknown error + if !errors.Is(err, context.Canceled) && errors.Is(err, context.DeadlineExceeded) { gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress) } } diff --git a/go.mod b/go.mod index e32b4cd3..b2d14e33 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/otel/sdk v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 golang.org/x/net v0.18.0 + golang.org/x/sync v0.3.0 golang.org/x/time v0.3.0 google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b google.golang.org/grpc v1.59.0 diff --git a/go.sum b/go.sum index 9b2e3287..6b25e570 100644 --- a/go.sum +++ b/go.sum @@ -579,6 +579,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/gubernator.go b/gubernator.go index f47e9fb4..3360152c 100644 --- a/gubernator.go +++ b/gubernator.go @@ -345,7 +345,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) { // Make an RPC call to the peer that owns this rate limit r, err := req.Peer.GetPeerRateLimit(ctx, req.Req) if err != nil { - if IsNotReady(err) { + if errors.Is(err, context.DeadlineExceeded) { attempts++ metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc() req.Peer, err = s.GetPeer(ctx, req.Key) @@ -599,13 +599,17 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { peer := s.conf.RegionPicker.GetByPeerInfo(info) // If we don't have an existing PeerClient create a new one if peer == nil { - peer = NewPeerClient(PeerConfig{ + var err error + peer, err = NewPeerClient(PeerConfig{ TraceGRPC: s.conf.PeerTraceGRPC, Behavior: s.conf.Behaviors, TLS: s.conf.PeerTLS, Log: s.log, Info: info, }) + if err != nil { + s.log.Error("error connecting to peer: %s", err) + } } regionPicker.Add(peer) continue @@ -613,13 +617,17 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { // If we don't have an existing PeerClient create a new one peer := s.conf.LocalPicker.GetByPeerInfo(info) if peer == nil { - peer = NewPeerClient(PeerConfig{ + var err error + peer, err = NewPeerClient(PeerConfig{ TraceGRPC: s.conf.PeerTraceGRPC, Behavior: s.conf.Behaviors, TLS: s.conf.PeerTLS, Log: s.log, Info: info, }) + if err != nil { + s.log.Error("error connecting to peer: %s", err) + } } localPicker.Add(peer) } diff --git a/peer_client.go b/peer_client.go index b00f7f2b..9dee7593 100644 --- a/peer_client.go +++ 
b/peer_client.go @@ -21,6 +21,7 @@ import ( "crypto/tls" "fmt" "sync" + "sync/atomic" "github.com/mailgun/holster/v4/clock" "github.com/mailgun/holster/v4/collections" @@ -45,26 +46,16 @@ type PeerPicker interface { Add(*PeerClient) } -type peerStatus int - -const ( - peerNotConnected peerStatus = iota - peerConnected - peerClosing -) - type PeerClient struct { - clientMutex sync.RWMutex - client PeersV1Client // GUARDED_BY(clientMutex) - conn *grpc.ClientConn // GUARDED_BY(clientMutex) + client PeersV1Client + conn *grpc.ClientConn conf PeerConfig queue chan *request + queueClosed atomic.Bool lastErrs *collections.LRUCache - statusMutex sync.RWMutex - status peerStatus // Keep the current status of the peer. GUARDED_BY(statusMutex) - wgMutex sync.RWMutex - wg sync.WaitGroup // Monitor the number of in-flight requests. GUARDED_BY(wgMutex) + wgMutex sync.RWMutex + wg sync.WaitGroup // Monitor the number of in-flight requests. GUARDED_BY(wgMutex) } type response struct { @@ -86,79 +77,39 @@ type PeerConfig struct { TraceGRPC bool } -func NewPeerClient(conf PeerConfig) *PeerClient { - return &PeerClient{ +// NewPeerClient establishes a connection to a peer in a non-blocking fashion. +// If batching is enabled, it also starts a goroutine where batches will be processed. +func NewPeerClient(conf PeerConfig) (*PeerClient, error) { + peerClient := &PeerClient{ queue: make(chan *request, 1000), - status: peerNotConnected, conf: conf, lastErrs: collections.NewLRUCache(100), } -} - -// Connect tries to establish a GRPC connection to a peer. -// If the peer is shutting down it returns an error. -func (c *PeerClient) connect(ctx context.Context) (err error) { - // NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and - // handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected. - // Doing this here encapsulates managing the connected state to the PeerClient struct. Previously a PeerClient - // was connected when `NewPeerClient()` was called however, when adding support for multi data centers having a - // PeerClient connected to every Peer in every data center continuously is not desirable. - - funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.connect")) - defer funcTimer.ObserveDuration() - lockTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.connect_RLock")) - - c.statusMutex.RLock() - lockTimer.ObserveDuration() - - if c.status == peerClosing { - c.statusMutex.RUnlock() - return &PeerErr{err: errors.New("already disconnecting")} - } - - if c.status == peerNotConnected { - // This mutex stuff looks wonky, but it allows us to use RLock() 99% of the time, while the 1% where we - // actually need to connect uses a full Lock(), using RLock() most of which should reduce the over head - // of a full lock on every call - - c.statusMutex.RUnlock() - - // Setup OpenTelemetry interceptor to propagate spans. 
- var opts []grpc.DialOption + var opts []grpc.DialOption - if c.conf.TraceGRPC { - opts = []grpc.DialOption{ - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), - } - } - - if c.conf.TLS != nil { - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(c.conf.TLS))) - } else { - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + if conf.TraceGRPC { + opts = []grpc.DialOption{ + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), } + } - var err error - c.clientMutex.Lock() - c.conn, err = grpc.Dial(c.conf.Info.GRPCAddress, opts...) - if err != nil { - c.clientMutex.Unlock() - return c.setLastErr(&PeerErr{err: errors.Wrapf(err, "failed to dial peer %s", c.conf.Info.GRPCAddress)}) - } - c.client = NewPeersV1Client(c.conn) - c.clientMutex.Unlock() + if conf.TLS != nil { + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(conf.TLS))) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } - c.statusMutex.Lock() - c.status = peerConnected - c.statusMutex.Unlock() + var err error + peerClient.conn, err = grpc.Dial(conf.Info.GRPCAddress, opts...) + if err != nil { + return nil, err + } + peerClient.client = NewPeersV1Client(peerClient.conn) - if !c.conf.Behavior.DisableBatching { - go c.runBatch() - } - return nil + if !conf.Behavior.DisableBatching { + go peerClient.runBatch() } - c.statusMutex.RUnlock() - return nil + return peerClient, nil } // Info returns PeerInfo struct that describes this PeerClient @@ -208,23 +159,15 @@ func (c *PeerClient) GetPeerRateLimit(ctx context.Context, r *RateLimitReq) (res // GetPeerRateLimits requests a list of rate limit statuses from a peer func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimitsReq) (resp *GetPeerRateLimitsResp, err error) { - if err := c.connect(ctx); err != nil { - err = errors.Wrap(err, "Error in connect") - metricCheckErrorCounter.WithLabelValues("Connect error").Add(1) - return nil, c.setLastErr(err) - } - // NOTE: This must be done within the Lock since calling Wait() in Shutdown() causes // a race condition if called within a separate go routine if the internal wg is `0` // when Wait() is called then Add(1) is called concurrently. c.wgMutex.Lock() - defer c.wgMutex.Unlock() // unlock at the very end of this function c.wg.Add(1) + c.wgMutex.Unlock() defer c.wg.Done() - c.clientMutex.RLock() resp, err = c.client.GetPeerRateLimits(ctx, r) - c.clientMutex.RUnlock() if err != nil { err = errors.Wrap(err, "Error in client.GetPeerRateLimits") // metricCheckErrorCounter is updated within client.GetPeerRateLimits(). 
@@ -242,19 +185,14 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits // UpdatePeerGlobals sends global rate limit status updates to a peer func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (resp *UpdatePeerGlobalsResp, err error) { - if err := c.connect(ctx); err != nil { - return nil, c.setLastErr(err) - } // See NOTE above about RLock and wg.Add(1) c.wgMutex.Lock() - defer c.wgMutex.Unlock() // unlock at the very end of this function c.wg.Add(1) + c.wgMutex.Unlock() defer c.wg.Done() - c.clientMutex.RLock() resp, err = c.client.UpdatePeerGlobals(ctx, r) - c.clientMutex.RUnlock() if err != nil { _ = c.setLastErr(err) } @@ -297,11 +235,6 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.getPeerRateLimitsBatch")) defer funcTimer.ObserveDuration() - if err := c.connect(ctx); err != nil { - err = errors.Wrap(err, "Error in connect") - return nil, c.setLastErr(err) - } - req := request{ resp: make(chan *response, 1), ctx: ctx, @@ -309,14 +242,18 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq } c.wgMutex.Lock() - defer c.wgMutex.Unlock() // unlock at the very end of this function c.wg.Add(1) + c.wgMutex.Unlock() // unlock at the very end of this function defer c.wg.Done() // Enqueue the request to be sent peerAddr := c.Info().GRPCAddress metricBatchQueueLength.WithLabelValues(peerAddr).Set(float64(len(c.queue))) + if c.queueClosed.Load() { + return nil, grpc.ErrClientConnClosing + } + select { case c.queue <- &req: // Successfully enqueued request. @@ -351,8 +288,8 @@ func (c *PeerClient) runBatch() { select { case r, ok := <-c.queue: - // If the queue has shutdown, we need to send the rest of the queue if !ok { + // If the queue has shutdown, we need to send the rest of the queue if len(queue) > 0 { c.sendBatch(ctx, queue) } @@ -422,9 +359,7 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { } timeoutCtx, timeoutCancel := context.WithTimeout(ctx, c.conf.Behavior.BatchTimeout) - c.clientMutex.RLock() resp, err := c.client.GetPeerRateLimits(timeoutCtx, &req) - c.clientMutex.RUnlock() timeoutCancel() // An error here indicates the entire request failed @@ -464,60 +399,16 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { } } -// Shutdown waits until outstanding requests have finished +// Shutdown waits until all outstanding requests have finished and then closes the grpc connection func (c *PeerClient) Shutdown() { - c.statusMutex.RLock() - if c.status == peerClosing || c.status == peerNotConnected { - c.statusMutex.RUnlock() - return - } - - c.statusMutex.RUnlock() - - c.statusMutex.Lock() - // unlock at the very end of this function so that new requests don't try to send through a closed channel - defer c.statusMutex.Unlock() - c.status = peerClosing - // drain in-flight requests c.wgMutex.Lock() defer c.wgMutex.Unlock() c.wg.Wait() - // no more items will be sent + // signal that no more items will be sent + c.queueClosed.Store(true) close(c.queue) - // close the client connection - c.clientMutex.Lock() - if c.conn != nil { - c.conn.Close() - } - c.clientMutex.Unlock() -} - -// PeerErr is returned if the peer is not connected or is in a closing state -type PeerErr struct { - err error -} - -func (p *PeerErr) NotReady() bool { - return true -} - -func (p *PeerErr) Error() string { - return p.err.Error() -} - -func (p *PeerErr) 
Cause() error { - return p.err -} - -type notReadyErr interface { - NotReady() bool -} - -// IsNotReady returns true if the err is because the peer is not connected or in a closing state -func IsNotReady(err error) bool { - te, ok := err.(notReadyErr) - return ok && te.NotReady() + c.conn.Close() } diff --git a/peer_client_test.go b/peer_client_test.go index 5ad80f99..5f4165c4 100644 --- a/peer_client_test.go +++ b/peer_client_test.go @@ -19,13 +19,15 @@ package gubernator_test import ( "context" "runtime" - "sync" + "strings" "testing" gubernator "github.com/mailgun/gubernator/v2" "github.com/mailgun/gubernator/v2/cluster" "github.com/mailgun/holster/v4/clock" - "github.com/stretchr/testify/assert" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestPeerClientShutdown(t *testing.T) { @@ -56,17 +58,17 @@ func TestPeerClientShutdown(t *testing.T) { c := cases[i] t.Run(c.Name, func(t *testing.T) { - client := gubernator.NewPeerClient(gubernator.PeerConfig{ + client, err := gubernator.NewPeerClient(gubernator.PeerConfig{ Info: cluster.GetRandomPeer(cluster.DataCenterNone), Behavior: config, }) + require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(threads) + wg := errgroup.Group{} + wg.SetLimit(threads) // Spawn a whole bunch of concurrent requests to test shutdown in various states for j := 0; j < threads; j++ { - go func() { - defer wg.Done() + wg.Go(func() error { ctx := context.Background() _, err := client.GetPeerRateLimit(ctx, &gubernator.RateLimitReq{ Hits: 1, @@ -74,18 +76,13 @@ func TestPeerClientShutdown(t *testing.T) { Behavior: c.Behavior, }) - isExpectedErr := false - - switch err.(type) { - case *gubernator.PeerErr: - isExpectedErr = true - case nil: - isExpectedErr = true + if err != nil { + if !strings.Contains(err.Error(), "client connection is closing") { + return errors.Wrap(err, "unexpected error") + } } - - assert.True(t, true, isExpectedErr) - - }() + return nil + }) } // yield the processor that way we allow other goroutines to start their request @@ -93,7 +90,11 @@ func TestPeerClientShutdown(t *testing.T) { client.Shutdown() - wg.Wait() + err = wg.Wait() + if err != nil { + t.Error(err) + t.Fail() + } }) } From 4b701fb7a3dbb13c0009daa8b3d4e6659a78d873 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 18:23:48 -0300 Subject: [PATCH 06/21] add comment --- peer_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/peer_client.go b/peer_client.go index 9dee7593..71de53b4 100644 --- a/peer_client.go +++ b/peer_client.go @@ -251,6 +251,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq metricBatchQueueLength.WithLabelValues(peerAddr).Set(float64(len(c.queue))) if c.queueClosed.Load() { + // this check prevents "panic: send on close channel" return nil, grpc.ErrClientConnClosing } From 0eccd3720fc98f5df75095b761fab1c57479711f Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 18:25:01 -0300 Subject: [PATCH 07/21] remove comment --- peer_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peer_client.go b/peer_client.go index 71de53b4..e18f28bb 100644 --- a/peer_client.go +++ b/peer_client.go @@ -243,7 +243,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq c.wgMutex.Lock() c.wg.Add(1) - c.wgMutex.Unlock() // unlock at the very end of this function + c.wgMutex.Unlock() defer c.wg.Done() // Enqueue the request to be sent From 42c8e72dd8cefb6f846c280b0bd4fabc5c7ee635 Mon 
Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 18:44:06 -0300 Subject: [PATCH 08/21] fix lint --- peer_client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/peer_client.go b/peer_client.go index e18f28bb..fd2203c7 100644 --- a/peer_client.go +++ b/peer_client.go @@ -34,8 +34,10 @@ import ( "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) type PeerPicker interface { @@ -252,7 +254,7 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq if c.queueClosed.Load() { // this check prevents "panic: send on close channel" - return nil, grpc.ErrClientConnClosing + return nil, status.Error(codes.Canceled, "grpc: the client connection is closing") } select { From 27e91fdd63582d0c0610452eeaefb3dde5e82b62 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 18:50:17 -0300 Subject: [PATCH 09/21] add ctx back to shutdown --- gubernator.go | 8 +++++++- peer_client.go | 31 ++++++++++++++++++++++--------- peer_client_test.go | 6 +++--- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/gubernator.go b/gubernator.go index 3360152c..734832f2 100644 --- a/gubernator.go +++ b/gubernator.go @@ -644,6 +644,9 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { s.log.WithField("peers", peerInfo).Debug("peers updated") // Shutdown any old peers we no longer need + ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout) + defer cancel() + var shutdownPeers []*PeerClient for _, peer := range oldLocalPicker.Peers() { if peerInfo := s.conf.LocalPicker.GetByPeerInfo(peer.Info()); peerInfo == nil { @@ -663,7 +666,10 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { for _, p := range shutdownPeers { wg.Run(func(obj interface{}) error { pc := obj.(*PeerClient) - pc.Shutdown() + err := pc.Shutdown(ctx) + if err != nil { + s.log.WithError(err).WithField("peer", pc).Error("while shutting down peer") + } return nil }, p) } diff --git a/peer_client.go b/peer_client.go index fd2203c7..3bdf6901 100644 --- a/peer_client.go +++ b/peer_client.go @@ -403,15 +403,28 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { } // Shutdown waits until all outstanding requests have finished and then closes the grpc connection -func (c *PeerClient) Shutdown() { - // drain in-flight requests - c.wgMutex.Lock() - defer c.wgMutex.Unlock() - c.wg.Wait() +func (c *PeerClient) Shutdown(ctx context.Context) error { + waitChan := make(chan struct{}) + go func() { + // drain in-flight requests + c.wgMutex.Lock() + defer c.wgMutex.Unlock() + c.wg.Wait() + + // signal that no more items will be sent + c.queueClosed.Store(true) + close(c.queue) + + // close connection + _ = c.conn.Close() - // signal that no more items will be sent - c.queueClosed.Store(true) - close(c.queue) + close(waitChan) + }() - c.conn.Close() + select { + case <-ctx.Done(): + return ctx.Err() + case <-waitChan: + return nil + } } diff --git a/peer_client_test.go b/peer_client_test.go index 5f4165c4..d739f40a 100644 --- a/peer_client_test.go +++ b/peer_client_test.go @@ -78,7 +78,7 @@ func TestPeerClientShutdown(t *testing.T) { if err != nil { if !strings.Contains(err.Error(), "client connection is closing") { - return errors.Wrap(err, "unexpected error") + return errors.Wrap(err, "unexpected error in test") } } 
return nil @@ -88,14 +88,14 @@ func TestPeerClientShutdown(t *testing.T) { // yield the processor that way we allow other goroutines to start their request runtime.Gosched() - client.Shutdown() + shutDownErr := client.Shutdown(context.Background()) err = wg.Wait() if err != nil { t.Error(err) t.Fail() } + require.NoError(t, shutDownErr) }) - } } From dc64a092398af2b5906c6b48b5560845e81c27dc Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 18:55:08 -0300 Subject: [PATCH 10/21] move conn.Close out of goroutine --- peer_client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/peer_client.go b/peer_client.go index 3bdf6901..6d6fbb8a 100644 --- a/peer_client.go +++ b/peer_client.go @@ -404,6 +404,9 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { // Shutdown waits until all outstanding requests have finished and then closes the grpc connection func (c *PeerClient) Shutdown(ctx context.Context) error { + // ensure we don't leak goroutines, even if the Shutdown times out + defer c.conn.Close() + waitChan := make(chan struct{}) go func() { // drain in-flight requests @@ -415,9 +418,6 @@ func (c *PeerClient) Shutdown(ctx context.Context) error { c.queueClosed.Store(true) close(c.queue) - // close connection - _ = c.conn.Close() - close(waitChan) }() From 045f826f6b4ac65b5c92b7c4e9af7708598d9c26 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 18:55:50 -0300 Subject: [PATCH 11/21] fix lint --- gubernator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gubernator.go b/gubernator.go index 734832f2..681413d0 100644 --- a/gubernator.go +++ b/gubernator.go @@ -530,7 +530,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health localPeers := s.conf.LocalPicker.Peers() for _, peer := range localPeers { for _, errMsg := range peer.GetLastErr() { - err := fmt.Errorf("Error returned from local peer.GetLastErr: %s", errMsg) + err := fmt.Errorf("error returned from local peer.GetLastErr: %s", errMsg) span.RecordError(err) errs = append(errs, err.Error()) } @@ -540,7 +540,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health regionPeers := s.conf.RegionPicker.Peers() for _, peer := range regionPeers { for _, errMsg := range peer.GetLastErr() { - err := fmt.Errorf("Error returned from region peer.GetLastErr: %s", errMsg) + err := fmt.Errorf("error returned from region peer.GetLastErr: %s", errMsg) span.RecordError(err) errs = append(errs, err.Error()) } From 9ce0eb6a5f76dedc67e860c712c3703bdff6856a Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 18:58:21 -0300 Subject: [PATCH 12/21] oops --- gubernator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gubernator.go b/gubernator.go index 681413d0..f0708fa8 100644 --- a/gubernator.go +++ b/gubernator.go @@ -345,7 +345,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) { // Make an RPC call to the peer that owns this rate limit r, err := req.Peer.GetPeerRateLimit(ctx, req.Req) if err != nil { - if errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { attempts++ metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc() req.Peer, err = s.GetPeer(ctx, req.Key) From e7eabb0ff0bdd20a38e5960c45482811122d9b67 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 19:26:19 -0300 Subject: [PATCH 13/21] make it blocking --- 
gubernator.go | 3 ++- peer_client.go | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/gubernator.go b/gubernator.go index f0708fa8..b8f18601 100644 --- a/gubernator.go +++ b/gubernator.go @@ -588,7 +588,8 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_ return resp, nil } -// SetPeers is called by the implementor to indicate the pool of peers has changed +// SetPeers replaces the peers and shuts down all the previous peers. +// TODO this should return an error if we failed to connect to any of the new peers func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { localPicker := s.conf.LocalPicker.New() regionPicker := s.conf.RegionPicker.New() diff --git a/peer_client.go b/peer_client.go index 6d6fbb8a..36a60f08 100644 --- a/peer_client.go +++ b/peer_client.go @@ -79,7 +79,7 @@ type PeerConfig struct { TraceGRPC bool } -// NewPeerClient establishes a connection to a peer in a non-blocking fashion. +// NewPeerClient establishes a connection to a peer in a blocking fashion. // If batching is enabled, it also starts a goroutine where batches will be processed. func NewPeerClient(conf PeerConfig) (*PeerClient, error) { peerClient := &PeerClient{ @@ -87,7 +87,7 @@ func NewPeerClient(conf PeerConfig) (*PeerClient, error) { conf: conf, lastErrs: collections.NewLRUCache(100), } - var opts []grpc.DialOption + opts := []grpc.DialOption{grpc.WithBlock()} if conf.TraceGRPC { opts = []grpc.DialOption{ @@ -402,7 +402,8 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) { } } -// Shutdown waits until all outstanding requests have finished and then closes the grpc connection +// Shutdown waits until all outstanding requests have finished or the context is cancelled. +// Then it closes the grpc connection. func (c *PeerClient) Shutdown(ctx context.Context) error { // ensure we don't leak goroutines, even if the Shutdown times out defer c.conn.Close() From f13217670a2de38e7baefef7c99d34343fe64669 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 20:23:56 -0300 Subject: [PATCH 14/21] put cap of 1 second to NewPeerClient connect --- gubernator.go | 2 ++ peer_client.go | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/gubernator.go b/gubernator.go index b8f18601..0a4701e5 100644 --- a/gubernator.go +++ b/gubernator.go @@ -610,6 +610,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { }) if err != nil { s.log.Error("error connecting to peer: %s", err) + return } } regionPicker.Add(peer) @@ -628,6 +629,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { }) if err != nil { s.log.Error("error connecting to peer: %s", err) + return } } localPicker.Add(peer) diff --git a/peer_client.go b/peer_client.go index 36a60f08..e7c40a4c 100644 --- a/peer_client.go +++ b/peer_client.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/mailgun/holster/v4/clock" "github.com/mailgun/holster/v4/collections" @@ -79,7 +80,7 @@ type PeerConfig struct { TraceGRPC bool } -// NewPeerClient establishes a connection to a peer in a blocking fashion. +// NewPeerClient tries to establish a connection to a peer in a blocking fashion with a 1 second timeout. // If batching is enabled, it also starts a goroutine where batches will be processed. 
func NewPeerClient(conf PeerConfig) (*PeerClient, error) { peerClient := &PeerClient{ @@ -101,8 +102,11 @@ func NewPeerClient(conf PeerConfig) (*PeerClient, error) { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + var err error - peerClient.conn, err = grpc.Dial(conf.Info.GRPCAddress, opts...) + peerClient.conn, err = grpc.DialContext(ctx, conf.Info.GRPCAddress, opts...) if err != nil { return nil, err } From f6102d9ed6ac2b2f61837594a24da0f50c67d851 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Sun, 25 Feb 2024 22:24:21 -0300 Subject: [PATCH 15/21] undo put cap of 1 second to NewPeerClient connect --- gubernator.go | 4 ++-- peer_client.go | 10 +++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/gubernator.go b/gubernator.go index 0a4701e5..eed4db7a 100644 --- a/gubernator.go +++ b/gubernator.go @@ -609,7 +609,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { Info: info, }) if err != nil { - s.log.Error("error connecting to peer: %s", err) + s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err) return } } @@ -628,7 +628,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { Info: info, }) if err != nil { - s.log.Error("error connecting to peer: %s", err) + s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err) return } } diff --git a/peer_client.go b/peer_client.go index e7c40a4c..8cd2a1c5 100644 --- a/peer_client.go +++ b/peer_client.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" "sync/atomic" - "time" "github.com/mailgun/holster/v4/clock" "github.com/mailgun/holster/v4/collections" @@ -80,7 +79,7 @@ type PeerConfig struct { TraceGRPC bool } -// NewPeerClient tries to establish a connection to a peer in a blocking fashion with a 1 second timeout. +// NewPeerClient tries to establish a connection to a peer in a non-blocking fashion. // If batching is enabled, it also starts a goroutine where batches will be processed. func NewPeerClient(conf PeerConfig) (*PeerClient, error) { peerClient := &PeerClient{ @@ -88,7 +87,7 @@ func NewPeerClient(conf PeerConfig) (*PeerClient, error) { conf: conf, lastErrs: collections.NewLRUCache(100), } - opts := []grpc.DialOption{grpc.WithBlock()} + var opts []grpc.DialOption if conf.TraceGRPC { opts = []grpc.DialOption{ @@ -102,11 +101,8 @@ func NewPeerClient(conf PeerConfig) (*PeerClient, error) { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - var err error - peerClient.conn, err = grpc.DialContext(ctx, conf.Info.GRPCAddress, opts...) + peerClient.conn, err = grpc.Dial(conf.Info.GRPCAddress, opts...) 
if err != nil { return nil, err } From b218cbeb3320aac5fa765e8e01688f28aa12c67f Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Mon, 26 Feb 2024 12:22:25 -0300 Subject: [PATCH 16/21] fix another leak --- global.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/global.go b/global.go index 5301be54..1dbd44ca 100644 --- a/global.go +++ b/global.go @@ -263,4 +263,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] func (gm *globalManager) Close() { gm.wg.Stop() + for _, peer := range gm.instance.GetPeerList() { + _ = peer.Shutdown(context.Background()) + } } From fcb7b1505fd2df34a969a3dc5680ba3b9322e3ce Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Wed, 28 Feb 2024 11:13:19 -0800 Subject: [PATCH 17/21] on peer shutdown, clear all errors --- peer_client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/peer_client.go b/peer_client.go index 8cd2a1c5..39c13c14 100644 --- a/peer_client.go +++ b/peer_client.go @@ -415,6 +415,9 @@ func (c *PeerClient) Shutdown(ctx context.Context) error { defer c.wgMutex.Unlock() c.wg.Wait() + // clear errors + c.lastErrs = collections.NewLRUCache(100) + // signal that no more items will be sent c.queueClosed.Store(true) close(c.queue) From 6a53c430f86eb052d7f8974911bb0cd15899d863 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Wed, 28 Feb 2024 11:24:06 -0800 Subject: [PATCH 18/21] add time.sleep to end of TestHealthCheck --- functional_test.go | 2 ++ global.go | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/functional_test.go b/functional_test.go index 8ad15450..1cca4a71 100644 --- a/functional_test.go +++ b/functional_test.go @@ -1618,6 +1618,8 @@ func TestHealthCheck(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), clock.Second*15) defer cancel() require.NoError(t, cluster.Restart(ctx)) + // wait for instances to come online + time.Sleep(1 * time.Second) } func TestLeakyBucketDivBug(t *testing.T) { diff --git a/global.go b/global.go index 1dbd44ca..bd0c1e7c 100644 --- a/global.go +++ b/global.go @@ -32,7 +32,7 @@ type globalManager struct { wg syncutil.WaitGroup conf BehaviorConfig log FieldLogger - instance *V1Instance // todo circular import? V1Instance also holds a reference to globalManager + instance *V1Instance // TODO circular import? V1Instance also holds a reference to globalManager metricGlobalSendDuration prometheus.Summary metricBroadcastDuration prometheus.Summary metricBroadcastCounter *prometheus.CounterVec @@ -261,6 +261,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] fan.Wait() } +// Close stops all goroutines and shuts down all the peers. 
func (gm *globalManager) Close() { gm.wg.Stop() for _, peer := range gm.instance.GetPeerList() { From e4d42b7faa7757e290ead4373ec3f330f520f5dc Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Wed, 28 Feb 2024 14:20:49 -0800 Subject: [PATCH 19/21] use testutil.UntilPass --- functional_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/functional_test.go b/functional_test.go index 1cca4a71..6e09db31 100644 --- a/functional_test.go +++ b/functional_test.go @@ -1618,8 +1618,13 @@ func TestHealthCheck(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), clock.Second*15) defer cancel() require.NoError(t, cluster.Restart(ctx)) - // wait for instances to come online - time.Sleep(1 * time.Second) + + // wait for instances to come back online + testutil.UntilPass(t, 10, clock.Millisecond*300, func(t testutil.TestingT) { + healthResp, err = client.HealthCheck(context.Background(), &guber.HealthCheckReq{}) + assert.Nil(t, err) + assert.Equal(t, "healthy", healthResp.GetStatus()) + }) } func TestLeakyBucketDivBug(t *testing.T) { From 795af0697ba767610aec71d396e78e8034918cc3 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Wed, 28 Feb 2024 14:29:19 -0800 Subject: [PATCH 20/21] check health of every instance --- functional_test.go | 46 +++++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/functional_test.go b/functional_test.go index 6e09db31..35199c2f 100644 --- a/functional_test.go +++ b/functional_test.go @@ -36,28 +36,29 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" json "google.golang.org/protobuf/encoding/protojson" ) +var allPeers = []guber.PeerInfo{ + {GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone}, + + // DataCenterOne + {GRPCAddress: "127.0.0.1:9890", HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9891", HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9892", HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9893", HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne}, +} + // Setup and shutdown the mock gubernator cluster for the entire test suite func TestMain(m *testing.M) { - if err := cluster.StartWith([]guber.PeerInfo{ - {GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone}, - 
{GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone}, - - // DataCenterOne - {GRPCAddress: "127.0.0.1:9890", HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne}, - {GRPCAddress: "127.0.0.1:9891", HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne}, - {GRPCAddress: "127.0.0.1:9892", HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne}, - {GRPCAddress: "127.0.0.1:9893", HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne}, - }); err != nil { + if err := cluster.StartWith(allPeers); err != nil { fmt.Println(err) os.Exit(1) } @@ -1619,12 +1620,15 @@ func TestHealthCheck(t *testing.T) { defer cancel() require.NoError(t, cluster.Restart(ctx)) - // wait for instances to come back online - testutil.UntilPass(t, 10, clock.Millisecond*300, func(t testutil.TestingT) { - healthResp, err = client.HealthCheck(context.Background(), &guber.HealthCheckReq{}) - assert.Nil(t, err) - assert.Equal(t, "healthy", healthResp.GetStatus()) - }) + // wait for every peer instance to come back online + for _, peer := range allPeers { + peerClient, err := guber.DialV1Server(peer.GRPCAddress, nil) + require.NoError(t, err) + testutil.UntilPass(t, 10, clock.Millisecond*300, func(t testutil.TestingT) { + healthResp, err = peerClient.HealthCheck(context.Background(), &guber.HealthCheckReq{}) + assert.Equal(t, "healthy", healthResp.GetStatus()) + }) + } } func TestLeakyBucketDivBug(t *testing.T) { From db541c086cb69a0d9c42dac10e3e73c686c6dc44 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Wed, 28 Feb 2024 14:35:35 -0800 Subject: [PATCH 21/21] use cluster.GetAllPeers --- functional_test.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/functional_test.go b/functional_test.go index 35199c2f..654342b7 100644 --- a/functional_test.go +++ b/functional_test.go @@ -41,24 +41,22 @@ import ( json "google.golang.org/protobuf/encoding/protojson" ) -var allPeers = []guber.PeerInfo{ - {GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone}, - {GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone}, - - // DataCenterOne - {GRPCAddress: "127.0.0.1:9890", HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne}, - {GRPCAddress: "127.0.0.1:9891", HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne}, - {GRPCAddress: "127.0.0.1:9892", HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne}, - {GRPCAddress: "127.0.0.1:9893", HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne}, -} - // Setup and shutdown the mock gubernator cluster for the entire test suite func TestMain(m *testing.M) { - if err := cluster.StartWith(allPeers); err != nil { + if err := cluster.StartWith([]guber.PeerInfo{ + {GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", 
DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone}, + + // DataCenterOne + {GRPCAddress: "127.0.0.1:9890", HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9891", HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9892", HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9893", HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne}, + }); err != nil { fmt.Println(err) os.Exit(1) } @@ -1621,7 +1619,7 @@ func TestHealthCheck(t *testing.T) { require.NoError(t, cluster.Restart(ctx)) // wait for every peer instance to come back online - for _, peer := range allPeers { + for _, peer := range cluster.GetPeers() { peerClient, err := guber.DialV1Server(peer.GRPCAddress, nil) require.NoError(t, err) testutil.UntilPass(t, 10, clock.Millisecond*300, func(t testutil.TestingT) {