-
Notifications
You must be signed in to change notification settings - Fork 1
/
cache.go
99 lines (73 loc) · 1.56 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package dht
import (
"errors"
"hash/maphash"
"sync"
"time"
"github.com/purehyperbole/dht/protocol"
)
// ErrRequestTimeout is the error passed to a pending request's callback when
// no response has been received before the request's TTL elapsed.
var ErrRequestTimeout = errors.New("request timeout")
// request is a pending asynchronous request held in the cache until it is
// answered or its ttl passes.
type request struct {
// callback receives the event and error handed to cache.callback, or a nil
// event and ErrRequestTimeout when the request expires. For a response,
// returning true removes the request from the cache; on expiry the return
// value is ignored and the request is always removed.
callback func(event *protocol.Event, err error) bool
// ttl is the point in time after which the cleanup loop times the request out.
ttl time.Time
}
// cache tracks asynchronous event requests that are awaiting a response.
type cache struct {
// requests maps the 64-bit maphash of a request key to its *request.
requests sync.Map
// hasher pools *maphash.Hash values; newCache seeds them all identically so
// that equal keys always hash to the same map key.
hasher sync.Pool
}
// newCache builds an empty request cache and starts its background cleanup
// goroutine, which wakes once per refresh interval to expire overdue
// requests. There is no way to stop that goroutine once it has been started.
func newCache(refresh time.Duration) *cache {
	// a single seed is shared by every pooled hasher so that identical keys
	// always produce identical hashes
	seed := maphash.MakeSeed()

	c := new(cache)
	c.hasher.New = func() any {
		h := new(maphash.Hash)
		h.SetSeed(seed)
		return h
	}

	go c.cleanup(refresh)

	return c
}
// set registers cb as the pending request for key, replacing any request
// already stored under the same key hash. ttl is the deadline after which the
// cleanup loop times the request out.
func (c *cache) set(key []byte, ttl time.Time, cb func(*protocol.Event, error) bool) {
	// borrow a seeded hasher to reduce the key to its 64-bit map key
	hasher := c.hasher.Get().(*maphash.Hash)
	hasher.Reset()
	hasher.Write(key)
	id := hasher.Sum64()
	c.hasher.Put(hasher)

	c.requests.Store(id, &request{
		callback: cb,
		ttl:      ttl,
	})
}
// callback delivers event and err to the pending request registered for key.
// It does nothing when no request is registered. The request is removed from
// the cache only when its callback returns true; otherwise it stays pending.
func (c *cache) callback(key []byte, event *protocol.Event, err error) {
	// borrow a seeded hasher to reduce the key to its 64-bit map key
	hasher := c.hasher.Get().(*maphash.Hash)
	hasher.Reset()
	hasher.Write(key)
	id := hasher.Sum64()
	c.hasher.Put(hasher)

	pending, found := c.requests.Load(id)
	if !found {
		return
	}

	done := pending.(*request).callback(event, err)
	if !done {
		// the handler wants to keep receiving events for this key
		return
	}

	c.requests.Delete(id)
}
// cleanup loops forever: once per refresh interval it scans the pending
// requests and times out every one whose ttl has passed, invoking its
// callback with a nil event and ErrRequestTimeout.
//
// An expired entry is claimed with LoadAndDelete before its callback runs.
// Removing first means that a request which was already completed and deleted
// by cache.callback during the sweep is not also reported as timed out, and
// that a callback which registers a new request under the same key does not
// have that new entry wiped by a trailing delete.
func (c *cache) cleanup(refresh time.Duration) {
	for {
		time.Sleep(refresh)

		now := time.Now()

		c.requests.Range(func(key, value any) bool {
			if !now.After(value.(*request).ttl) {
				return true
			}

			// atomically take ownership of the entry; if it is already gone
			// someone else finished the request and there is nothing to do
			expired, ok := c.requests.LoadAndDelete(key)
			if !ok {
				return true
			}

			// notify the request that was actually removed so its caller is
			// never left waiting; the return value is irrelevant because the
			// entry has already been dropped
			expired.(*request).callback(nil, ErrRequestTimeout)

			return true
		})
	}
}