This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

fix: mutex deadlocks in PeerClient #223

Merged 23 commits on Feb 28, 2024
5 changes: 4 additions & 1 deletion benchmark_test.go
@@ -33,10 +33,13 @@ func BenchmarkServer(b *testing.B) {
require.NoError(b, err, "Error in conf.SetDefaults")

b.Run("GetPeerRateLimit() with no batching", func(b *testing.B) {
client := guber.NewPeerClient(guber.PeerConfig{
client, err := guber.NewPeerClient(guber.PeerConfig{
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
Behavior: conf.Behaviors,
})
if err != nil {
b.Errorf("Error building client: %s", err)
}

b.ResetTimer()

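The change above reflects the new constructor contract: NewPeerClient now connects eagerly and returns an error, so a bad peer address surfaces at construction time instead of deadlocking a later call. A minimal caller sketch follows, assuming the guber import alias used by these tests and an illustrative address; the Shutdown call mirrors the one the PR relies on in global.go.

```go
package main

import (
	"context"
	"log"

	guber "github.com/mailgun/gubernator/v2"
)

func main() {
	// Connection failures now surface here rather than inside a later RPC.
	peer, err := guber.NewPeerClient(guber.PeerConfig{
		Info: guber.PeerInfo{GRPCAddress: "localhost:9990"}, // illustrative address
	})
	if err != nil {
		log.Fatalf("error building peer client: %s", err)
	}
	// Release the connection when finished, as globalManager.Close now does for managed peers.
	defer func() { _ = peer.Shutdown(context.Background()) }()
}
```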
5 changes: 4 additions & 1 deletion functional_test.go
@@ -1618,6 +1618,8 @@ func TestHealthCheck(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*15)
defer cancel()
require.NoError(t, cluster.Restart(ctx))
// wait for instances to come online
time.Sleep(1 * time.Second)
}

func TestLeakyBucketDivBug(t *testing.T) {
@@ -1723,9 +1725,10 @@ func TestGRPCGateway(t *testing.T) {

func TestGetPeerRateLimits(t *testing.T) {
ctx := context.Background()
peerClient := guber.NewPeerClient(guber.PeerConfig{
peerClient, err := guber.NewPeerClient(guber.PeerConfig{
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
})
require.NoError(t, err)

t.Run("Stable rate check request order", func(t *testing.T) {
// Ensure response order matches rate check request order.
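The one-second sleep added after the cluster restart is a fixed grace period for the daemons to come back online. Where flakiness matters, a polling helper along these lines could replace it; waitForHealthy, the "healthy" status string, and the slice of clients are assumptions for illustration and not part of this PR, though the HealthCheck RPC itself is the same one TestHealthCheck exercises.

```go
package gubernator_test

import (
	"context"
	"time"

	guber "github.com/mailgun/gubernator/v2"
)

// waitForHealthy polls each client's HealthCheck until it reports healthy or ctx expires.
func waitForHealthy(ctx context.Context, clients []guber.V1Client) error {
	for _, c := range clients {
		for {
			resp, err := c.HealthCheck(ctx, &guber.HealthCheckReq{})
			if err == nil && resp.GetStatus() == "healthy" {
				break // this instance is back online
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(100 * time.Millisecond):
			}
		}
	}
	return nil
}
```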
11 changes: 8 additions & 3 deletions global.go
@@ -20,6 +20,7 @@ import (
"context"

"github.com/mailgun/holster/v4/syncutil"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

@@ -31,7 +32,7 @@ type globalManager struct {
wg syncutil.WaitGroup
conf BehaviorConfig
log FieldLogger
instance *V1Instance // todo circular import? V1Instance also holds a reference to globalManager
instance *V1Instance // TODO circular import? V1Instance also holds a reference to globalManager
metricGlobalSendDuration prometheus.Summary
metricBroadcastDuration prometheus.Summary
metricBroadcastCounter *prometheus.CounterVec
@@ -249,8 +250,8 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
// Only log if it's an unknown error
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
}
}
@@ -260,6 +261,10 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
fan.Wait()
}
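The intent of the condition above is that context cancellation and deadline expiry are routine when broadcasting to peers, so only other errors get logged. The same check, pulled out into a helper for clarity; isExpectedBroadcastErr is a hypothetical name, not something this diff adds.

```go
import (
	"context"
	"errors"
)

// isExpectedBroadcastErr reports whether a peer update failed for a routine
// reason (cancelled or timed-out context) that is not worth logging.
func isExpectedBroadcastErr(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
```

With a helper like this, the call site above reduces to `if err != nil && !isExpectedBroadcastErr(err) { ... }`.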

// Close stops all goroutines and shuts down all the peers.
func (gm *globalManager) Close() {
gm.wg.Stop()
for _, peer := range gm.instance.GetPeerList() {
_ = peer.Shutdown(context.Background())
}
}
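The new Close establishes the shutdown order: stop the send and broadcast goroutines first, then shut down every peer connection so in-flight batches can drain. A sketch of the same ordering with a bounded wait, assuming it sits next to Close in global.go (plus a time import); the timeout is an illustrative addition, the diff itself passes context.Background().

```go
// closeWithTimeout mirrors globalManager.Close but bounds how long peer shutdown may take.
func (gm *globalManager) closeWithTimeout(d time.Duration) {
	// Stop the batching goroutines first so no new peer requests start mid-shutdown.
	gm.wg.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()

	// Then drain and close each peer; errors are ignored here just as in Close.
	for _, peer := range gm.instance.GetPeerList() {
		_ = peer.Shutdown(ctx)
	}
}
```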
1 change: 1 addition & 0 deletions go.mod
@@ -24,6 +24,7 @@ require (
go.opentelemetry.io/otel/trace v1.21.0
go.uber.org/goleak v1.3.0
golang.org/x/net v0.18.0
golang.org/x/sync v0.3.0
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b
google.golang.org/grpc v1.59.0
1 change: 1 addition & 0 deletions go.sum

Some generated files are not rendered by default.

23 changes: 17 additions & 6 deletions gubernator.go
@@ -343,7 +343,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
// Make an RPC call to the peer that owns this rate limit
r, err := req.Peer.GetPeerRateLimit(ctx, req.Req)
if err != nil {
if IsNotReady(err) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
attempts++
metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc()
req.Peer, err = s.GetPeer(ctx, req.Key)
@@ -528,7 +528,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health
localPeers := s.conf.LocalPicker.Peers()
for _, peer := range localPeers {
for _, errMsg := range peer.GetLastErr() {
err := fmt.Errorf("Error returned from local peer.GetLastErr: %s", errMsg)
err := fmt.Errorf("error returned from local peer.GetLastErr: %s", errMsg)
span.RecordError(err)
errs = append(errs, err.Error())
}
@@ -538,7 +538,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health
regionPeers := s.conf.RegionPicker.Peers()
for _, peer := range regionPeers {
for _, errMsg := range peer.GetLastErr() {
err := fmt.Errorf("Error returned from region peer.GetLastErr: %s", errMsg)
err := fmt.Errorf("error returned from region peer.GetLastErr: %s", errMsg)
span.RecordError(err)
errs = append(errs, err.Error())
}
@@ -586,7 +586,8 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_
return resp, nil
}

// SetPeers is called by the implementor to indicate the pool of peers has changed
// SetPeers replaces the peers and shuts down all the previous peers.
// TODO this should return an error if we failed to connect to any of the new peers
func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
localPicker := s.conf.LocalPicker.New()
regionPicker := s.conf.RegionPicker.New()
@@ -597,27 +598,37 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
peer := s.conf.RegionPicker.GetByPeerInfo(info)
// If we don't have an existing PeerClient create a new one
if peer == nil {
peer = NewPeerClient(PeerConfig{
var err error
peer, err = NewPeerClient(PeerConfig{
TraceGRPC: s.conf.PeerTraceGRPC,
Behavior: s.conf.Behaviors,
TLS: s.conf.PeerTLS,
Log: s.log,
Info: info,
})
if err != nil {
s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err)
return
}
}
regionPicker.Add(peer)
continue
}
// If we don't have an existing PeerClient create a new one
peer := s.conf.LocalPicker.GetByPeerInfo(info)
if peer == nil {
peer = NewPeerClient(PeerConfig{
var err error
peer, err = NewPeerClient(PeerConfig{
TraceGRPC: s.conf.PeerTraceGRPC,
Behavior: s.conf.Behaviors,
TLS: s.conf.PeerTLS,
Log: s.log,
Info: info,
})
if err != nil {
s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err)
return
}
}
localPicker.Add(peer)
}
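The early return added on connection failure means a single unreachable peer aborts the whole pool update, which is exactly what the TODO above SetPeers calls out. One hedged way to honor that TODO is to collect failures and keep building the picker, then report everything to the caller; setLocalPeers is an invented name and errors.Join is the stdlib helper (Go 1.20+), neither is part of this PR.

```go
// setLocalPeers is a sketch: build the local picker, skip peers we cannot reach,
// and return every connection failure joined into a single error.
func (s *V1Instance) setLocalPeers(peerInfo []PeerInfo) error {
	var errs []error
	localPicker := s.conf.LocalPicker.New()

	for _, info := range peerInfo {
		peer := s.conf.LocalPicker.GetByPeerInfo(info)
		if peer == nil {
			var err error
			peer, err = NewPeerClient(PeerConfig{
				TraceGRPC: s.conf.PeerTraceGRPC,
				Behavior:  s.conf.Behaviors,
				TLS:       s.conf.PeerTLS,
				Log:       s.log,
				Info:      info,
			})
			if err != nil {
				// Record the failure but keep the peers that did connect.
				errs = append(errs, fmt.Errorf("connecting to peer %s: %w", info.GRPCAddress, err))
				continue
			}
		}
		localPicker.Add(peer)
	}

	// ... swap the new picker in, as SetPeers does today ...
	return errors.Join(errs...)
}
```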