-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathticker.go
35 lines (33 loc) · 1 KB
/
ticker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package rx
import "time"
// Ticker creates an ObservableTime that emits a sequence of timestamps after
// an initialDelay has passed. Subsequent timestamps are emitted using a
// schedule of intervals passed in. If only the initialDelay is given, Ticker
// will emit only once.
func Ticker(initialDelay time.Duration, intervals ...time.Duration) Observable[time.Time] {
observable := func(observe Observer[time.Time], scheduler Scheduler, subscriber Subscriber) {
i := 0
runner := scheduler.ScheduleFutureRecursive(initialDelay, func(again func(time.Duration)) {
if subscriber.Subscribed() {
if i == 0 || (i > 0 && len(intervals) > 0) {
observe(scheduler.Now(), nil, false)
}
if subscriber.Subscribed() {
if len(intervals) > 0 {
again(intervals[i%len(intervals)])
} else {
if i == 0 {
again(0)
} else {
var zero time.Time
observe(zero, nil, true)
}
}
}
i++
}
})
subscriber.OnUnsubscribe(runner.Cancel)
}
return observable
}