Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure maximum concurrent leader probes #303

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type App struct {
voters int
standbys int
roles RolesConfig
options *options
}

// New creates a new application node.
Expand Down Expand Up @@ -219,6 +220,7 @@ func New(dir string, options ...Option) (app *App, err error) {
driver.WithDialFunc(driverDial),
driver.WithLogFunc(o.Log),
driver.WithTracing(o.Tracing),
driver.WithConcurrentLeaderConns(o.ConcurrentLeaderConns),
)
if err != nil {
stop()
Expand Down Expand Up @@ -256,6 +258,7 @@ func New(dir string, options ...Option) (app *App, err error) {
voters: o.Voters,
standbys: o.StandBys,
roles: RolesConfig{Voters: o.Voters, StandBys: o.StandBys},
options: o,
}

// Start the proxy if a TLS configuration was provided.
Expand Down Expand Up @@ -484,8 +487,11 @@ func (a *App) Open(ctx context.Context, database string) (*sql.DB, error) {
}

// Leader returns a client connected to the current cluster leader, if any.
func (a *App) Leader(ctx context.Context) (*client.Client, error) {
return client.FindLeader(ctx, a.store, a.clientOptions()...)
func (a *App) Leader(ctx context.Context, options ...client.Option) (*client.Client, error) {
allOptions := a.clientOptions()
allOptions = append(allOptions, options...)

return client.FindLeader(ctx, a.store, allOptions...)
}

// Client returns a client connected to the local node.
Expand Down Expand Up @@ -734,7 +740,7 @@ func (a *App) makeRolesChanges(nodes []client.NodeInfo) RolesChanges {

// Return the options to use for client.FindLeader() or client.New()
func (a *App) clientOptions() []client.Option {
return []client.Option{client.WithDialFunc(a.dialFunc), client.WithLogFunc(a.log)}
return []client.Option{client.WithDialFunc(a.dialFunc), client.WithLogFunc(a.log), client.WithConcurrentLeaderConns(*a.options.ConcurrentLeaderConns)}
}

func (a *App) debug(format string, args ...interface{}) {
Expand Down
15 changes: 15 additions & 0 deletions app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/internal/protocol"
)

// Option can be used to tweak app parameters.
Expand Down Expand Up @@ -180,6 +181,17 @@ func WithNetworkLatency(latency time.Duration) Option {
}
}

// WithConcurrentLeaderConns sets the maximum number of concurrent connections
// to other cluster members that will be attempted while searching for the
// dqlite leader.
//
// It takes a pointer to an integer so that the value can be dynamically
// modified based on cluster health. A nil pointer is ignored and leaves the
// current setting untouched, because the stored pointer is dereferenced
// unconditionally whenever the limit is read.
//
// NOTE(review): the pointed-to value is read with a plain dereference, so
// callers that change it while the App is in use presumably need to provide
// their own synchronization — confirm.
//
// The default is 10 connections to other cluster members.
func WithConcurrentLeaderConns(maxConns *int64) Option {
	return func(o *options) {
		if maxConns == nil {
			// Keep the existing (default) pointer rather than storing nil,
			// which would panic on the next dereference.
			return
		}
		o.ConcurrentLeaderConns = maxConns
	}
}

// WithSnapshotParams sets the raft snapshot parameters.
func WithSnapshotParams(params dqlite.SnapshotParams) Option {
return func(options *options) {
Expand Down Expand Up @@ -235,6 +247,7 @@ type options struct {
OnRolesAdjustment func(client.NodeInfo, []client.NodeInfo) error
FailureDomain uint64
NetworkLatency time.Duration
ConcurrentLeaderConns *int64
UnixSocket string
SnapshotParams dqlite.SnapshotParams
DiskMode bool
Expand All @@ -243,6 +256,7 @@ type options struct {

// Create an options object with sane defaults.
func defaultOptions() *options {
maxConns := protocol.MaxConcurrentLeaderConns
return &options{
Log: defaultLogFunc,
Tracing: client.LogNone,
Expand All @@ -252,6 +266,7 @@ func defaultOptions() *options {
OnRolesAdjustment: func(client.NodeInfo, []client.NodeInfo) error { return nil },
DiskMode: false, // Be explicit about not enabling disk-mode by default.
AutoRecovery: true,
ConcurrentLeaderConns: &maxConns,
}
}

Expand Down
20 changes: 16 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ type Client struct {
type Option func(*options)

type options struct {
DialFunc DialFunc
LogFunc LogFunc
DialFunc DialFunc
LogFunc LogFunc
ConcurrentLeaderConns int64
}

// WithDialFunc sets a custom dial function for creating the client network
Expand All @@ -39,6 +40,16 @@ func WithLogFunc(log LogFunc) Option {
}
}

// WithConcurrentLeaderConns caps how many connections to other cluster
// members may be attempted concurrently while searching for the dqlite
// leader.
//
// The default is 10 connections to other cluster members.
func WithConcurrentLeaderConns(maxConns int64) Option {
	apply := func(opts *options) {
		opts.ConcurrentLeaderConns = maxConns
	}
	return apply
}

// New creates a new client connected to the dqlite node with the given
// address.
func New(ctx context.Context, address string, options ...Option) (*Client, error) {
Expand Down Expand Up @@ -313,7 +324,8 @@ func (c *Client) Close() error {
// Create a client options object with sane defaults.
func defaultOptions() *options {
return &options{
DialFunc: DefaultDialFunc,
LogFunc: DefaultLogFunc,
DialFunc: DefaultDialFunc,
LogFunc: DefaultLogFunc,
ConcurrentLeaderConns: protocol.MaxConcurrentLeaderConns,
}
}
3 changes: 2 additions & 1 deletion client/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien
}

config := protocol.Config{
Dial: o.DialFunc,
Dial: o.DialFunc,
ConcurrentLeaderConns: o.ConcurrentLeaderConns,
}
connector := protocol.NewConnector(0, store, config, o.LogFunc)
protocol, err := connector.Connect(ctx)
Expand Down
52 changes: 35 additions & 17 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ import (

// Driver perform queries against a dqlite server.
type Driver struct {
log client.LogFunc // Log function to use
store client.NodeStore // Holds addresses of dqlite servers
context context.Context // Global cancellation context
connectionTimeout time.Duration // Max time to wait for a new connection
contextTimeout time.Duration // Default client context timeout.
clientConfig protocol.Config // Configuration for dqlite client instances
tracing client.LogLevel // Whether to trace statements
log client.LogFunc // Log function to use
store client.NodeStore // Holds addresses of dqlite servers
context context.Context // Global cancellation context
connectionTimeout time.Duration // Max time to wait for a new connection
contextTimeout time.Duration // Default client context timeout.
clientConfig protocol.Config // Configuration for dqlite client instances
tracing client.LogLevel // Whether to trace statements
concurrentLeaderConns *int64 // Maximum number of concurrent connections to other cluster members while probing for leadership.
}

// Error is returned in case of database errors.
Expand Down Expand Up @@ -124,6 +125,17 @@ func WithConnectionBackoffCap(cap time.Duration) Option {
}
}

// WithConcurrentLeaderConns sets the maximum number of concurrent connections
// to other cluster members that will be attempted while searching for the
// dqlite leader.
//
// It takes a pointer to an integer so that the value can be dynamically
// modified based on cluster health. A nil pointer is ignored and leaves the
// current setting untouched, because the stored pointer is dereferenced
// unconditionally when a new connection is set up.
//
// NOTE(review): the pointed-to value is read with a plain dereference, so
// callers that change it while the driver is in use presumably need to
// provide their own synchronization — confirm.
//
// The default is 10 connections to other cluster members.
func WithConcurrentLeaderConns(maxConns *int64) Option {
	return func(o *options) {
		if maxConns == nil {
			// Keep the existing (default) pointer rather than storing nil,
			// which would panic on the next dereference.
			return
		}
		o.ConcurrentLeaderConns = maxConns
	}
}

// WithAttemptTimeout sets the timeout for each individual connection attempt.
//
// The Connector.Connect() and Driver.Open() methods try to find the current
Expand Down Expand Up @@ -187,12 +199,13 @@ func New(store client.NodeStore, options ...Option) (*Driver, error) {
}

driver := &Driver{
log: o.Log,
store: store,
context: o.Context,
connectionTimeout: o.ConnectionTimeout,
contextTimeout: o.ContextTimeout,
tracing: o.Tracing,
log: o.Log,
store: store,
context: o.Context,
connectionTimeout: o.ConnectionTimeout,
contextTimeout: o.ContextTimeout,
tracing: o.Tracing,
concurrentLeaderConns: o.ConcurrentLeaderConns,
clientConfig: protocol.Config{
Dial: o.Dial,
AttemptTimeout: o.AttemptTimeout,
Expand All @@ -214,17 +227,20 @@ type options struct {
ContextTimeout time.Duration
ConnectionBackoffFactor time.Duration
ConnectionBackoffCap time.Duration
ConcurrentLeaderConns *int64
RetryLimit uint
Context context.Context
Tracing client.LogLevel
}

// Create an options object with sane defaults.
func defaultOptions() *options {
maxConns := protocol.MaxConcurrentLeaderConns
return &options{
Log: client.DefaultLogFunc,
Dial: client.DefaultDialFunc,
Tracing: client.LogNone,
Log: client.DefaultLogFunc,
Dial: client.DefaultDialFunc,
Tracing: client.LogNone,
ConcurrentLeaderConns: &maxConns,
}
}

Expand All @@ -248,7 +264,9 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
}

// TODO: generate a client ID.
connector := protocol.NewConnector(0, c.driver.store, c.driver.clientConfig, c.driver.log)
config := c.driver.clientConfig
config.ConcurrentLeaderConns = *c.driver.concurrentLeaderConns
connector := protocol.NewConnector(0, c.driver.store, config, c.driver.log)

conn := &Conn{
log: c.driver.log,
Expand Down
13 changes: 7 additions & 6 deletions internal/protocol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (

// Config holds various configuration parameters for a dqlite client.
type Config struct {
Dial DialFunc // Network dialer.
DialTimeout time.Duration // Timeout for establishing a network connection .
AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership.
BackoffFactor time.Duration // Exponential backoff factor for retries.
BackoffCap time.Duration // Maximum connection retry backoff value,
RetryLimit uint // Maximum number of retries, or 0 for unlimited.
Dial DialFunc // Network dialer.
DialTimeout time.Duration // Timeout for establishing a network connection .
AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership.
BackoffFactor time.Duration // Exponential backoff factor for retries.
BackoffCap time.Duration // Maximum connection retry backoff value,
RetryLimit uint // Maximum number of retries, or 0 for unlimited.
ConcurrentLeaderConns int64 // Maximum number of concurrent connections to other cluster members while probing for leadership.
}
9 changes: 8 additions & 1 deletion internal/protocol/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"golang.org/x/sync/semaphore"
)

// MaxConcurrentLeaderConns is the default maximum number of concurrent
// requests to other cluster members to probe for leadership. NewConnector
// falls back to it when Config.ConcurrentLeaderConns is 0.
const MaxConcurrentLeaderConns int64 = 10

// DialFunc is a function that can be used to establish a network connection.
type DialFunc func(context.Context, string) (net.Conn, error)

Expand Down Expand Up @@ -53,6 +56,10 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *
config.BackoffCap = time.Second
}

if config.ConcurrentLeaderConns == 0 {
config.ConcurrentLeaderConns = MaxConcurrentLeaderConns
}

connector := &Connector{
id: id,
store: store,
Expand Down Expand Up @@ -131,7 +138,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
ctx, cancel := context.WithCancel(ctx)
defer cancel()

sem := semaphore.NewWeighted(10)
sem := semaphore.NewWeighted(c.config.ConcurrentLeaderConns)

protocolChan := make(chan *Protocol)

Expand Down
Loading