diff --git a/app/app.go b/app/app.go index e3e2d2e8..85562f87 100644 --- a/app/app.go +++ b/app/app.go @@ -51,6 +51,7 @@ type App struct { voters int standbys int roles RolesConfig + options *options } // New creates a new application node. @@ -219,6 +220,7 @@ func New(dir string, options ...Option) (app *App, err error) { driver.WithDialFunc(driverDial), driver.WithLogFunc(o.Log), driver.WithTracing(o.Tracing), + driver.WithConcurrentLeaderConns(o.ConcurrentLeaderConns), ) if err != nil { stop() @@ -256,6 +258,7 @@ func New(dir string, options ...Option) (app *App, err error) { voters: o.Voters, standbys: o.StandBys, roles: RolesConfig{Voters: o.Voters, StandBys: o.StandBys}, + options: o, } // Start the proxy if a TLS configuration was provided. @@ -484,8 +487,11 @@ func (a *App) Open(ctx context.Context, database string) (*sql.DB, error) { } // Leader returns a client connected to the current cluster leader, if any. -func (a *App) Leader(ctx context.Context) (*client.Client, error) { - return client.FindLeader(ctx, a.store, a.clientOptions()...) +func (a *App) Leader(ctx context.Context, options ...client.Option) (*client.Client, error) { + allOptions := a.clientOptions() + allOptions = append(allOptions, options...) + + return client.FindLeader(ctx, a.store, allOptions...) } // Client returns a client connected to the local node. @@ -734,7 +740,7 @@ func (a *App) makeRolesChanges(nodes []client.NodeInfo) RolesChanges { // Return the options to use for client.FindLeader() or client.New() func (a *App) clientOptions() []client.Option { - return []client.Option{client.WithDialFunc(a.dialFunc), client.WithLogFunc(a.log)} + return []client.Option{client.WithDialFunc(a.dialFunc), client.WithLogFunc(a.log), client.WithConcurrentLeaderConns(*a.options.ConcurrentLeaderConns)} } func (a *App) debug(format string, args ...interface{}) { diff --git a/app/options.go b/app/options.go index 48e59990..71871139 100644 --- a/app/options.go +++ b/app/options.go @@ -10,6 +10,7 @@ import ( "github.com/canonical/go-dqlite" "github.com/canonical/go-dqlite/client" + "github.com/canonical/go-dqlite/internal/protocol" ) // Option can be used to tweak app parameters. @@ -180,6 +181,17 @@ func WithNetworkLatency(latency time.Duration) Option { } } +// WithConcurrentLeaderConns is the maximum number of concurrent connections +// to other cluster members that will be attempted while searching for the dqlite leader. +// It takes a pointer to an integer so that the value can be dynamically modified based on cluster health. +// +// The default is 10 connections to other cluster members. +func WithConcurrentLeaderConns(maxConns *int64) Option { + return func(o *options) { + o.ConcurrentLeaderConns = maxConns + } +} + // WithSnapshotParams sets the raft snapshot parameters. func WithSnapshotParams(params dqlite.SnapshotParams) Option { return func(options *options) { @@ -235,6 +247,7 @@ type options struct { OnRolesAdjustment func(client.NodeInfo, []client.NodeInfo) error FailureDomain uint64 NetworkLatency time.Duration + ConcurrentLeaderConns *int64 UnixSocket string SnapshotParams dqlite.SnapshotParams DiskMode bool @@ -243,6 +256,7 @@ type options struct { // Create a options object with sane defaults. func defaultOptions() *options { + maxConns := protocol.MaxConcurrentLeaderConns return &options{ Log: defaultLogFunc, Tracing: client.LogNone, @@ -252,6 +266,7 @@ func defaultOptions() *options { OnRolesAdjustment: func(client.NodeInfo, []client.NodeInfo) error { return nil }, DiskMode: false, // Be explicit about not enabling disk-mode by default. AutoRecovery: true, + ConcurrentLeaderConns: &maxConns, } } diff --git a/client/client.go b/client/client.go index c8809be7..76987b23 100644 --- a/client/client.go +++ b/client/client.go @@ -19,8 +19,9 @@ type Client struct { type Option func(*options) type options struct { - DialFunc DialFunc - LogFunc LogFunc + DialFunc DialFunc + LogFunc LogFunc + ConcurrentLeaderConns int64 } // WithDialFunc sets a custom dial function for creating the client network @@ -39,6 +40,16 @@ func WithLogFunc(log LogFunc) Option { } } +// WithConcurrentLeaderConns is the maximum number of concurrent connections +// to other cluster members that will be attempted while searching for the dqlite leader. +// +// The default is 10 connections to other cluster members. +func WithConcurrentLeaderConns(maxConns int64) Option { + return func(o *options) { + o.ConcurrentLeaderConns = maxConns + } +} + // New creates a new client connected to the dqlite node with the given // address. func New(ctx context.Context, address string, options ...Option) (*Client, error) { @@ -313,7 +324,8 @@ func (c *Client) Close() error { // Create a client options object with sane defaults. func defaultOptions() *options { return &options{ - DialFunc: DefaultDialFunc, - LogFunc: DefaultLogFunc, + DialFunc: DefaultDialFunc, + LogFunc: DefaultLogFunc, + ConcurrentLeaderConns: protocol.MaxConcurrentLeaderConns, } } diff --git a/client/leader.go b/client/leader.go index 5de72ce9..d98ce2bb 100644 --- a/client/leader.go +++ b/client/leader.go @@ -20,7 +20,8 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien } config := protocol.Config{ - Dial: o.DialFunc, + Dial: o.DialFunc, + ConcurrentLeaderConns: o.ConcurrentLeaderConns, } connector := protocol.NewConnector(0, store, config, o.LogFunc) protocol, err := connector.Connect(ctx) diff --git a/driver/driver.go b/driver/driver.go index b33b9495..95479e75 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -34,13 +34,14 @@ import ( // Driver perform queries against a dqlite server. type Driver struct { - log client.LogFunc // Log function to use - store client.NodeStore // Holds addresses of dqlite servers - context context.Context // Global cancellation context - connectionTimeout time.Duration // Max time to wait for a new connection - contextTimeout time.Duration // Default client context timeout. - clientConfig protocol.Config // Configuration for dqlite client instances - tracing client.LogLevel // Whether to trace statements + log client.LogFunc // Log function to use + store client.NodeStore // Holds addresses of dqlite servers + context context.Context // Global cancellation context + connectionTimeout time.Duration // Max time to wait for a new connection + contextTimeout time.Duration // Default client context timeout. + clientConfig protocol.Config // Configuration for dqlite client instances + tracing client.LogLevel // Whether to trace statements + concurrentLeaderConns *int64 // Maximum number of concurrent connections to other cluster members while probing for leadership. } // Error is returned in case of database errors. @@ -124,6 +125,17 @@ func WithConnectionBackoffCap(cap time.Duration) Option { } } +// WithConcurrentLeaderConns is the maximum number of concurrent connections +// to other cluster members that will be attempted while searching for the dqlite leader. +// It takes a pointer to an integer so that the value can be dynamically modified based on cluster health. +// +// The default is 10 connections to other cluster members. +func WithConcurrentLeaderConns(maxConns *int64) Option { + return func(o *options) { + o.ConcurrentLeaderConns = maxConns + } +} + // WithAttemptTimeout sets the timeout for each individual connection attempt. // // The Connector.Connect() and Driver.Open() methods try to find the current @@ -187,12 +199,13 @@ func New(store client.NodeStore, options ...Option) (*Driver, error) { } driver := &Driver{ - log: o.Log, - store: store, - context: o.Context, - connectionTimeout: o.ConnectionTimeout, - contextTimeout: o.ContextTimeout, - tracing: o.Tracing, + log: o.Log, + store: store, + context: o.Context, + connectionTimeout: o.ConnectionTimeout, + contextTimeout: o.ContextTimeout, + tracing: o.Tracing, + concurrentLeaderConns: o.ConcurrentLeaderConns, clientConfig: protocol.Config{ Dial: o.Dial, AttemptTimeout: o.AttemptTimeout, @@ -214,6 +227,7 @@ type options struct { ContextTimeout time.Duration ConnectionBackoffFactor time.Duration ConnectionBackoffCap time.Duration + ConcurrentLeaderConns *int64 RetryLimit uint Context context.Context Tracing client.LogLevel @@ -221,10 +235,12 @@ type options struct { // Create a options object with sane defaults. func defaultOptions() *options { + maxConns := protocol.MaxConcurrentLeaderConns return &options{ - Log: client.DefaultLogFunc, - Dial: client.DefaultDialFunc, - Tracing: client.LogNone, + Log: client.DefaultLogFunc, + Dial: client.DefaultDialFunc, + Tracing: client.LogNone, + ConcurrentLeaderConns: &maxConns, } } @@ -248,7 +264,9 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) { } // TODO: generate a client ID. - connector := protocol.NewConnector(0, c.driver.store, c.driver.clientConfig, c.driver.log) + config := c.driver.clientConfig + config.ConcurrentLeaderConns = *c.driver.concurrentLeaderConns + connector := protocol.NewConnector(0, c.driver.store, config, c.driver.log) conn := &Conn{ log: c.driver.log, diff --git a/internal/protocol/config.go b/internal/protocol/config.go index de5272df..0555b4ac 100644 --- a/internal/protocol/config.go +++ b/internal/protocol/config.go @@ -6,10 +6,11 @@ import ( // Config holds various configuration parameters for a dqlite client. type Config struct { - Dial DialFunc // Network dialer. - DialTimeout time.Duration // Timeout for establishing a network connection . - AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership. - BackoffFactor time.Duration // Exponential backoff factor for retries. - BackoffCap time.Duration // Maximum connection retry backoff value, - RetryLimit uint // Maximum number of retries, or 0 for unlimited. + Dial DialFunc // Network dialer. + DialTimeout time.Duration // Timeout for establishing a network connection . + AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership. + BackoffFactor time.Duration // Exponential backoff factor for retries. + BackoffCap time.Duration // Maximum connection retry backoff value, + RetryLimit uint // Maximum number of retries, or 0 for unlimited. + ConcurrentLeaderConns int64 // Maximum number of concurrent connections to other cluster members while probing for leadership. } diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index c8e3ed7a..e8f37599 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -18,6 +18,9 @@ import ( "golang.org/x/sync/semaphore" ) +// MaxConcurrentLeaderConns is the default maximum number of concurrent requests to other cluster members to probe for leadership. +const MaxConcurrentLeaderConns int64 = 10 + // DialFunc is a function that can be used to establish a network connection. type DialFunc func(context.Context, string) (net.Conn, error) @@ -53,6 +56,10 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) * config.BackoffCap = time.Second } + if config.ConcurrentLeaderConns == 0 { + config.ConcurrentLeaderConns = MaxConcurrentLeaderConns + } + connector := &Connector{ id: id, store: store, @@ -131,7 +138,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P ctx, cancel := context.WithCancel(ctx) defer cancel() - sem := semaphore.NewWeighted(10) + sem := semaphore.NewWeighted(c.config.ConcurrentLeaderConns) protocolChan := make(chan *Protocol)