Skip to content

Commit

Permalink
Merge pull request #1 from WallEnd/backoffconfig
Browse files Browse the repository at this point in the history
Backoffconfig Extracting backoff and reconnect strategy from client.
  • Loading branch information
WallEnd authored Jul 17, 2020
2 parents 1d40ffb + e7972b3 commit bd53f2e
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 21 deletions.
22 changes: 13 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Client struct {
mutex sync.RWMutex
url string
encoding protocol.Type
config Config
config WsConfig
token string
connectData protocol.Raw
transport transport
Expand Down Expand Up @@ -121,10 +121,10 @@ func New(u string, config Config) *Client {
url: u,
encoding: encoding,
subs: make(map[string]*Subscription),
config: config,
config: config.WsConfig,
requests: make(map[uint32]chan protocol.Reply),
reconnect: true,
reconnectStrategy: defaultBackoffReconnect,
reconnectStrategy: config.getBackoffReconnect(),
paramsEncoder: newParamsEncoder(encoding),
resultDecoder: newResultDecoder(encoding),
commandEncoder: newCommandEncoder(encoding),
Expand Down Expand Up @@ -376,12 +376,16 @@ type backoffReconnect struct {
MaxMilliseconds int
}

var defaultBackoffReconnect = &backoffReconnect{
NumReconnect: 0,
MinMilliseconds: 100,
MaxMilliseconds: 20 * 1000,
Factor: 2,
Jitter: true,
func (c *Config) getBackoffReconnect() (bor *backoffReconnect) {
bor = &backoffReconnect{
NumReconnect: c.BackoffConfig.NumReconnect,
Factor: c.BackoffConfig.Factor,
Jitter: c.BackoffConfig.Jitter,
MaxMilliseconds: c.BackoffConfig.MaxMilliseconds,
MinMilliseconds: c.BackoffConfig.MinMilliseconds,
}

return
}

func (r *backoffReconnect) timeBeforeNextAttempt(attempt int) (time.Duration, error) {
Expand Down
55 changes: 46 additions & 9 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,20 @@ const (
DefaultPingInterval = 25 * time.Second
// DefaultPrivateChannelPrefix ...
DefaultPrivateChannelPrefix = "$"
// NumReconnect is maximum number of reconnect attempts, 0 means reconnect forever.
DefaultBackoffNumReconnect = 0
// MinMilliseconds is a minimum value of the reconnect interval.
DefaultBackoffMinMilliseconds = 100
// MaxMilliseconds is a maximum value of the reconnect interval.
DefaultBackoffMaxMilliseconds = 20 * 1000
// Factor is the multiplying factor for each increment step.
DefaultBackoffFactor = 2
// Jitter eases contention by randomizing backoff steps.
DefaultBackoffJitter = true
)

// Config contains various client options.
type Config struct {
// WsConfig contains various client options.
type WsConfig struct {
// NetDialContext specifies the dial function for creating TCP connections. If
// NetDialContext is nil, net.DialContext is used.
NetDialContext func(ctx context.Context, network, addr string) (net.Conn, error)
Expand Down Expand Up @@ -53,14 +63,41 @@ type Config struct {
Header http.Header
}

// DefaultConfig returns Config with default options.
type BackoffReconnectConfig struct {
// NumReconnect is maximum number of reconnect attempts, 0 means reconnect forever.
NumReconnect int
// Factor is the multiplying factor for each increment step.
Factor float64
// Jitter eases contention by randomizing backoff steps.
Jitter bool
// MinMilliseconds is a minimum value of the reconnect interval.
MinMilliseconds int
// MaxMilliseconds is a maximum value of the reconnect interval.
MaxMilliseconds int
}

type Config struct {
BackoffConfig BackoffReconnectConfig
WsConfig WsConfig
}

// DefaultConfig returns WsConfig with default options.
func DefaultConfig() Config {
return Config{
PingInterval: DefaultPingInterval,
ReadTimeout: DefaultReadTimeout,
WriteTimeout: DefaultWriteTimeout,
HandshakeTimeout: DefaultHandshakeTimeout,
PrivateChannelPrefix: DefaultPrivateChannelPrefix,
Header: http.Header{},
BackoffReconnectConfig{
NumReconnect: DefaultBackoffNumReconnect,
Factor: DefaultBackoffFactor,
Jitter: DefaultBackoffJitter,
MinMilliseconds: DefaultBackoffMinMilliseconds,
MaxMilliseconds: DefaultBackoffMaxMilliseconds,
},
WsConfig{
PingInterval: DefaultPingInterval,
ReadTimeout: DefaultReadTimeout,
WriteTimeout: DefaultWriteTimeout,
HandshakeTimeout: DefaultHandshakeTimeout,
PrivateChannelPrefix: DefaultPrivateChannelPrefix,
Header: http.Header{},
},
}
}
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ var (
ErrReconnectFailed = errors.New("reconnect failed")
// ErrDuplicateSubscription ...
ErrDuplicateSubscription = errors.New("duplicate subscription")
//ErrConnectionClosed
ErrClientDestroyed = errors.New("client destroyed")
)
5 changes: 2 additions & 3 deletions examples/reconnect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package main
// Demonstrate how to reconnect.

import (
"centrifuge-go"
"fmt"
"log"
"time"

"github.com/centrifugal/centrifuge-go"
)

func init() {
Expand Down Expand Up @@ -45,7 +44,7 @@ func (h *subEventHandler) OnUnsubscribe(sub *centrifuge.Subscription, e centrifu
}

func newConnection() *centrifuge.Client {
url := "ws://localhost:8000/connection/websocket?format=protobuf"
url := "ws://localhost:8008/connection/websocket?format=protobuf"

c := centrifuge.New(url, centrifuge.DefaultConfig())

Expand Down

0 comments on commit bd53f2e

Please sign in to comment.