-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvalue.go
79 lines (72 loc) · 1.76 KB
/
value.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package config
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"go.uber.org/zap"
)
// Value is a generic wrapper around a single value of type T.
// Subscribe accepts a *Value[T] and matches its address against the fields
// registered on a Watcher to find the config variable to subscribe to.
type Value[T any] struct {
// value is the wrapped value; Subscribe switches on its dynamic type to
// decide how incoming string updates are decoded.
value T
}
// SubscribeOptions collects the optional settings that Subscribe builds up
// from the SubscribeOption values it is given.
type SubscribeOptions struct {
// Default is nil unless an option sets it (see OverrideDefault). Subscribe
// hands it to the watcher's subscribe call unchanged.
// NOTE(review): presumably the fallback raw value used when the variable
// is unset — confirm against Watcher.subscribe.
Default *string
}
// SubscribeOption mutates a SubscribeOptions. Subscribe applies every option
// it receives, in argument order, to a fresh SubscribeOptions.
type SubscribeOption func(*SubscribeOptions)
// OverrideDefault builds a SubscribeOption that points
// SubscribeOptions.Default at the given string.
//
// Every application of the returned option stores the same pointer, so all
// SubscribeOptions it is applied to share one underlying string.
func OverrideDefault(value string) SubscribeOption {
	override := &value
	apply := func(options *SubscribeOptions) {
		options.Default = override
	}
	return apply
}
// Subscribe registers for updates of the config variable backing e and
// returns a channel on which decoded values of type T are delivered.
//
// e is matched by pointer address against the entries of watcher.fields; the
// name of the matching entry is the variable subscribed to. opts are applied
// in order to a fresh SubscribeOptions, and the resulting Default is passed
// on to watcher.subscribe.
//
// Each raw string update is decoded according to the dynamic type of e.value:
// int via strconv.Atoi, string verbatim, and anything else via
// json.Unmarshal. Updates that fail to decode are logged (including the
// decode error) and skipped, so subscribers never receive a zero or partially
// decoded value in place of a real one.
//
// The returned channel is unbuffered, so the forwarding goroutine blocks
// until each value is received. It is closed once the underlying string
// channel is closed.
//
// An error is returned when e does not match any entry in watcher.fields or
// when watcher.subscribe rejects the subscription.
func Subscribe[T any](watcher *Watcher, e *Value[T], opts ...SubscribeOption) (chan T, error) {
	options := &SubscribeOptions{}
	for _, o := range opts {
		o(options)
	}

	// e is statically a *Value[T], so no runtime pointer-kind check is
	// needed before taking its address for the field lookup.
	var name string
	ptrAddress := reflect.ValueOf(e).Pointer()
	for fieldName, address := range watcher.fields {
		if ptrAddress == reflect.ValueOf(address).Pointer() {
			name = fieldName
			break
		}
	}
	if len(name) == 0 {
		return nil, fmt.Errorf("the provided value is not a field of the config struct")
	}

	stringCh := make(chan string)
	if !watcher.subscribe(name, stringCh, options.Default) {
		return nil, fmt.Errorf("you are already subscribed to this config variable")
	}

	ch := make(chan T)
	go func() {
		defer close(ch)
		for str := range stringCh {
			switch any(e.value).(type) {
			case int:
				value, err := strconv.Atoi(str)
				if err != nil {
					watcher.logger.Error("failed to decode int value from etcd",
						zap.String("value", str),
						zap.String("name", name),
						zap.Error(err))
					// Skip the update instead of publishing a bogus 0.
					continue
				}
				ch <- any(value).(T)
			case string:
				ch <- any(str).(T)
			default:
				var value T
				if err := json.Unmarshal([]byte(str), &value); err != nil {
					watcher.logger.Error("failed to decode json value from etcd",
						zap.String("value", str),
						zap.String("name", name),
						zap.Error(err))
					// Skip the update; value may be zero or only partially filled.
					continue
				}
				ch <- value
			}
		}
	}()
	return ch, nil
}