diff --git a/README.md b/README.md index 0e3bf48..5ccd9e8 100644 --- a/README.md +++ b/README.md @@ -9,34 +9,54 @@ package main import ( "fmt" - "github.com/xiaojiaoyu100/aliyun-acm" + aliacm "github.com/xiaojiaoyu100/aliyun-acm/v2" + "github.com/xiaojiaoyu100/aliyun-acm/v2/config" + "github.com/xiaojiaoyu100/aliyun-acm/v2/info" + "github.com/xiaojiaoyu100/aliyun-acm/v2/observer" ) - -func Handle(config aliacm.Config) { - fmt.Println(string(config.Content)) +func handle(coll map[info.Info]*config.Config) { + for _, o := range coll { + fmt.Println(string(o.Content)) + } } func main() { d, err := aliacm.New( - "your_addr", - "your_tenant", - "your_access_key", - "your_secret_key") + addr, + tenant, + accessKey, + secretKey) + if err != nil { + fmt.Println(err) + return + } + + o1, err := observer.New( + observer.WithInfo( + info.Info{Group: "YourGroup", DataID: "YourDataID"}, + ), + observer.WithHandler(handle)) if err != nil { return } - var f = func(h aliacm.Unit, err error) { + o2, err := observer.New( + observer.WithInfo( + info.Info{Group: "YourGroup", DataID: "YourDataID"}, + info.Info{Group: "YourAnotherGroup", DataID: "YourAnotherDataID"}, + ), + observer.WithHandler(handle)) + if err != nil { + return + } + + var f = func(h info.Info, err error) { fmt.Println(err) } d.SetHook(f) - unit := aliacm.Unit{ - Group: "your_group", - DataID: "your_data_id", - FetchOnce: true, // 有且仅拉取一次 - OnChange: Handle, - } - d.Add(unit) + + d.Register(o1, o2) + select{} } -``` +``` \ No newline at end of file diff --git a/config/config.go b/config/config.go index 0fba344..4ec0977 100644 --- a/config/config.go +++ b/config/config.go @@ -2,7 +2,7 @@ package config // Config 返回配置 type Config struct { - Content []byte + Content []byte ContentMD5 string - Pulled bool + Pulled bool } diff --git a/diamond.go b/diamond.go index ed86f83..6cbca5f 100644 --- a/diamond.go +++ b/diamond.go @@ -1,6 +1,7 @@ package aliacm import ( + "fmt" "github.com/xiaojiaoyu100/aliyun-acm/v2/config" "github.com/xiaojiaoyu100/aliyun-acm/v2/info" "github.com/xiaojiaoyu100/aliyun-acm/v2/observer" @@ -8,8 +9,8 @@ import ( "math/rand" "time" + "context" "github.com/sirupsen/logrus" - "github.com/xiaojiaoyu100/cast" ) @@ -40,13 +41,6 @@ const ( ShanghaiFinance1Addr = "addr-cn-shanghai-finance-1-internal.edas.aliyun.com" ) -// Unit 配置基本单位 -type Unit struct { - Config - Group string - DataID string -} - // Option 参数设置 type Option struct { addr string @@ -55,17 +49,16 @@ type Option struct { secretKey string } - - // Diamond 提供了操作阿里云ACM的能力 type Diamond struct { - option Option - c *cast.Cast - errHook Hook - r *rand.Rand - oo []*observer.Observer - all map[info.Info]*config.Config - c curlew.Worker + option Option + c *cast.Cast + errHook Hook + r *rand.Rand + oo []*observer.Observer + filter map[*observer.Observer]struct{} + all map[info.Info]*config.Config + dispatcher *curlew.Dispatcher } // New 产生Diamond实例 @@ -91,11 +84,24 @@ func New(addr, tenant, accessKey, secretKey string, setters ...Setter) (*Diamond s := rand.NewSource(time.Now().UnixNano()) r := rand.New(s) + var monitor = func(err error) { + + } + + dispatcher, err := curlew.New( + curlew.WithMaxWorkerNum(100), + curlew.WithMonitor(monitor)) + if err != nil { + return nil, err + } + d := &Diamond{ - option: option, - c: c, - r: r, - all: make(map[info.Info]*config.Config), + option: option, + c: c, + r: r, + filter: make(map[*observer.Observer]struct{}), + all: make(map[info.Info]*config.Config), + dispatcher: dispatcher, } for _, setter := range setters { @@ -111,37 +117,59 @@ func randomIntInRange(min, max int) int { return rand.Intn(max-min) + min } -func (d *Diamond) Register(oo ...*observer.Observer) { - d.oo = append(d.oo, oo...) +// Register registers an observer list. +func (d *Diamond) Register(oo ...*observer.Observer) int64 { + start := time.Now() + for _, o := range oo { + _, ok := d.filter[o] + if ok { + continue + } + d.filter[o] = struct{}{} + d.oo = append(d.oo, o) + } for _, o := range oo { for _, i := range o.Info() { - d.all[i] = nil + d.all[i] = &config.Config{} } } - for i, _ := range d.all { + for i := range d.all { req := &GetConfigRequest{ Tenant: d.option.tenant, - Group: i.Group, + Group: i.Group, DataID: i.DataID, } b, err := d.GetConfig(req) d.hang(i) if err != nil { + d.checkErr(i, err) continue } - d.all[i] = &config.Config{ - Content: b, + conf := &config.Config{ + Content: b, ContentMD5: Md5(string(b)), - Pulled: true, + Pulled: true, + } + d.all[i] = conf + for _, o := range d.oo { + o.UpdateInfo(i, conf) } } - d.trigger() + d.notify() + return time.Now().Sub(start).Milliseconds() } -func (d *Diamond) trigger() { - for _, o := range d.oo { +func (d *Diamond) notify() { + for _, o := range d.oo { if o.Ready() { - o.Handle() + j := curlew.NewJob() + j.Arg = o + j.Fn = func(ctx context.Context, arg interface{}) error { + o := arg.(*observer.Observer) + o.Handle() + return nil + } + d.dispatcher.Submit(j) } } } @@ -149,16 +177,26 @@ func (d *Diamond) trigger() { func (d *Diamond) hang(i info.Info) { go func() { for { + // 防止网络差的时候没挂住 time.Sleep(time.Duration(randomIntInRange(20, 100)) * time.Millisecond) + content, newContentMD5, err := d.LongPull(i, d.all[i].ContentMD5) d.checkErr(i, err) - // 防止网络较差情景下MD5被重置,重新请求配置,造成阿里云限流 - if newContentMD5 != "" { - d.all[i].Content = content - d.all[i].ContentMD5 = newContentMD5 - d.all[i].Pulled = true - d.trigger() + + // 防止MD5被重置,重新请求 + if newContentMD5 == "" { + continue } + conf := &config.Config{ + Content: content, + ContentMD5: newContentMD5, + Pulled: true, + } + d.all[i] = conf + for _, o := range d.oo { + o.UpdateInfo(i, conf) + } + d.notify() } }() } @@ -169,7 +207,7 @@ func (d *Diamond) SetHook(h Hook) { } func (d *Diamond) checkErr(i info.Info, err error) { - if err == nil { + if shouldIgnore(err) { return } if d.errHook == nil { @@ -177,3 +215,14 @@ func (d *Diamond) checkErr(i info.Info, err error) { } d.errHook(i, err) } + +// LoadingProgress shows the current load progress roughly. +func (d *Diamond) LoadingProgress() string { + var ret = 0 + for _, conf := range d.all { + if conf.Pulled { + ret++ + } + } + return fmt.Sprintf("%d/%d", ret, len(d.all)) +} diff --git a/error.go b/error.go index ffeaa9f..fe46ee3 100644 --- a/error.go +++ b/error.go @@ -1,6 +1,8 @@ package aliacm -import "context" +import ( + "errors" +) // Error ACM错误 type Error string @@ -10,15 +12,13 @@ func (e Error) Error() string { } const ( - serviceUnavailableErr = Error("ServiceUnavailable") - internalServerErr = Error("InternalServerError") + noChangeErr = Error("NoChangeError") ) // ShouldIgnore 忽略一些不想关心的错误 -func ShouldIgnore(err error) bool { - if err == serviceUnavailableErr || - err == internalServerErr || - err == context.Canceled { +func shouldIgnore(err error) bool { + if err == nil || + errors.Is(err, noChangeErr) { return true } return false diff --git a/handler.go b/handler.go deleted file mode 100644 index a56e80d..0000000 --- a/handler.go +++ /dev/null @@ -1,4 +0,0 @@ -package aliacm - -// Handler 提供了配置的回调, -type Handler func(config Config) diff --git a/info/info.go b/info/info.go index 6497b15..f8f0c4d 100644 --- a/info/info.go +++ b/info/info.go @@ -1,6 +1,6 @@ package info type Info struct { - Group string + Group string DataID string } diff --git a/long_pull.go b/long_pull.go index 6f370b2..eeaa046 100644 --- a/long_pull.go +++ b/long_pull.go @@ -56,12 +56,6 @@ func (d *Diamond) LongPull(info info.Info, contentMD5 string) ([]byte, string, e if err != nil { return nil, "", err } - switch response.StatusCode() { - case http.StatusServiceUnavailable: - return nil, "", serviceUnavailableErr - case http.StatusInternalServerError: - return nil, "", internalServerErr - } if !response.Success() { return nil, "", errors.New(response.String()) } @@ -78,6 +72,6 @@ func (d *Diamond) LongPull(info info.Info, contentMD5 string) ([]byte, string, e contentMD5 := Md5(string(content)) return content, contentMD5, nil } - return nil, "", errors.New("long pull unexpected error") + return nil, "", noChangeErr } } diff --git a/observer/observer.go b/observer/observer.go index 9f376c4..4f124ec 100644 --- a/observer/observer.go +++ b/observer/observer.go @@ -6,8 +6,9 @@ import ( ) type Observer struct { - coll map[info.Info]*config.Config - h Handler + coll map[info.Info]*config.Config + h Handler + consumed bool } func New(ss ...Setting) (*Observer, error) { @@ -34,13 +35,21 @@ func (o *Observer) Ready() bool { func (o *Observer) Info() []info.Info { var ii []info.Info - for i, _ := range o.coll { + for i := range o.coll { ii = append(ii, i) } return ii } func (o *Observer) Handle() { + if o.consumed { + return + } + o.consumed = true o.h(o.coll) } +func (o *Observer) UpdateInfo(i info.Info, conf *config.Config) { + o.consumed = false + o.coll[i] = conf +}