-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathxconf_remote.go
88 lines (79 loc) · 2.4 KB
/
xconf_remote.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package xconf
import (
"context"
"fmt"
"path/filepath"
"github.com/sandwich-go/xconf/kv"
"github.com/sandwich-go/xconf/xutil"
)
// kvLoader pairs a kv.Loader with the configuration path that was registered
// for it through WatchUpdate.
type kvLoader struct {
// Embedded loader; its methods (such as Watch) are promoted onto kvLoader.
kv.Loader
// confPath is the path handed to the loader's Watch call.
confPath string
}
// OnFieldUpdated is the signature of the callback invoked when a watched
// field changes. It receives the field path together with the from and to
// values recorded for that change.
type OnFieldUpdated func(fieldPath string, from, to interface{})
// WatchFieldPath registers changed as the callback for one specific field
// path, replacing any callback already stored for that path.
//
// It panics when fieldPath is not a key of x.fieldPathInfoMap; the panic
// message lists the valid paths reported by x.keysList.
func (x *XConf) WatchFieldPath(fieldPath string, changed OnFieldUpdated) {
	_, known := x.fieldPathInfoMap[fieldPath]
	if !known {
		msg := fmt.Sprintf("field path:%s not found,valid ones:%v", fieldPath, x.keysList())
		panic(msg)
	}
	x.mapOnFieldUpdated[fieldPath] = changed
}
// WatchUpdate registers loader for confPath and starts watching it, routing
// content changes to x.onContentChanged.
//
// confPath is not automatically bound to env values: if the path to watch
// depends on environment variables, resolve and substitute it yourself with
// ParseEnvValue first and handle any error there.
//
// When loader also implements CheckOnWatchError, a handler is installed that
// logs watch errors as warnings.
func (x *XConf) WatchUpdate(confPath string, loader kv.Loader) {
	// Optional capability a loader may offer for reporting watch errors.
	type watchErrorChecker interface {
		CheckOnWatchError(watchError kv.WatchError)
	}

	entry := &kvLoader{Loader: loader, confPath: confPath}
	x.kvs = append(x.kvs, entry)

	if checker, supported := loader.(watchErrorChecker); supported {
		checker.CheckOnWatchError(func(name string, confPath string, err error) {
			x.cc.LogWarning(fmt.Sprintf("name:%s confPath:%s watch got error:%s", name, confPath, err))
		})
	}

	// The Loader is responsible for maintaining its own asynchronous logic.
	entry.Watch(context.TODO(), entry.confPath, x.onContentChanged)
}
// notifyChanged publishes the latest configuration and fires the callbacks
// registered for the pending field changes.
//
// It obtains the value from x.Latest, discards any value still pending on
// x.updated without blocking, sends the new value, passes it to
// x.atomicSetFunc, and then invokes the callback stored in
// x.mapOnFieldUpdated for each entry of x.changes.changed that has one.
// Finally the pending change set is replaced by an empty map. An error from
// x.Latest is returned as is, before any of the other steps run.
func (x *XConf) notifyChanged() error {
	current, err := x.Latest()
	if err != nil {
		return err
	}

	// Drop a value that has not been received yet so the send below carries
	// the newest one.
	select {
	case <-x.updated:
	default:
	}
	x.updated <- current

	// Automatic update.
	x.atomicSetFunc(current)

	for path, change := range x.changes.changed {
		if callback, registered := x.mapOnFieldUpdated[path]; registered {
			callback(change.fieldPath, change.from, change.to)
		}
	}
	x.changes.changed = make(map[string]*fieldValues)
	return nil
}
// onContentChanged is the content-change callback passed to a loader's Watch
// call. It decodes content with the decode function selected by the file
// extension of confPath and merges the result into the configuration through
// x.commonUpdateAndNotify.
//
// Failures are raised via the xutil panic helpers (presumably they panic on a
// non-nil error; confirm against xutil) and recovered by the deferred
// function, which logs the outcome. Every recovered panic is reported to the
// caller as a non-nil error. Previously a failure that had not already been
// stored in err (for example an error returned by commonUpdateAndNotify that
// did not come from mergeToDest, or a runtime panic such as calling a nil
// decode function) was logged but nil was returned.
func (x *XConf) onContentChanged(name string, confPath string, content []byte) (err error) {
	x.cc.LogDebug(fmt.Sprintf("got update:%s", confPath))
	defer func() {
		reason := recover()
		if reason == nil {
			// NOTE(review): success is logged at warning level; confirm this is intended.
			x.cc.LogWarning(fmt.Sprintf("onContentChanged with name:%s path:%s succ.", name, confPath))
			return
		}
		x.cc.LogWarning(fmt.Sprintf("onContentChanged with name:%s path:%s reason:%v content:%s", name, confPath, reason, string(content)))
		if err != nil {
			// The failing step already recorded its error; keep it.
			return
		}
		// Do not report success for an update that panicked.
		if recovered, ok := reason.(error); ok {
			err = recovered
		} else {
			err = fmt.Errorf("onContentChanged with name:%s path:%s panic:%v", name, confPath, reason)
		}
	}()

	unmarshal := GetDecodeFunc(filepath.Ext(confPath))
	data := make(map[string]interface{})
	err = unmarshal(content, data)
	xutil.PanicErrWithWrap(err, "unmarshal_error(%v) ", err)
	xutil.PanicErr(x.commonUpdateAndNotify(func() error {
		err = x.mergeToDest(confPath, data)
		return err
	}))
	return err
}