Skip to content

Commit

Permalink
Merge pull request #119 from liftbridge-io/broker_reconnect
Browse files Browse the repository at this point in the history
Make PublishAsync more resilient
  • Loading branch information
tylertreat authored Dec 29, 2021
2 parents 441af0d + 865b4ec commit 12d3f79
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 43 deletions.
180 changes: 145 additions & 35 deletions v2/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
proto "github.com/liftbridge-io/liftbridge-api/go"
"github.com/serialx/hashring"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type ackReceivedFunc func(*proto.PublishResponse)
Expand Down Expand Up @@ -124,7 +126,7 @@ func (b *brokers) FromAddr(addr string) (proto.APIClient, error) {
return nil, fmt.Errorf("no broker found: %v", addr)
}

return broker.client, nil
return broker.grpcClient.Client(), nil
}

func (b *brokers) ChooseBroker(selectionCriteria SelectionCriteria) (proto.APIClient, error) {
Expand All @@ -144,48 +146,116 @@ func (b *brokers) ChooseBroker(selectionCriteria SelectionCriteria) (proto.APICl

// Find server with lowest latency
for i := 0; i < len(b.brokers); i++ {
status := b.brokers[i].Status()
if i == 0 {
minLatency = int(b.brokers[i].status.LastKnownLatency)
minLatency = int(status.LastKnownLatency)
broker = b.brokers[i]
continue
}
if int(b.brokers[i].status.LastKnownLatency) < minLatency {
minLatency = int(b.brokers[i].status.LastKnownLatency)
if int(status.LastKnownLatency) < minLatency {
minLatency = int(status.LastKnownLatency)
broker = b.brokers[i]
}

}
return broker.client, nil
return broker.grpcClient.Client(), nil
case Workload:
// Find server with lowest work load
minPartitionCount := -1

for i := 0; i < len(b.brokers); i++ {
status := b.brokers[i].Status()
if i == 0 {
minPartitionCount = int(b.brokers[i].status.PartitionCount)
minPartitionCount = int(status.PartitionCount)
broker = b.brokers[i]
continue
}
if int(b.brokers[i].status.PartitionCount) < minPartitionCount {
minPartitionCount = int(b.brokers[i].status.PartitionCount)
minPartitionCount = int(status.PartitionCount)
broker = b.brokers[i]
}

}
return broker.client, nil
return broker.grpcClient.Client(), nil
case Random:
// Return the current broker (randomly chosen)
return broker.client, nil
return broker.grpcClient.Client(), nil
default:
// Return the current broker (randomly chosen)
return broker.client, nil
return broker.grpcClient.Client(), nil
}

}

// PublicationStream returns a publication stream based on a stream name and a
// partition.
func (b *brokers) PublicationStream(stream string, partition int32) (proto.API_PublishAsyncClient, error) {
// grpcClient bundles the gRPC connection to a single broker together with the
// API client and the PublishAsync stream created on that connection. The
// connection can be replaced in place by redial, so callers should obtain the
// current client or stream through the Client and AsyncClient accessors rather
// than caching them.
type grpcClient struct {
	// Dial parameters, set once by newGrpcClient and reused on every redial.
	addr     string
	dialOpts []grpc.DialOption

	// mu guards the fields below, which are written by redial and close.
	mu          sync.RWMutex
	conn        *grpc.ClientConn
	client      proto.APIClient
	asyncClient proto.API_PublishAsyncClient
	// closed is set by close; once true, redial refuses to reconnect.
	closed bool
}

// newGrpcClient builds a grpcClient for the broker at addr and performs the
// initial dial using the given dial options. If that first dial (or opening
// the PublishAsync stream) fails, the error is returned and no client is
// created.
func newGrpcClient(ctx context.Context, addr string, opts []grpc.DialOption) (*grpcClient, error) {
	client := &grpcClient{
		addr:     addr,
		dialOpts: opts,
	}
	err := client.redial(ctx)
	if err != nil {
		return nil, err
	}
	return client, nil
}

// redial establishes a fresh connection to the broker, creates a new API
// client and PublishAsync stream on it and, only if all of that succeeds,
// swaps them in and closes the previous connection (if there was one).
//
// On failure the existing connection, client and stream are left untouched.
// It returns an error if the client has already been closed, if dialing
// fails, or if the PublishAsync stream cannot be opened. The write lock is
// held for the whole operation, so Client and AsyncClient block until it
// finishes.
func (g *grpcClient) redial(ctx context.Context) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	if g.closed {
		return errors.New("client was closed")
	}

	freshConn, err := dialBroker(ctx, g.addr, g.dialOpts)
	if err != nil {
		return err
	}

	api := proto.NewAPIClient(freshConn)
	publishStream, err := api.PublishAsync(ctx)
	if err != nil {
		// Don't leak the connection we just opened.
		freshConn.Close()
		return err
	}

	// Everything succeeded: publish the new state, then retire the old
	// connection.
	staleConn := g.conn
	g.conn, g.client, g.asyncClient = freshConn, api, publishStream
	if staleConn != nil {
		staleConn.Close()
	}
	return nil
}

// close shuts down the current connection and marks the client as closed so
// that later redial calls fail. Calling close more than once is a no-op.
func (g *grpcClient) close() {
	g.mu.Lock()
	defer g.mu.Unlock()

	if !g.closed {
		g.conn.Close()
		g.closed = true
	}
}

// Client returns the API client bound to the current connection. The value is
// read under the read lock because redial may replace it.
func (g *grpcClient) Client() proto.APIClient {
	g.mu.RLock()
	current := g.client
	g.mu.RUnlock()
	return current
}

// AsyncClient returns the PublishAsync stream bound to the current
// connection. The value is read under the read lock because redial may
// replace it, so callers that retry should fetch it again instead of reusing
// an earlier result.
func (g *grpcClient) AsyncClient() proto.API_PublishAsyncClient {
	g.mu.RLock()
	current := g.asyncClient
	g.mu.RUnlock()
	return current
}

// GetGrpcClient returns a grpcClient based on a stream name and a partition.
func (b *brokers) GetGrpcClient(stream string, partition int32) (*grpcClient, error) {
b.mu.RLock()
defer b.mu.RUnlock()

Expand All @@ -200,7 +270,7 @@ func (b *brokers) PublicationStream(stream string, partition int32) (proto.API_P
return nil, fmt.Errorf("broker not found: %v", addr)
}

return broker.stream, nil
return broker.grpcClient, nil
}

func brokerHashringKey(stream string, partition int32) string {
Expand All @@ -209,28 +279,23 @@ func brokerHashringKey(stream string, partition int32) string {

// broker represents a connection to a broker.
type broker struct {
conn *grpc.ClientConn
client proto.APIClient
stream proto.API_PublishAsyncClient
wg sync.WaitGroup
status *brokerStatus
grpcClient *grpcClient
wg sync.WaitGroup
status *brokerStatus
closed chan struct{}
mu sync.RWMutex
}

func newBroker(ctx context.Context, addr string, opts []grpc.DialOption, ackReceived ackReceivedFunc) (*broker, error) {
conn, err := dialBroker(ctx, addr, opts)
client, err := newGrpcClient(ctx, addr, opts)
if err != nil {
return nil, err
}

b := &broker{
conn: conn,
client: proto.NewAPIClient(conn),
status: &brokerStatus{PartitionCount: 0, LastKnownLatency: 0},
}

if b.stream, err = b.client.PublishAsync(ctx); err != nil {
conn.Close()
return nil, err
grpcClient: client,
status: &brokerStatus{PartitionCount: 0, LastKnownLatency: 0},
closed: make(chan struct{}),
}

b.wg.Add(1)
Expand All @@ -250,7 +315,7 @@ func (b *broker) updateStatus(ctx context.Context, addr string) error {
// Measure instant server response time
start := time.Now()

resp, err := b.client.FetchMetadata(ctx, &proto.FetchMetadataRequest{})
resp, err := b.grpcClient.Client().FetchMetadata(ctx, &proto.FetchMetadataRequest{})

elapsed := time.Since(start)

Expand All @@ -259,10 +324,9 @@ func (b *broker) updateStatus(ctx context.Context, addr string) error {
}

// Parse broker status
b.status.LastKnownLatency = elapsed
updatedStatus := &brokerStatus{LastKnownLatency: elapsed}

// Count total number of partitions for this broker

for _, broker := range resp.Brokers {
brokerInfo := &BrokerInfo{
id: broker.Id,
Expand All @@ -272,35 +336,81 @@ func (b *broker) updateStatus(ctx context.Context, addr string) error {
partitionCount: broker.PartitionCount,
}
if brokerInfo.Addr() == addr {
b.status.PartitionCount = brokerInfo.LeaderCount() + brokerInfo.PartitionCount()
updatedStatus.PartitionCount = brokerInfo.LeaderCount() + brokerInfo.PartitionCount()
break
}

}

b.mu.Lock()
b.status = updatedStatus
b.mu.Unlock()

return nil
}

// Status returns the broker's most recently stored status. The pointer is
// read under the broker's read lock.
func (b *broker) Status() *brokerStatus {
	b.mu.RLock()
	current := b.status
	b.mu.RUnlock()
	return current
}

func (b *broker) Close() {
b.conn.Close()
select {
case <-b.closed:
return
default:
}
b.grpcClient.close()
close(b.closed)
b.wg.Wait()
}

func (b *broker) dispatchAcks(ackReceived ackReceivedFunc) {
stream := b.grpcClient.AsyncClient()
for {
resp, err := b.stream.Recv()
resp, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
// TODO: reconnect?
// Check if the broker connection has been closed.
select {
case <-b.closed:
return
default:
}
if status.Code(err) == codes.Unavailable {
// Attempt to reconnect.
if err := b.reconnect(); err == nil {
stream = b.grpcClient.AsyncClient()
continue
}
}
return
}
ackReceived(resp)
}
}

// reconnect tries to re-establish the broker's gRPC connection and
// PublishAsync stream, making up to five attempts with a 50ms pause after
// each failed one.
//
// It returns nil as soon as a redial succeeds. If the broker has been closed
// it gives up immediately instead of burning the remaining attempts, since
// redial can never succeed after close and Close waits for the calling
// goroutine to exit. Otherwise the error from the last failed attempt is
// returned.
func (b *broker) reconnect() error {
	const (
		maxAttempts = 5
		retryDelay  = 50 * time.Millisecond
	)

	// The reconnect is not tied to any caller's request, so it uses a
	// background context.
	ctx := context.Background()

	var err error
	for i := 0; i < maxAttempts; i++ {
		// Stop retrying once the broker is shut down.
		select {
		case <-b.closed:
			if err == nil {
				err = errors.New("broker was closed")
			}
			return err
		default:
		}

		if err = b.grpcClient.redial(ctx); err == nil {
			return nil
		}
		sleepContext(ctx, retryDelay)
	}
	return err
}

func dialBroker(ctx context.Context, addr string, opts []grpc.DialOption) (*grpc.ClientConn, error) {
// Perform a blocking dial if a context with a deadline has been provided.
_, hasDeadline := ctx.Deadline()
Expand Down
27 changes: 20 additions & 7 deletions v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -1249,7 +1250,7 @@ func (c *client) publishAsync(ctx context.Context, streamName string, value []by
return err
}

stream, err := c.brokers.PublicationStream(streamName, req.Partition)
grpcClient, err := c.brokers.GetGrpcClient(streamName, req.Partition)
if err != nil {
return fmt.Errorf("broker for stream: %w", err)
}
Expand Down Expand Up @@ -1280,15 +1281,27 @@ func (c *client) publishAsync(ctx context.Context, streamName string, value []by
c.mu.Unlock()
}

if err := stream.Send(req); err != nil {
c.removeAckContext(req.CorrelationId)
if status.Code(err) == codes.FailedPrecondition {
err = ErrReadonlyPartition
for i := 0; i < 5; i++ {
stream := grpcClient.AsyncClient()
if e := stream.Send(req); e != nil {
err = e
if e == io.EOF {
// We were disconnected, so attempt to use the reconnected
// stream (the dispatchAcks goroutine will attempt to
// reconnect).
sleepContext(ctx, 50*time.Millisecond)
continue
}
if status.Code(e) == codes.FailedPrecondition {
e = ErrReadonlyPartition
}
c.removeAckContext(req.CorrelationId)
return e
}
return err
return nil
}

return nil
return err
}

// PublishToSubject publishes a new message to the NATS subject. Note that
Expand Down
Loading

0 comments on commit 12d3f79

Please sign in to comment.