-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathws_connection.go
110 lines (84 loc) · 1.97 KB
/
ws_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package ramix
import (
"github.com/gorilla/websocket"
"net"
)
type WebSocketConnection struct {
*netConnection
socket *websocket.Conn
}
func (c *WebSocketConnection) open() {
go c.reader()
go c.writer()
go c.heartbeatChecker.start()
if c.server.connectionOpen != nil {
c.server.connectionOpen(c)
}
}
func (c *WebSocketConnection) close(syncConnectionManager bool) {
c.lock.Lock()
defer c.lock.Unlock()
if c.isClosed {
return
}
if c.server.connectionClose != nil {
c.server.connectionClose(c)
}
_ = c.socket.Close()
c.isClosed = true
c.cancel()
close(c.messageChannel)
c.heartbeatChecker.stop()
if syncConnectionManager {
c.server.connectionManager.removeConnection(c)
}
// If the worker pool is not used, need to stop the worker by self
if !c.server.UsingWorkerPool() {
c.worker.stop()
}
debug("WebSocketConnection %d closed, remote address: %v", c.ID(), c.socket.RemoteAddr())
}
func (c *WebSocketConnection) writer() {
for {
select {
case <-c.ctx.Done():
debug("WebSocketConnection %d writer stopped", c.ID())
return
case data := <-c.messageChannel:
_ = c.socket.WriteMessage(websocket.BinaryMessage, data)
}
}
}
func (c *WebSocketConnection) reader() {
defer c.close(true)
for {
select {
case <-c.ctx.Done():
debug("WebSocketConnection %d reader stopped", c.ID())
return
default:
messageType, buffer, err := c.socket.ReadMessage()
if messageType == websocket.PingMessage {
c.refreshLastActiveTime()
continue
}
if err != nil {
debug("WebSocket read error: %v", err)
return
}
c.refreshLastActiveTime()
bytesSlices := c.frameDecoder.Decode(buffer)
for _, bytesSlice := range bytesSlices {
message, err := c.server.decoder.Decode(bytesSlice)
if err != nil {
debug("Message decode error: %v", err)
continue
}
c.server.handleRequest(c, newRequest(message))
}
}
}
}
func (c *WebSocketConnection) RemoteAddress() net.Addr {
return c.socket.RemoteAddr()
}