-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfastly.go
148 lines (121 loc) · 3.31 KB
/
fastly.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package fastlystats
import (
"context"
"errors"
"math"
"reflect"
"time"
"github.com/fastly/go-fastly/v3/fastly"
"go.uber.org/zap"
)
// Must not be shorter than the quota for creating time series.
// See https://cloud.google.com/monitoring/quotas
// At the time of writing this is 1 point per 10 seconds, so must be more than 10 seconds
//
// The shorter this is, the higher resolution there will be on datapoints, but
// also increases resource usage and billing
const pollInterval = 15 * time.Second
type FastlyMeanStats struct {
IntervalStart uint64
IntervalEnd uint64
Stats *fastly.Stats
}
type FastlyStatsProvider struct {
fastlyClient *fastly.RTSClient
service string
ch chan<- *FastlyMeanStats
timestamp uint64
}
func NewFastlyStatsProvider(service, apiKey string, ch chan<- *FastlyMeanStats) (*FastlyStatsProvider, error) {
fastlyClient, err := fastly.NewRealtimeStatsClientForEndpoint(apiKey, fastly.DefaultRealtimeStatsEndpoint)
if err != nil {
return nil, err
}
return &FastlyStatsProvider{
fastlyClient: fastlyClient,
service: service,
ch: ch,
}, nil
}
func (f *FastlyStatsProvider) Run(ctx context.Context) {
ll := zap.S()
ll.Infof("starting fastly stats provider")
for {
start := time.Now()
if err := f.next(ctx, f.ch); err != nil && !errors.Is(err, context.Canceled) {
ll.Warnf("failed to get stats, retrying in a while: %v", err)
}
dur := time.Since(start)
ll.Debugf("getting and reporting stats took %v - sleeping for %v", dur, pollInterval-dur)
select {
case <-time.After(pollInterval - dur):
case <-ctx.Done():
return
}
if ctx.Err() != nil {
return
}
}
}
func (f *FastlyStatsProvider) mean(list []*fastly.RealtimeData) *FastlyMeanStats {
stats := &fastly.Stats{}
n := uint64(len(list))
var min, max uint64 = math.MaxUint64, 0
refStats := reflect.ValueOf(stats)
for _, rtdata := range list {
vs := reflect.ValueOf(rtdata.Aggregated)
for i := 0; i < vs.Elem().NumField(); i++ {
sf := vs.Elem().Field(i)
df := refStats.Elem().Field(i)
switch sf.Kind() {
case reflect.Uint64:
df.SetUint(sf.Uint() + df.Uint())
case reflect.Float64:
df.SetFloat(sf.Float() + df.Float())
}
}
if rtdata.Recorded < min {
min = rtdata.Recorded
}
if rtdata.Recorded > max {
max = rtdata.Recorded
}
}
for i := 0; i < refStats.Elem().NumField(); i++ {
f := refStats.Elem().Field(i)
switch f.Kind() {
case reflect.Uint64:
f.SetUint(uint64(math.Round(float64(f.Uint()) / float64(n))))
case reflect.Float64:
f.SetFloat(f.Float() / float64(n))
}
}
// Hit Ratio is not set in RT API, build it synthetically
stats.HitRatio = float64(stats.Hits) / float64(stats.Hits+stats.Miss)
return &FastlyMeanStats{
IntervalStart: min,
IntervalEnd: max,
Stats: stats,
}
}
func (s *FastlyStatsProvider) next(ctx context.Context, ch chan<- *FastlyMeanStats) error {
ll := zap.S()
req := &fastly.GetRealtimeStatsInput{
ServiceID: s.service,
Timestamp: s.timestamp,
}
ll.Debugf("getting realtime stats")
resp, err := s.fastlyClient.GetRealtimeStats(req)
if err != nil {
return err
}
ll.Debugf("got %d seconds worth of value", len(resp.Data))
meanStats := s.mean(resp.Data)
s.timestamp = resp.Timestamp
select {
case ch <- meanStats:
case <-ctx.Done():
return ctx.Err()
}
return nil
}