-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathonce.go
71 lines (61 loc) · 1.39 KB
/
once.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package promise
import (
"context"
"sync"
)
// Once contains a function that is called concurrently once.
//
// The result is returned as a promise.
// If the function returns no error, the result is stored and memoized.
//
// Otherwise, future calls to the function will try again.
type Once[T comparable] struct {
cb func(ctx context.Context) (T, error)
mtx sync.Mutex
prom *Promise[T]
}
// NewOnce constructs a new Once caller.
func NewOnce[T comparable](cb func(ctx context.Context) (T, error)) *Once[T] {
return &Once[T]{cb: cb}
}
// Start attempts to start resolution returning the promise.
// Resolve attempts to resolve the value using the ctx.
func (o *Once[T]) Resolve(ctx context.Context) (T, error) {
for {
var empty T
if err := ctx.Err(); err != nil {
return empty, context.Canceled
}
o.mtx.Lock()
prom := o.prom
// start if not running
if prom == nil {
prom = NewPromise[T]()
o.prom = prom
go func() {
result, err := o.cb(ctx)
if err != nil {
o.mtx.Lock()
if o.prom == prom {
o.prom = nil
}
o.mtx.Unlock()
if ctx.Err() != nil {
prom.SetResult(empty, context.Canceled)
} else {
prom.SetResult(empty, err)
}
} else {
prom.SetResult(result, err)
}
}()
}
o.mtx.Unlock()
// await result
res, err := prom.Await(ctx)
if err == context.Canceled {
continue
}
return res, err
}
}