forked from klintcheng/kim
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontext.go
149 lines (129 loc) · 3.4 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package kim
import (
"sync"
"github.com/klintcheng/kim/logger"
"github.com/klintcheng/kim/wire"
"github.com/klintcheng/kim/wire/pkt"
"google.golang.org/protobuf/proto"
)
// Session read only
type Session interface {
GetChannelId() string
GetGateId() string
GetAccount() string
GetRemoteIP() string
GetApp() string
GetTags() []string
}
type Context interface {
Dispatcher
SessionStorage
Header() *pkt.Header
ReadBody(val proto.Message) error
Session() Session
RespWithError(status pkt.Status, err error) error
Resp(status pkt.Status, body proto.Message) error
Dispatch(body proto.Message, recvs ...*Location) error
Next()
}
// HandlerFunc defines the handler used
type HandlerFunc func(Context)
// HandlersChain HandlersChain
type HandlersChain []HandlerFunc
// ContextImpl is the most important part of kim
type ContextImpl struct {
sync.Mutex
Dispatcher
SessionStorage
handlers HandlersChain
index int
request *pkt.LogicPkt
session Session
}
func BuildContext() Context {
return &ContextImpl{}
}
// Next execute next handler
func (c *ContextImpl) Next() {
if c.index >= len(c.handlers) {
return
}
f := c.handlers[c.index]
c.index++
if f == nil {
logger.Warn("arrived unknown HandlerFunc")
return
}
f(c)
}
// RespWithError response with error
func (c *ContextImpl) RespWithError(status pkt.Status, err error) error {
return c.Resp(status, &pkt.ErrorResp{Message: err.Error()})
}
// Resp send a response message to sender, the header of packet copied from request
func (c *ContextImpl) Resp(status pkt.Status, body proto.Message) error {
packet := pkt.NewFrom(&c.request.Header)
packet.Status = status
packet.WriteBody(body)
packet.Flag = pkt.Flag_Response
logger.Debugf("<-- Resp to %s command:%s status: %v body: %s", c.Session().GetAccount(), &c.request.Header, status, body)
err := c.Push(c.Session().GetGateId(), []string{c.Session().GetChannelId()}, packet)
if err != nil {
logger.Error(err)
}
return err
}
// Dispatch the packet to the Destination of request,
// the header flag of this packet will be set with FlagDelivery
// exceptMe: exclude self if self is false
func (c *ContextImpl) Dispatch(body proto.Message, recvs ...*Location) error {
if len(recvs) == 0 {
return nil
}
packet := pkt.NewFrom(&c.request.Header)
packet.Flag = pkt.Flag_Push
packet.WriteBody(body)
logger.Debugf("<-- Dispatch to %d users command:%s", len(recvs), &c.request.Header)
// the receivers group by the destination of gateway
group := make(map[string][]string)
for _, recv := range recvs {
if recv.ChannelId == c.Session().GetChannelId() {
continue
}
if _, ok := group[recv.GateId]; !ok {
group[recv.GateId] = make([]string, 0)
}
group[recv.GateId] = append(group[recv.GateId], recv.ChannelId)
}
for gateway, ids := range group {
err := c.Push(gateway, ids, packet)
if err != nil {
logger.Error(err)
}
return err
}
return nil
}
func (c *ContextImpl) reset() {
c.request = nil
c.index = 0
c.handlers = nil
c.session = nil
}
func (c *ContextImpl) Header() *pkt.Header {
return &c.request.Header
}
func (c *ContextImpl) ReadBody(val proto.Message) error {
return c.request.ReadBody(val)
}
func (c *ContextImpl) Session() Session {
if c.session == nil {
server, _ := c.request.GetMeta(wire.MetaDestServer)
c.session = &pkt.Session{
ChannelId: c.request.ChannelId,
GateId: server.(string),
Tags: []string{"AutoGenerated"},
}
}
return c.session
}