-
Notifications
You must be signed in to change notification settings - Fork 17
/
message_expire.go
102 lines (82 loc) · 2.01 KB
/
message_expire.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package server
import (
"fmt"
"time"
"github.com/muka/peerjs-go/models"
"github.com/sirupsen/logrus"
)
//IMessagesExpire MessagesExpire interface
type IMessagesExpire interface {
Start()
Stop()
}
func NewMessagesExpire(realm IRealm, opts Options, messageHandler IMessageHandler) *MessagesExpire {
return &MessagesExpire{
realm: realm,
opts: opts,
messageHandler: messageHandler,
log: createLogger("messageExpire", opts),
close: make(chan bool, 1),
}
}
//MessagesExpire check for expired messages
type MessagesExpire struct {
realm IRealm
opts Options
messageHandler IMessageHandler
ticker *time.Ticker
log *logrus.Entry
close chan bool
}
func (b *MessagesExpire) pruneOutstanding() {
destinationClientsIds := b.realm.GetClientsIdsWithQueue()
now := getTime()
maxDiff := b.opts.ExpireTimeout
seen := map[string]bool{}
for _, destinationClientID := range destinationClientsIds {
messageQueue := b.realm.GetMessageQueueByID(destinationClientID)
if messageQueue == nil {
continue
}
lastReadDiff := now - messageQueue.GetLastReadAt()
if lastReadDiff < maxDiff {
continue
}
messages := messageQueue.GetMessages()
for _, message := range messages {
seenKey := fmt.Sprintf("%s_%s", message.GetSrc(), message.GetDst())
if _, ok := seen[seenKey]; !ok {
b.messageHandler.Handle(nil, models.Message{
Type: MessageTypeExpire,
Src: message.GetDst(),
Dst: message.GetSrc(),
})
seen[seenKey] = true
}
}
b.realm.ClearMessageQueue(destinationClientID)
}
}
//Start the message expire check
func (b *MessagesExpire) Start() {
b.ticker = time.NewTicker(DefaultCheckInterval * time.Millisecond)
go func() {
for {
select {
case <-b.close:
b.ticker.Stop()
b.ticker = nil
return
case <-b.ticker.C:
b.pruneOutstanding()
}
}
}()
}
//Stop the message expire check
func (b *MessagesExpire) Stop() {
if b.ticker == nil {
return
}
b.close <- true
}