Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
make it blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
miparnisari committed Feb 25, 2024
1 parent 9ce0eb6 commit e7eabb0
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
3 changes: 2 additions & 1 deletion gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,8 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_
return resp, nil
}

// SetPeers is called by the implementor to indicate the pool of peers has changed
// SetPeers replaces the peers and shuts down all the previous peers.
// TODO this should return an error if we failed to connect to any of the new peers
func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
localPicker := s.conf.LocalPicker.New()
regionPicker := s.conf.RegionPicker.New()
Expand Down
7 changes: 4 additions & 3 deletions peer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ type PeerConfig struct {
TraceGRPC bool
}

// NewPeerClient establishes a connection to a peer in a non-blocking fashion.
// NewPeerClient establishes a connection to a peer in a blocking fashion.
// If batching is enabled, it also starts a goroutine where batches will be processed.
func NewPeerClient(conf PeerConfig) (*PeerClient, error) {
peerClient := &PeerClient{
queue: make(chan *request, 1000),
conf: conf,
lastErrs: collections.NewLRUCache(100),
}
var opts []grpc.DialOption
opts := []grpc.DialOption{grpc.WithBlock()}

if conf.TraceGRPC {
opts = []grpc.DialOption{
Expand Down Expand Up @@ -402,7 +402,8 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
}
}

// Shutdown waits until all outstanding requests have finished and then closes the grpc connection
// Shutdown waits until all outstanding requests have finished or the context is cancelled.
// Then it closes the grpc connection.
func (c *PeerClient) Shutdown(ctx context.Context) error {
// ensure we don't leak goroutines, even if the Shutdown times out
defer c.conn.Close()
Expand Down

0 comments on commit e7eabb0

Please sign in to comment.