diff --git a/client.go b/client.go index c7222ea9..f27216f7 100644 --- a/client.go +++ b/client.go @@ -95,8 +95,8 @@ type Client interface { // client implements the Client interface type client struct { - lastSent int64 - lastReceived int64 + lastSent atomic.Value + lastReceived atomic.Value pingOutstanding int32 status uint32 sync.RWMutex @@ -300,8 +300,8 @@ func (c *client) Connect() Token { if c.options.KeepAlive != 0 { atomic.StoreInt32(&c.pingOutstanding, 0) - atomic.StoreInt64(&c.lastReceived, time.Now().Unix()) - atomic.StoreInt64(&c.lastSent, time.Now().Unix()) + c.lastReceived.Store(time.Now()) + c.lastSent.Store(time.Now()) c.workers.Add(1) go keepalive(c) } @@ -412,8 +412,8 @@ func (c *client) reconnect() { if c.options.KeepAlive != 0 { atomic.StoreInt32(&c.pingOutstanding, 0) - atomic.StoreInt64(&c.lastReceived, time.Now().Unix()) - atomic.StoreInt64(&c.lastSent, time.Now().Unix()) + c.lastReceived.Store(time.Now()) + c.lastSent.Store(time.Now()) c.workers.Add(1) go keepalive(c) } diff --git a/net.go b/net.go index 83349a52..3e6366be 100644 --- a/net.go +++ b/net.go @@ -137,7 +137,7 @@ func incoming(c *client) { case c.ibound <- cp: // Notify keepalive logic that we recently received a packet if c.options.KeepAlive != 0 { - atomic.StoreInt64(&c.lastReceived, time.Now().Unix()) + c.lastReceived.Store(time.Now()) } case <-c.stop: // This avoids a deadlock should a message arrive while shutting down. @@ -221,7 +221,7 @@ func outgoing(c *client) { } // Reset ping timer after sending control packet. if c.options.KeepAlive != 0 { - atomic.StoreInt64(&c.lastSent, time.Now().Unix()) + c.lastSent.Store(time.Now()) } } } diff --git a/ping.go b/ping.go index 6df939ef..dcbcb1dd 100644 --- a/ping.go +++ b/ping.go @@ -43,8 +43,11 @@ func keepalive(c *client) { DEBUG.Println(PNG, "keepalive stopped") return case <-intervalTicker.C: - DEBUG.Println(PNG, "ping check", time.Now().Unix()-atomic.LoadInt64(&c.lastSent)) - if time.Now().Unix()-atomic.LoadInt64(&c.lastSent) >= c.options.KeepAlive || time.Now().Unix()-atomic.LoadInt64(&c.lastReceived) >= c.options.KeepAlive { + lastSent := c.lastSent.Load().(time.Time) + lastReceived := c.lastReceived.Load().(time.Time) + + DEBUG.Println(PNG, "ping check", time.Since(lastSent).Seconds()) + if time.Since(lastSent) >= time.Duration(c.options.KeepAlive*int64(time.Second)) || time.Since(lastReceived) >= time.Duration(c.options.KeepAlive*int64(time.Second)) { if atomic.LoadInt32(&c.pingOutstanding) == 0 { DEBUG.Println(PNG, "keepalive sending ping") ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket) @@ -52,7 +55,7 @@ func keepalive(c *client) { //will block until it it able to send the packet. atomic.StoreInt32(&c.pingOutstanding, 1) ping.Write(c.conn) - atomic.StoreInt64(&c.lastSent, time.Now().Unix()) + c.lastSent.Store(time.Now()) pingSent = time.Now() } }