-
Notifications
You must be signed in to change notification settings - Fork 11
/
interceptors.go
207 lines (175 loc) · 6.89 KB
/
interceptors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package tg
import (
"context"
"errors"
"math"
"math/rand"
"net/http"
"time"
)
// InterceptorInvoker is the continuation handed to an Interceptor.
// An interceptor calls it with the context, the request and the destination
// value to pass the call onward, and receives the resulting error.
// NOTE(review): presumably it runs the next interceptor in the chain or the
// actual API call; the chaining code is not visible here, so confirm.
type InterceptorInvoker func(ctx context.Context, req *Request, dst any) error
// Interceptor is a function that intercepts request and response.
// It receives the context, the request, the destination value and an invoker;
// it decides whether, when and how many times to call invoker(ctx, req, dst),
// may modify req beforehand, and returns the error reported to the caller.
type Interceptor func(ctx context.Context, req *Request, dst any, invoker InterceptorInvoker) error
// interceptorRetryFloodErrorOpts collects the settings of the interceptor
// built by NewInterceptorRetryFloodError.
type interceptorRetryFloodErrorOpts struct {
// tries bounds the retry loop of the interceptor.
tries int
// maxRetryAfter is the largest RetryAfterDuration of a flood error the
// interceptor is willing to wait for; a larger value makes it return the error.
maxRetryAfter time.Duration
// timeAfter produces the channel the interceptor waits on between attempts
// (time.After unless overridden).
timeAfter func(time.Duration) <-chan time.Time
}
// InterceptorRetryFloodErrorOption is an option for NewInterceptorRetryFloodError.
// Each option mutates the interceptor settings after the defaults are set.
type InterceptorRetryFloodErrorOption func(*interceptorRetryFloodErrorOpts)
// WithInterceptorRetryFloodErrorTries overrides the tries setting (default 3)
// of the interceptor built by NewInterceptorRetryFloodError.
func WithInterceptorRetryFloodErrorTries(tries int) InterceptorRetryFloodErrorOption {
	apply := func(cfg *interceptorRetryFloodErrorOpts) {
		cfg.tries = tries
	}
	return apply
}
// WithInterceptorRetryFloodErrorMaxRetryAfter overrides the maximum retry-after
// duration (default 1 hour) the interceptor built by
// NewInterceptorRetryFloodError is willing to wait for.
func WithInterceptorRetryFloodErrorMaxRetryAfter(maxRetryAfter time.Duration) InterceptorRetryFloodErrorOption {
	apply := func(cfg *interceptorRetryFloodErrorOpts) {
		cfg.maxRetryAfter = maxRetryAfter
	}
	return apply
}
// WithInterceptorRetryFloodErrorTimeAfter replaces the function used to wait
// between attempts (time.After by default). It makes the waiting controllable,
// e.g. to substitute a channel that fires immediately.
func WithInterceptorRetryFloodErrorTimeAfter(timeAfter func(time.Duration) <-chan time.Time) InterceptorRetryFloodErrorOption {
	apply := func(cfg *interceptorRetryFloodErrorOpts) {
		cfg.timeAfter = timeAfter
	}
	return apply
}
// NewInterceptorRetryFloodError returns a new interceptor that retries the
// request if the error is a flood error (HTTP 429 Too Many Requests).
//
// With that interceptor, a call that hits the limit simply looks like a
// request that takes unusually long: under the hood the request is invoked
// several times, with the appropriate delays in between.
//
// Default tries is 3, maxRetryAfter is 1 hour, timeAfter is time.After.
//
// A failed attempt is retried only when its error unwraps to *Error with
// Code == http.StatusTooManyRequests and non-nil Parameters, and
// Parameters.RetryAfterDuration() does not exceed maxRetryAfter. Before the
// retry the interceptor waits for that duration; if ctx is done first,
// ctx.Err() is returned. Any other error is returned immediately.
//
// The request is invoked at most tries times and always at least once (a
// tries value below 1 behaves like 1). When the last attempt fails, its error
// is returned right away, without waiting for a retry that will never happen.
func NewInterceptorRetryFloodError(opts ...InterceptorRetryFloodErrorOption) Interceptor {
	options := interceptorRetryFloodErrorOpts{
		tries:         3,
		maxRetryAfter: time.Hour,
		timeAfter:     time.After,
	}

	for _, o := range opts {
		o(&options)
	}

	return func(ctx context.Context, req *Request, dst any, invoker InterceptorInvoker) error {
		// attempt is 1-based: it counts how many times invoker has been called.
		for attempt := 1; ; attempt++ {
			err := invoker(ctx, req, dst)
			if err == nil {
				return nil
			}

			// Only flood errors that carry response parameters are retryable.
			var tgErr *Error
			if !errors.As(err, &tgErr) || tgErr.Code != http.StatusTooManyRequests || tgErr.Parameters == nil {
				return err
			}

			retryAfter := tgErr.Parameters.RetryAfterDuration()
			if retryAfter > options.maxRetryAfter {
				return err
			}

			// Out of attempts: report the error now instead of sleeping first.
			if attempt >= options.tries {
				return err
			}

			select {
			case <-options.timeAfter(retryAfter):
				// Waited long enough; go for the next attempt.
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}
// interceptorRetryInternalServerErrorOpts collects the settings of the
// interceptor built by NewInterceptorRetryInternalServerError.
type interceptorRetryInternalServerErrorOpts struct {
// tries bounds the retry loop of the interceptor.
tries int
// delay is the base of the exponential backoff between attempts.
delay time.Duration
// timeAfter produces the channel the interceptor waits on between attempts
// (time.After unless overridden).
timeAfter func(time.Duration) <-chan time.Time
}
// RetryInternalServerErrorOption is an option for NewInterceptorRetryInternalServerError.
// Each option mutates the interceptor settings after the defaults are set.
type RetryInternalServerErrorOption func(*interceptorRetryInternalServerErrorOpts)
// WithInterceptorRetryInternalServerErrorTries overrides the tries setting
// (default 10) of the interceptor built by NewInterceptorRetryInternalServerError.
func WithInterceptorRetryInternalServerErrorTries(tries int) RetryInternalServerErrorOption {
	apply := func(cfg *interceptorRetryInternalServerErrorOpts) {
		cfg.tries = tries
	}
	return apply
}
// WithInterceptorRetryInternalServerErrorDelay overrides the base delay
// (default 100ms) of the backoff between tries.
// The wait before a retry is computed as delay * 2^i plus a random jitter
// smaller than delay * 2^i, where i is the zero-based index of the failed try.
func WithInterceptorRetryInternalServerErrorDelay(delay time.Duration) RetryInternalServerErrorOption {
	apply := func(cfg *interceptorRetryInternalServerErrorOpts) {
		cfg.delay = delay
	}
	return apply
}
// WithInterceptorRetryInternalServerErrorTimeAfter replaces the function used
// to wait between attempts (time.After by default). It makes the waiting
// controllable, e.g. to substitute a channel that fires immediately.
func WithInterceptorRetryInternalServerErrorTimeAfter(timeAfter func(time.Duration) <-chan time.Time) RetryInternalServerErrorOption {
	apply := func(cfg *interceptorRetryInternalServerErrorOpts) {
		cfg.timeAfter = timeAfter
	}
	return apply
}
// NewInterceptorRetryInternalServerError returns a new interceptor that
// retries the request if the error is an internal server error (HTTP 500).
//
// With that interceptor, a call that hits a transient server failure simply
// looks like a request that takes unusually long: under the hood the request
// is invoked several times, with exponentially growing delays in between.
//
// Default tries is 10, delay is 100ms, timeAfter is time.After.
//
// A failed attempt is retried only when its error unwraps to *Error with
// Code == http.StatusInternalServerError; any other error is returned
// immediately. Before a retry the interceptor waits for delay * 2^i plus a
// random jitter in [0, delay * 2^i), where i is the zero-based index of the
// failed attempt. The exponential part is capped so the sum cannot overflow
// time.Duration, and a non-positive delay means "retry without waiting". If
// ctx is done during the wait, ctx.Err() is returned.
//
// The request is invoked at most tries times and always at least once (a
// tries value below 1 behaves like 1). When the last attempt fails, its error
// is returned right away, without waiting for a retry that will never happen.
func NewInterceptorRetryInternalServerError(opts ...RetryInternalServerErrorOption) Interceptor {
	options := &interceptorRetryInternalServerErrorOpts{
		tries:     10,
		delay:     time.Millisecond * 100,
		timeAfter: time.After,
	}

	for _, o := range opts {
		o(options)
	}

	// maxBackoff caps the exponential part of the wait. Jitter is strictly
	// smaller than the backoff, so backoff + jitter stays within int64.
	const maxBackoff = time.Duration(math.MaxInt64 / 2)

	return func(ctx context.Context, req *Request, dst any, invoker InterceptorInvoker) error {
		// attempt is 0-based and doubles as the backoff exponent.
		for attempt := 0; ; attempt++ {
			err := invoker(ctx, req, dst)
			if err == nil {
				return nil
			}

			var tgErr *Error
			if !errors.As(err, &tgErr) || tgErr.Code != http.StatusInternalServerError {
				return err
			}

			// Out of attempts: report the error now instead of sleeping first.
			if attempt+1 >= options.tries {
				return err
			}

			var wait time.Duration
			if options.delay > 0 {
				// delay * 2^attempt, computed by doubling so it can be capped
				// before it overflows.
				backoff := options.delay
				for n := 0; n < attempt && backoff <= maxBackoff/2; n++ {
					backoff *= 2
				}
				// backoff > 0 here, so rand.Int63n cannot panic.
				wait = backoff + time.Duration(rand.Int63n(int64(backoff)))
			}

			select {
			case <-options.timeAfter(wait):
				// Backoff elapsed; go for the next attempt.
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}
// NewInterceptorDefaultParseMethod returns a new interceptor that adds the
// parse_mode argument, set to pm, to every request that does not already
// have one, and then passes the request on unchanged otherwise.
//
// Use in combination with NewInterceptorMethodFilter to apply it only to the
// methods that need it. Like:
//
//	NewInterceptorMethodFilter(NewInterceptorDefaultParseMethod(tg.HTML), "sendMessage", "editMessageText")
func NewInterceptorDefaultParseMethod(pm ParseMode) Interceptor {
	const key = "parse_mode"

	return func(ctx context.Context, req *Request, dst any, invoker InterceptorInvoker) error {
		// An explicitly provided parse mode always wins over the default.
		if req.Has(key) {
			return invoker(ctx, req, dst)
		}

		req.Stringer(key, pm)

		return invoker(ctx, req, dst)
	}
}
// NewInterceptorMethodFilter returns a new filtering interceptor that runs
// the wrapped interceptor only for requests whose Method is one of methods.
// Requests for any other method bypass it and go straight to the invoker.
func NewInterceptorMethodFilter(interceptor Interceptor, methods ...string) Interceptor {
	// Build the lookup set once, at construction time.
	allowed := make(map[string]struct{}, len(methods))
	for i := range methods {
		allowed[methods[i]] = struct{}{}
	}

	return func(ctx context.Context, req *Request, dst any, invoker InterceptorInvoker) error {
		_, match := allowed[req.Method]
		if !match {
			return invoker(ctx, req, dst)
		}

		return interceptor(ctx, req, dst, invoker)
	}
}