diff --git a/.gitignore b/.gitignore index 71eeca2..80d9a36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .chglog -.env \ No newline at end of file +.env +.vscode \ No newline at end of file diff --git a/ticker/ticker.go b/ticker/ticker.go index 811374e..23206e1 100644 --- a/ticker/ticker.go +++ b/ticker/ticker.go @@ -9,6 +9,7 @@ import ( "math" "net/url" "sync" + "sync/atomic" "time" "github.com/gorilla/websocket" @@ -28,7 +29,7 @@ type Ticker struct { url url.URL callbacks callbacks - lastPingTime time.Time + lastPingTime atomicTime autoReconnect bool reconnectMaxRetries int reconnectMaxDelay time.Duration @@ -41,6 +42,22 @@ type Ticker struct { cancel context.CancelFunc } +// atomicTime is wrapper over time.Time to safely access +// an updating timestamp concurrently. +type atomicTime struct { + v atomic.Value +} + +// Get returns the current timestamp. +func (b *atomicTime) Get() time.Time { + return b.v.Load().(time.Time) +} + +// Set sets the current timestamp. +func (b *atomicTime) Set(value time.Time) { + b.v.Store(value) +} + // callbacks represents callbacks available in ticker. type callbacks struct { onTick func(models.Tick) @@ -311,7 +328,7 @@ func (t *Ticker) ServeWithContext(ctx context.Context) { t.reconnectAttempt = 0 // Set current time as last ping time - t.lastPingTime = time.Now() + t.lastPingTime.Set(time.Now()) // Set on close handler t.Conn.SetCloseHandler(t.handleClose) @@ -339,6 +356,7 @@ func (t *Ticker) handleClose(code int, reason string) error { return nil } + // Trigger callback methods func (t *Ticker) triggerError(err error) { if t.callbacks.onError != nil { @@ -370,6 +388,7 @@ func (t *Ticker) triggerNoReconnect(attempt int) { } } + func (t *Ticker) triggerMessage(messageType int, message []byte) { if t.callbacks.onMessage != nil { t.callbacks.onMessage(messageType, message) @@ -401,7 +420,7 @@ func (t *Ticker) checkConnection(ctx context.Context, wg *sync.WaitGroup) { // If last ping time is greater then timeout interval then close the // existing connection and reconnect - if time.Since(t.lastPingTime) > dataTimeoutInterval { + if time.Since(t.lastPingTime.Get()) > dataTimeoutInterval { // Close the current connection without waiting for close frame if t.Conn != nil { t.Conn.Close() @@ -431,7 +450,7 @@ func (t *Ticker) readMessage(ctx context.Context, wg *sync.WaitGroup) { } // Update last ping time to check for connection - t.lastPingTime = time.Now() + t.lastPingTime.Set(time.Now()) // Trigger message. t.triggerMessage(mType, msg)