Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to connect to leader via 10 servers simultaneously #292

Merged
merged 2 commits into from
Mar 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 87 additions & 44 deletions internal/protocol/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"io"
"net"
"sort"
"sync"
"time"

"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/canonical/go-dqlite/logging"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)

// DialFunc is a function that can be used to establish a network connection.
Expand Down Expand Up @@ -126,60 +128,101 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
return servers[i].Role < servers[j].Role
})

ctx, cancel := context.WithCancel(ctx)
defer cancel()

sem := semaphore.NewWeighted(10)

protocolChan := make(chan *Protocol)

wg := &sync.WaitGroup{}
wg.Add(len(servers))

go func() {
wg.Wait()
close(protocolChan)
}()

// Make an attempt for each address until we find the leader.
for _, server := range servers {
log := func(l logging.Level, format string, a ...interface{}) {
format = fmt.Sprintf("server %s: ", server.Address) + format
log(l, format, a...)
}
go func(server NodeInfo, pc chan<- *Protocol) {
defer wg.Done()

ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
return
}
defer sem.Release(1)

protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log)
if err != nil {
// This server is unavailable, try with the next target.
log(logging.Warn, err.Error())
continue
}
if protocol != nil {
// We found the leader
if ctx.Err() != nil {
return
}

log := func(l logging.Level, format string, a ...interface{}) {
format = fmt.Sprintf("server %s: ", server.Address) + format
log(l, format, a...)
}

ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()

protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log)
if err != nil {
// This server is unavailable, try with the next target.
log(logging.Warn, err.Error())
return
}
if protocol != nil {
// We found the leader
log(logging.Debug, "connected")
pc <- protocol
return
}
if leader == "" {
// This server does not know who the current leader is,
// try with the next target.
log(logging.Warn, "no known leader")
return
}

// If we get here, it means this server reported that another
// server is the leader, let's close the connection to this
// server and try with the suggested one.
log(logging.Debug, "connect to reported leader %s", leader)

ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()

protocol, _, err = c.connectAttemptOne(ctx, leader, log)
if err != nil {
// The leader reported by the previous server is
// unavailable, try with the next target.
log(logging.Warn, "reported leader unavailable err=%v", err)
return
}
if protocol == nil {
// The leader reported by the target server does not consider itself
// the leader, try with the next target.
log(logging.Warn, "reported leader server is not the leader")
return
}
log(logging.Debug, "connected")
return protocol, nil
}
if leader == "" {
// This server does not know who the current leader is,
// try with the next target.
log(logging.Warn, "no known leader")
continue
}
pc <- protocol
}(server, protocolChan)
}

// If we get here, it means this server reported that another
// server is the leader, let's close the connection to this
// server and try with the suggested one.
log(logging.Debug, "connect to reported leader %s", leader)
// Read from protocol chan, cancel context
protocol, ok := <-protocolChan
if !ok {
return nil, ErrNoAvailableLeader
}

ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
cancel()

protocol, _, err = c.connectAttemptOne(ctx, leader, log)
if err != nil {
// The leader reported by the previous server is
// unavailable, try with the next target.
log(logging.Warn, "reported leader unavailable err=%v", err)
continue
}
if protocol == nil {
// The leader reported by the target server does not consider itself
// the leader, try with the next target.
log(logging.Warn, "reported leader server is not the leader")
continue
}
log(logging.Debug, "connected")
return protocol, nil
for extra := range protocolChan {
extra.Close()
}

return nil, ErrNoAvailableLeader
return protocol, nil
}

// Perform the initial handshake using the given protocol version.
Expand Down
Loading