-
Notifications
You must be signed in to change notification settings - Fork 224
/
inflight.go
156 lines (128 loc) · 3.93 KB
/
inflight.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
package mqtt
import (
"sort"
"sync"
"sync/atomic"
"github.com/mochi-mqtt/server/v2/packets"
)
// Inflight is a map of InflightMessage keyed on packet id.
type Inflight struct {
sync.RWMutex
internal map[uint16]packets.Packet // internal contains the inflight packets
receiveQuota int32 // remaining inbound qos quota for flow control
sendQuota int32 // remaining outbound qos quota for flow control
maximumReceiveQuota int32 // maximum allowed receive quota
maximumSendQuota int32 // maximum allowed send quota
}
// NewInflights returns a new instance of an Inflight packets map.
func NewInflights() *Inflight {
return &Inflight{
internal: map[uint16]packets.Packet{},
}
}
// Set adds or updates an inflight packet by packet id.
func (i *Inflight) Set(m packets.Packet) bool {
i.Lock()
defer i.Unlock()
_, ok := i.internal[m.PacketID]
i.internal[m.PacketID] = m
return !ok
}
// Get returns an inflight packet by packet id.
func (i *Inflight) Get(id uint16) (packets.Packet, bool) {
i.RLock()
defer i.RUnlock()
if m, ok := i.internal[id]; ok {
return m, true
}
return packets.Packet{}, false
}
// Len returns the size of the inflight messages map.
func (i *Inflight) Len() int {
i.RLock()
defer i.RUnlock()
return len(i.internal)
}
// Clone returns a new instance of Inflight with the same message data.
// This is used when transferring inflights from a taken-over session.
func (i *Inflight) Clone() *Inflight {
c := NewInflights()
i.RLock()
defer i.RUnlock()
for k, v := range i.internal {
c.internal[k] = v
}
return c
}
// GetAll returns all the inflight messages.
func (i *Inflight) GetAll(immediate bool) []packets.Packet {
i.RLock()
defer i.RUnlock()
m := []packets.Packet{}
for _, v := range i.internal {
if !immediate || (immediate && v.Expiry < 0) {
m = append(m, v)
}
}
sort.Slice(m, func(i, j int) bool {
return uint16(m[i].Created) < uint16(m[j].Created)
})
return m
}
// NextImmediate returns the next inflight packet which is indicated to be sent immediately.
// This typically occurs when the quota has been exhausted, and we need to wait until new quota
// is free to continue sending.
func (i *Inflight) NextImmediate() (packets.Packet, bool) {
i.RLock()
defer i.RUnlock()
m := i.GetAll(true)
if len(m) > 0 {
return m[0], true
}
return packets.Packet{}, false
}
// Delete removes an in-flight message from the map. Returns true if the message existed.
func (i *Inflight) Delete(id uint16) bool {
i.Lock()
defer i.Unlock()
_, ok := i.internal[id]
delete(i.internal, id)
return ok
}
// TakeRecieveQuota reduces the receive quota by 1.
func (i *Inflight) DecreaseReceiveQuota() {
if atomic.LoadInt32(&i.receiveQuota) > 0 {
atomic.AddInt32(&i.receiveQuota, -1)
}
}
// TakeRecieveQuota increases the receive quota by 1.
func (i *Inflight) IncreaseReceiveQuota() {
if atomic.LoadInt32(&i.receiveQuota) < atomic.LoadInt32(&i.maximumReceiveQuota) {
atomic.AddInt32(&i.receiveQuota, 1)
}
}
// ResetReceiveQuota resets the receive quota to the maximum allowed value.
func (i *Inflight) ResetReceiveQuota(n int32) {
atomic.StoreInt32(&i.receiveQuota, n)
atomic.StoreInt32(&i.maximumReceiveQuota, n)
}
// DecreaseSendQuota reduces the send quota by 1.
func (i *Inflight) DecreaseSendQuota() {
if atomic.LoadInt32(&i.sendQuota) > 0 {
atomic.AddInt32(&i.sendQuota, -1)
}
}
// IncreaseSendQuota increases the send quota by 1.
func (i *Inflight) IncreaseSendQuota() {
if atomic.LoadInt32(&i.sendQuota) < atomic.LoadInt32(&i.maximumSendQuota) {
atomic.AddInt32(&i.sendQuota, 1)
}
}
// ResetSendQuota resets the send quota to the maximum allowed value.
func (i *Inflight) ResetSendQuota(n int32) {
atomic.StoreInt32(&i.sendQuota, n)
atomic.StoreInt32(&i.maximumSendQuota, n)
}