-
Notifications
You must be signed in to change notification settings - Fork 1
/
bucket.go
176 lines (142 loc) · 3.32 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package dht
import (
"bytes"
"net"
"sync"
"time"
)
// bucket holds a list of active nodes together with a promotion cache of
// candidate nodes that did not fit into the active list.
type bucket struct {
// size is the number of nodes in the bucket, excluding the promotion cache.
// The methods in this file treat nodes[0:size] as the occupied slots.
size int
// expiry is the amount of time before a node is considered stale
expiry time.Duration
// nodes holds all active nodes on the network
nodes []*node
// cache holds all nodes that could be promoted to the bucket when
// other nodes expire
cache []*node
// mu is locked by insert, iterate and seen, and by remove when its lock
// argument is true. get, stash and full do not lock it themselves.
mu sync.Mutex
}
// insert adds the node identified by id to the bucket, or refreshes it if it
// is already present. b.mu is held for the duration of the call.
//
// The following cases are tried in order:
//   - id is already in the bucket: the entry's seen time is updated and it is
//     moved to the end of the list (its stored address is left unchanged);
//   - the bucket is not full: the new node is appended;
//   - the bucket is full and has a stale entry (last seen more than b.expiry
//     ago): the stale entry with the highest pending count is evicted and the
//     new node takes the last slot. The first stale candidate is only
//     accepted if its pending count is greater than 1;
//   - otherwise the node is stashed in the promotion cache.
//
// insert currently returns true in every case, including when the node only
// reaches the promotion cache.
func (b *bucket) insert(id []byte, address *net.UDPAddr) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	now := time.Now()

	// remove is called with lock=false because b.mu is already held here.
	if existing := b.remove(id, false); existing != nil {
		existing.seen = now
		b.nodes[b.size] = existing
		b.size++
		return true
	}

	// Stamp the node up front so it carries a seen time on every path,
	// including eviction and the promotion cache.
	n := &node{
		id:      id,
		address: address,
		seen:    now,
	}

	// if the bucket is not full, add the new node to the end
	if !b.full() {
		b.nodes[b.size] = n
		b.size++
		return true
	}

	// Look for the stale entry with the most pending requests.
	var (
		stale    *node
		staleIdx int
	)
	for i := 0; i < b.size; i++ {
		en := b.nodes[i]
		// Staleness is judged on the existing entry's own timestamp, not on
		// the node being inserted.
		if !now.After(en.seen.Add(b.expiry)) {
			continue
		}
		if (stale == nil && en.pending > 1) || (stale != nil && en.pending > stale.pending) {
			stale, staleIdx = en, i
		}
	}

	if stale != nil {
		// Close the gap left by the evicted entry. The bucket keeps the same
		// size, so the new node goes into the last occupied slot.
		copy(b.nodes[staleIdx:], b.nodes[staleIdx+1:])
		b.nodes[b.size-1] = n
		return true
	}

	// if there's no space in the bucket, we add the node to the promotion cache
	// so it can be added to the main node list when other nodes expire
	b.stash(n)
	return true
}
// get looks up a node by id. The active nodes are searched first, then the
// promotion cache; nil is returned when neither holds the id. get does not
// lock b.mu itself.
func (b *bucket) get(nodeID []byte) *node {
	// active nodes occupy the first b.size slots
	for idx := 0; idx < b.size; idx++ {
		if candidate := b.nodes[idx]; bytes.Equal(candidate.id, nodeID) {
			return candidate
		}
	}

	// fall back to the promotion cache
	for _, cached := range b.cache {
		if bytes.Equal(cached.id, nodeID) {
			return cached
		}
	}

	return nil
}
// iterate calls fn once for every node currently in the bucket, in slice
// order. Nodes in the promotion cache are not visited.
//
// b.mu is held for the whole walk, so fn must not call a method that locks
// it again (insert, seen, iterate, or remove with lock=true); sync.Mutex is
// not reentrant and such a call would deadlock.
func (b *bucket) iterate(fn func(n *node)) {
	b.mu.Lock()
	// Deferred so the mutex is released even if fn panics.
	defer b.mu.Unlock()

	for i := 0; i < b.size; i++ {
		fn(b.nodes[i])
	}
}
// seen refreshes the seen timestamp of the node with the given id, provided
// get can still find it (in the bucket or in the promotion cache). It is
// called when a node has responded to a request. The return value reports
// whether a matching node was found. b.mu is held for the call.
func (b *bucket) seen(nodeID []byte) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	found := b.get(nodeID)
	if found == nil {
		return false
	}

	// todo improve the safety of this
	found.seen = time.Now()
	return true
}
// remove deletes the node with the given id from the active node list and
// returns it, or returns nil if no active node has that id. The promotion
// cache is not searched.
//
// When lock is true, b.mu is taken for the duration of the call; callers
// that already hold b.mu (such as insert) pass false.
func (b *bucket) remove(nodeID []byte, lock bool) *node {
	if lock {
		b.mu.Lock()
		defer b.mu.Unlock()
	}

	for i := b.size - 1; i >= 0; i-- {
		if !bytes.Equal(b.nodes[i].id, nodeID) {
			continue
		}

		removed := b.nodes[i]
		// Shift the following entries down over the removed one.
		copy(b.nodes[i:], b.nodes[i+1:])
		b.size--
		// Clear the vacated tail slot so it no longer keeps a node
		// reachable through the backing array.
		b.nodes[b.size] = nil
		return removed
	}

	// TODO : promote a node from the promotion cache
	return nil
}
// stash records n in the promotion cache. If the cache already holds a node
// with the same id, only that entry's seen timestamp is refreshed and n is
// discarded; otherwise n is appended. stash does not lock b.mu itself.
func (b *bucket) stash(n *node) {
	for _, cached := range b.cache {
		if !bytes.Equal(cached.id, n.id) {
			continue
		}
		cached.seen = time.Now()
		return
	}

	// TODO : restrict the size of the cache and
	// evict the oldest members of this cache before
	// adding any new items. a circular buf would be ideal here
	b.cache = append(b.cache, n)
}
// full reports whether the bucket holds exactly 20 active nodes, the point
// at which insert stops appending and starts looking for a stale entry.
func (b *bucket) full() bool {
	const capacity = 20
	return b.size == capacity
}