-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathretrytime.go
40 lines (38 loc) · 984 Bytes
/
retrytime.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package rx
import (
"math"
"time"
)
func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T] {
if len(limit) == 0 || limit[0] <= 0 {
limit = []int{math.MaxInt}
}
return func(observe Observer[T], scheduler Scheduler, subscriber Subscriber) {
var retry struct {
observer Observer[T]
count int
subscriber Subscriber
resubscribe func()
}
retry.observer = func(next T, err error, done bool) {
switch {
case !done:
observe(next, nil, false)
retry.count = 0
case err != nil && backoff != nil && retry.count < limit[0]:
retry.subscriber.Unsubscribe()
scheduler.ScheduleFuture(backoff(retry.count), retry.resubscribe)
retry.count++
default:
observe(next, err, true)
}
}
retry.resubscribe = func() {
if subscriber.Subscribed() {
retry.subscriber = subscriber.Add()
observable(retry.observer, scheduler, retry.subscriber)
}
}
retry.resubscribe()
}
}