Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

fix: mutex deadlocks in PeerClient #223

Merged
merged 23 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ func BenchmarkServer(b *testing.B) {
require.NoError(b, err, "Error in conf.SetDefaults")

b.Run("GetPeerRateLimit() with no batching", func(b *testing.B) {
client := guber.NewPeerClient(guber.PeerConfig{
client, err := guber.NewPeerClient(guber.PeerConfig{
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
Behavior: conf.Behaviors,
})
if err != nil {
b.Errorf("Error building client: %s", err)
}

b.ResetTimer()

Expand Down
3 changes: 2 additions & 1 deletion functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,9 +1720,10 @@ func TestGRPCGateway(t *testing.T) {

func TestGetPeerRateLimits(t *testing.T) {
ctx := context.Background()
peerClient := guber.NewPeerClient(guber.PeerConfig{
peerClient, err := guber.NewPeerClient(guber.PeerConfig{
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
})
require.NoError(t, err)

t.Run("Stable rate check request order", func(t *testing.T) {
// Ensure response order matches rate check request order.
Expand Down
5 changes: 3 additions & 2 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"github.com/mailgun/holster/v4/syncutil"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -247,8 +248,8 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
// Only log if it's an unknown error
if !errors.Is(err, context.Canceled) && errors.Is(err, context.DeadlineExceeded) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
golang.org/x/net v0.18.0
golang.org/x/sync v0.3.0
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b
google.golang.org/grpc v1.59.0
Expand Down
1 change: 1 addition & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 12 additions & 10 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@
// Make an RPC call to the peer that owns this rate limit
r, err := req.Peer.GetPeerRateLimit(ctx, req.Req)
if err != nil {
if IsNotReady(err) {
if errors.Is(err, context.DeadlineExceeded) {
attempts++
metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc()
req.Peer, err = s.GetPeer(ctx, req.Key)
Expand Down Expand Up @@ -530,7 +530,7 @@
localPeers := s.conf.LocalPicker.Peers()
for _, peer := range localPeers {
for _, errMsg := range peer.GetLastErr() {
err := fmt.Errorf("Error returned from local peer.GetLastErr: %s", errMsg)

Check failure on line 533 in gubernator.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not be capitalized (stylecheck)

Check failure on line 533 in gubernator.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

ST1005: error strings should not be capitalized (stylecheck)
span.RecordError(err)
errs = append(errs, err.Error())
}
Expand All @@ -540,7 +540,7 @@
regionPeers := s.conf.RegionPicker.Peers()
for _, peer := range regionPeers {
for _, errMsg := range peer.GetLastErr() {
err := fmt.Errorf("Error returned from region peer.GetLastErr: %s", errMsg)

Check failure on line 543 in gubernator.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not be capitalized (stylecheck)

Check failure on line 543 in gubernator.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

ST1005: error strings should not be capitalized (stylecheck)
span.RecordError(err)
errs = append(errs, err.Error())
}
Expand Down Expand Up @@ -599,27 +599,35 @@
peer := s.conf.RegionPicker.GetByPeerInfo(info)
// If we don't have an existing PeerClient create a new one
if peer == nil {
peer = NewPeerClient(PeerConfig{
var err error
peer, err = NewPeerClient(PeerConfig{
TraceGRPC: s.conf.PeerTraceGRPC,
Behavior: s.conf.Behaviors,
TLS: s.conf.PeerTLS,
Log: s.log,
Info: info,
})
if err != nil {
s.log.Error("error connecting to peer: %s", err)
}
}
regionPicker.Add(peer)
continue
}
// If we don't have an existing PeerClient create a new one
peer := s.conf.LocalPicker.GetByPeerInfo(info)
if peer == nil {
peer = NewPeerClient(PeerConfig{
var err error
peer, err = NewPeerClient(PeerConfig{
TraceGRPC: s.conf.PeerTraceGRPC,
Behavior: s.conf.Behaviors,
TLS: s.conf.PeerTLS,
Log: s.log,
Info: info,
})
if err != nil {
s.log.Error("error connecting to peer: %s", err)
}
}
localPicker.Add(peer)
}
Expand All @@ -636,9 +644,6 @@
s.log.WithField("peers", peerInfo).Debug("peers updated")

// Shutdown any old peers we no longer need
ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout)
defer cancel()

var shutdownPeers []*PeerClient
for _, peer := range oldLocalPicker.Peers() {
if peerInfo := s.conf.LocalPicker.GetByPeerInfo(peer.Info()); peerInfo == nil {
Expand All @@ -658,10 +663,7 @@
for _, p := range shutdownPeers {
wg.Run(func(obj interface{}) error {
pc := obj.(*PeerClient)
err := pc.Shutdown(ctx)
if err != nil {
s.log.WithError(err).WithField("peer", pc).Error("while shutting down peer")
}
pc.Shutdown()
return nil
}, p)
}
Expand Down
Loading
Loading