-
Notifications
You must be signed in to change notification settings - Fork 534
/
store.go
140 lines (128 loc) · 3.83 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
*/
package mqtt
import (
"fmt"
"strconv"
"github.com/eclipse/paho.mqtt.golang/packets"
)
// Key prefixes that encode the direction of a persisted message, so that an
// inbound and an outbound packet sharing a message ID get distinct store keys.
const (
// inboundPrefix starts every key produced by inboundKeyFromMID.
inboundPrefix = "i."
// outboundPrefix starts every key produced by outboundKeyFromMID.
outboundPrefix = "o."
)
// Store is an interface which can be used to provide implementations
// for message persistence.
//
// Because distinct messages may have to be stored under the same
// message ID, each message needs a unique key. This is achieved by
// prepending "i." (inbound) or "o." (outbound) to the message ID; see
// inboundKeyFromMID and outboundKeyFromMID.
type Store interface {
// Open is the lifecycle counterpart of Close.
// NOTE(review): presumably prepares the store for use; exact semantics
// are left to the implementation — confirm.
Open()
// Put saves message under key. persistOutbound and persistInbound call it
// with keys of the form "o.<id>" and "i.<id>" respectively.
Put(key string, message packets.ControlPacket)
// Get returns the packet associated with key.
// NOTE(review): the result for an unknown key is implementation-defined
// (presumably nil) — confirm.
Get(key string) packets.ControlPacket
// All returns a slice of strings.
// NOTE(review): presumably the keys of every stored message — confirm.
All() []string
// Del removes the entry stored under key. persistOutbound and
// persistInbound call it when an acknowledging packet is handled.
Del(key string)
// Close is the lifecycle counterpart of Open.
// NOTE(review): presumably releases resources held by the store — confirm.
Close()
// Reset takes no arguments and returns nothing.
// NOTE(review): presumably discards all stored messages — confirm.
Reset()
}
// mIDFromKey extracts the numeric message ID from a store key.
//
// The key MUST have the form "X.[messageid]", where X is 'i' or 'o': the
// first two bytes are skipped and the remainder is parsed as a base-10
// unsigned integer that fits in 16 bits. A key shorter than two bytes makes
// the slice expression panic. A parse failure is handed to chkerr, which is
// defined elsewhere in the package.
// NOTE(review): chkerr presumably aborts on a non-nil error — confirm.
func mIDFromKey(key string) uint16 {
	parsed, err := strconv.ParseUint(key[2:], 10, 16)
	chkerr(err)
	return uint16(parsed)
}
// isKeyOutbound reports whether key starts with the outbound prefix ("o.").
//
// Keys too short to contain the prefix (including the empty string) yield
// false rather than causing a slice-bounds panic.
func isKeyOutbound(key string) bool {
	const n = len(outboundPrefix)
	return len(key) >= n && key[:n] == outboundPrefix
}
// isKeyInbound reports whether key starts with the inbound prefix ("i.").
//
// Keys too short to contain the prefix (including the empty string) yield
// false rather than causing a slice-bounds panic.
func isKeyInbound(key string) bool {
	const n = len(inboundPrefix)
	return len(key) >= n && key[:n] == inboundPrefix
}
// inboundKeyFromMID builds the store key for an inbound message: the
// inbound prefix followed by the decimal message ID, i.e. "i.[id]".
func inboundKeyFromMID(id uint16) string {
	return inboundPrefix + fmt.Sprint(id)
}
// outboundKeyFromMID builds the store key for an outbound message: the
// outbound prefix followed by the decimal message ID, i.e. "o.[id]".
func outboundKeyFromMID(id uint16) string {
	return outboundPrefix + fmt.Sprint(id)
}
// persistOutbound governs which outgoing messages are persisted. The action
// depends on the QoS and the concrete type reported by the packet:
//
//   - QoS 0: a *packets.PubackPacket or *packets.PubcompPacket deletes the
//     inbound entry with the same message ID; every other type is ignored.
//   - QoS 1: *packets.PublishPacket, *packets.PubrelPacket,
//     *packets.SubscribePacket and *packets.UnsubscribePacket are stored
//     under the outbound key; any other type is logged as invalid.
//   - QoS 2: only *packets.PublishPacket is stored under the outbound key;
//     any other type is logged as invalid.
//   - Any other QoS value is ignored silently.
func persistOutbound(s Store, m packets.ControlPacket) {
	qos := m.Details().Qos

	if qos == 0 {
		// Sending an acknowledgement: drop the matching inbound entry.
		switch m.(type) {
		case *packets.PubackPacket, *packets.PubcompPacket:
			s.Del(inboundKeyFromMID(m.Details().MessageID))
		}
		return
	}

	// Decide whether this packet type may be stored at the given QoS.
	var storable bool
	switch qos {
	case 1:
		switch m.(type) {
		case *packets.PublishPacket, *packets.PubrelPacket, *packets.SubscribePacket, *packets.UnsubscribePacket:
			storable = true
		}
	case 2:
		_, storable = m.(*packets.PublishPacket)
	default:
		return
	}

	if !storable {
		ERROR.Println(STR, "Asked to persist an invalid message type")
		return
	}

	// Keep the packet in the outbound set; persistInbound deletes the entry
	// when a matching acknowledgement arrives.
	s.Put(outboundKeyFromMID(m.Details().MessageID), m)
}
// persistInbound governs which incoming messages are persisted. The action
// depends on the QoS and the concrete type reported by the packet:
//
//   - QoS 0: *packets.PubackPacket, *packets.SubackPacket,
//     *packets.UnsubackPacket and *packets.PubcompPacket delete the outbound
//     entry with the same message ID. *packets.PublishPacket,
//     *packets.PubrecPacket, *packets.PingrespPacket and
//     *packets.ConnackPacket are accepted without touching the store. Any
//     other type is logged as invalid.
//   - QoS 1: *packets.PublishPacket and *packets.PubrelPacket are stored
//     under the inbound key; any other type is logged as invalid.
//   - QoS 2: only *packets.PublishPacket is stored under the inbound key;
//     any other type is logged as invalid.
//   - Any other QoS value is ignored silently.
func persistInbound(s Store, m packets.ControlPacket) {
	const invalid = "Asked to persist an invalid messages type"

	qos := m.Details().Qos

	if qos == 0 {
		switch m.(type) {
		case *packets.PubackPacket, *packets.SubackPacket, *packets.UnsubackPacket, *packets.PubcompPacket:
			// Received an acknowledgement: drop the matching outbound entry.
			s.Del(outboundKeyFromMID(m.Details().MessageID))
		case *packets.PublishPacket, *packets.PubrecPacket, *packets.PingrespPacket, *packets.ConnackPacket:
			// Accepted at QoS 0, but nothing is stored or removed.
		default:
			ERROR.Println(STR, invalid)
		}
		return
	}

	// Decide whether this packet type may be stored at the given QoS.
	var storable bool
	switch qos {
	case 1:
		switch m.(type) {
		case *packets.PublishPacket, *packets.PubrelPacket:
			storable = true
		}
	case 2:
		_, storable = m.(*packets.PublishPacket)
	default:
		return
	}

	if !storable {
		ERROR.Println(STR, invalid)
		return
	}

	// Keep the packet in the inbound set; persistOutbound deletes the entry
	// when the matching acknowledgement is sent.
	s.Put(inboundKeyFromMID(m.Details().MessageID), m)
}