-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection.go
85 lines (72 loc) · 1.68 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package ramix
import (
"context"
"errors"
"net"
"sync"
"time"
)
type Connection interface {
ID() uint64
RemoteAddress() net.Addr
SendMessage(event uint32, body []byte) error
open()
writer()
reader()
close(syncConnectionManager bool)
refreshLastActiveTime()
isAlive() bool
submitTask(ctx *Context)
}
type netConnection struct {
id uint64
isClosed bool
ctx context.Context
cancel context.CancelFunc
messageChannel chan []byte
lastActiveTime time.Time
server *Server
worker *worker
heartbeatChecker *heartbeatChecker
frameDecoder *FrameDecoder
lock sync.RWMutex
}
func (c *netConnection) refreshLastActiveTime() {
c.lastActiveTime = time.Now()
}
func (c *netConnection) isAlive() bool {
return !c.isClosed && c.lastActiveTime.Add(c.server.HeartbeatTimeout).After(time.Now())
}
func (c *netConnection) submitTask(ctx *Context) {
c.worker.tasks <- ctx
}
func (c *netConnection) ID() uint64 {
return c.id
}
func (c *netConnection) SendMessage(event uint32, body []byte) error {
if c.isClosed {
return errors.New("connection is closed")
}
encodedMessage, err := c.server.encoder.Encode(Message{
Event: event,
Body: body,
BodySize: uint32(len(body)),
})
if err != nil {
return err
}
c.messageChannel <- encodedMessage
return nil
}
func newNetConnection(connectionID uint64, s *Server) *netConnection {
return &netConnection{
id: connectionID,
isClosed: false,
messageChannel: make(chan []byte, s.ConnectionWriteBufferSize),
server: s,
frameDecoder: NewFrameDecoder(
WithLengthFieldOffset(4),
WithLengthFieldLength(4),
),
}
}