From 845fb175e9fecedf8cb25f1f48487e35adfae109 Mon Sep 17 00:00:00 2001 From: zenghongru Date: Tue, 9 Jun 2020 14:58:16 +0800 Subject: [PATCH] refactor: long pull --- delete_config.go | 2 +- diamond.go | 77 +++++++++++++++++--------- get_all_config_by_tenant.go | 2 +- get_config.go | 2 +- header.go | 9 ++- hook.go | 6 +- long_pull.go | 107 +++++++++++++++++++----------------- publish_config.go | 2 +- 8 files changed, 120 insertions(+), 87 deletions(-) diff --git a/delete_config.go b/delete_config.go index c52ea0a..ddcac29 100644 --- a/delete_config.go +++ b/delete_config.go @@ -25,7 +25,7 @@ func (d *Diamond) DeleteConfig(args *DeleteConfigRequest) error { header := make(http.Header) - if err := d.withUsual(args.Tenant, args.Group)(header); err != nil { + if err := d.withSignature(args.Tenant, args.Group)(header); err != nil { return err } diff --git a/diamond.go b/diamond.go index e9aa428..33fb519 100644 --- a/diamond.go +++ b/diamond.go @@ -163,9 +163,8 @@ func (d *Diamond) Register(oo ...*observer.Observer) { DataID: i.DataID, } b, err := d.GetConfig(req) - d.hang(i) if err != nil { - d.checkErr(i, err) + d.checkErr(fmt.Errorf("DataID: %s, Group: %s, err: %+v", i.DataID, i.Group, err)) continue } conf = &config.Config{ @@ -187,6 +186,7 @@ func (d *Diamond) Register(oo ...*observer.Observer) { // NotifyAll is called after Register. func (d *Diamond) NotifyAll() { + d.hang() oo := make([]*observer.Observer, 0) for o := range d.filter { oo = append(oo, o) @@ -209,34 +209,59 @@ func (d *Diamond) notify(oo ...*observer.Observer) { } } -func (d *Diamond) hang(i info.Info) { +func (d *Diamond) hang() { go func() { for { - // 不要在同一时间启动long pull - time.Sleep(time.Duration(randomIntInRange(20, 100)) * time.Millisecond) - - content, newContentMD5, err := d.LongPull(i, d.all[i].ContentMD5) - d.checkErr(i, err) - - // 防止MD5被重置,重新请求 - if newContentMD5 == "" { - time.Sleep(time.Duration(randomIntInRange(1000, 1500)) * time.Millisecond) - continue - } - conf := &config.Config{ - Content: content, - ContentMD5: newContentMD5, - Pulled: true, + var infoParams []InfoParam + for i, conf := range d.all { + infoParams = append(infoParams, InfoParam{ + i, + conf.ContentMD5, + }) } - d.all[i] = conf - oo, ok := d.infoColl[i] - if !ok { + ret, err := d.LongPull(infoParams...) + if err != nil { + d.checkErr(fmt.Errorf("acm long pull failed: %+v", err)) + time.Sleep(time.Duration(randomIntInRange(1000, 1500)) * time.Millisecond) continue } - for _, o := range oo { - o.HotUpdateInfo(i, conf) + for _, i := range ret { + j := curlew.NewJob() + j.Arg = i + j.Fn = func(ctx context.Context, arg interface{}) error { + i, ok := arg.(info.Info) + if !ok { + return nil + } + req := &GetConfigRequest{ + Tenant: d.option.tenant, + DataID: i.DataID, + Group: i.Group, + } + b, err := d.GetConfig(req) + if err != nil { + d.checkErr(fmt.Errorf("DataID: %s, Group: %s, err: %+v", i.DataID, i.Group, err)) + return nil + } + newContentMD5 := Md5(string(b)) + conf := &config.Config{ + Content: b, + ContentMD5: newContentMD5, + Pulled: true, + } + d.all[i] = conf + oo, ok := d.infoColl[i] + if !ok { + return nil + } + for _, o := range oo { + o.HotUpdateInfo(i, conf) + } + d.notify(oo...) + return nil + } + d.dispatcher.Submit(j) } - d.notify(oo...) } }() } @@ -246,14 +271,14 @@ func (d *Diamond) SetHook(h Hook) { d.errHook = h } -func (d *Diamond) checkErr(i info.Info, err error) { +func (d *Diamond) checkErr(err error) { if shouldIgnore(err) { return } if d.errHook == nil { return } - d.errHook(i, err) + d.errHook(err) } // LoadingProgress shows the current load progress roughly. diff --git a/get_all_config_by_tenant.go b/get_all_config_by_tenant.go index d942e29..77dd4be 100644 --- a/get_all_config_by_tenant.go +++ b/get_all_config_by_tenant.go @@ -35,7 +35,7 @@ func (d *Diamond) GetAllConfigByTenant(args *GetAllConfigByTenantRequest) (*GetA return nil, err } header := make(http.Header) - if err := d.withUsual(args.Tenant, "")(header); err != nil { + if err := d.withSignature(args.Tenant, "")(header); err != nil { return nil, err } request := d.c.NewRequest(). diff --git a/get_config.go b/get_config.go index 6d25a33..3bf3f36 100644 --- a/get_config.go +++ b/get_config.go @@ -26,7 +26,7 @@ func (d *Diamond) GetConfig(args *GetConfigRequest) ([]byte, error) { return nil, err } header := make(http.Header) - if err := d.withUsual(args.Tenant, args.Group)(header); err != nil { + if err := d.withSignature(args.Tenant, args.Group)(header); err != nil { return nil, err } request := d.c.NewRequest(). diff --git a/header.go b/header.go index 5c609aa..d240c57 100644 --- a/header.go +++ b/header.go @@ -25,14 +25,19 @@ func (d *Diamond) withLongPollingTimeout() headerSetter { } } -func (d *Diamond) withUsual(tenant, group string) headerSetter { +func (d *Diamond) withSignature(tenant, group string) headerSetter { return func(header http.Header) error { if header == nil { header = make(http.Header) } now := timeInMilli() var toSignList []string - toSignList = append(toSignList, tenant, group, strconv.FormatInt(now, 10)) + toSignList = append(toSignList, tenant) + if group != "" { + toSignList = append(toSignList, group) + } + toSignList = append(toSignList, strconv.FormatInt(now, 10)) + str := strings.Join(toSignList, "+") signature, err := HMACSHA1Encrypt(str, d.option.secretKey) if err != nil { diff --git a/hook.go b/hook.go index 46ee746..cbce9fc 100644 --- a/hook.go +++ b/hook.go @@ -1,6 +1,4 @@ package aliacm -import "github.com/xiaojiaoyu100/aliyun-acm/v2/info" - -// Hook 提供了长轮询失败发生的回调 -type Hook func(i info.Info, err error) +// Hook 重要错误 +type Hook func(err error) diff --git a/long_pull.go b/long_pull.go index 7c13fd9..5dc33a1 100644 --- a/long_pull.go +++ b/long_pull.go @@ -15,64 +15,69 @@ const ( lineSeparator = string(rune(1)) ) +// InfoParam long pull 参数 +type InfoParam struct { + info.Info + ContentMD5 string +} + // LongPull 监听配置 -func (d *Diamond) LongPull(info info.Info, contentMD5 string) ([]byte, string, error) { +func (d *Diamond) LongPull(infoParams ...InfoParam) ([]info.Info, error) { ip, err := d.QueryIP() if err != nil { - return nil, "", err + return nil, err } - switch contentMD5 { - case "": - args := new(GetConfigRequest) - args.Tenant = d.option.tenant - args.Group = info.Group - args.DataID = info.DataID - content, err := d.GetConfig(args) - if err != nil { - return nil, "", err - } - contentMD5 = Md5(string(content)) - return content, contentMD5, nil - default: - headerSetters := []headerSetter{ - d.withLongPollingTimeout(), - d.withUsual(d.option.tenant, info.Group), - } - header := make(http.Header) - for _, setter := range headerSetters { - if err := setter(header); err != nil { - return nil, "", err - } - } - var longPollRequest struct { - ProbeModifyRequest string `url:"Probe-Modify-Request"` + + var longPollRequest struct { + ProbeModifyRequest string `url:"Probe-Modify-Request"` + } + for _, infoParam := range infoParams { + if longPollRequest.ProbeModifyRequest != "" { + longPollRequest.ProbeModifyRequest += lineSeparator } - longPollRequest.ProbeModifyRequest = strings.Join([]string{info.DataID, info.Group, contentMD5, d.option.tenant}, wordSeparator) + lineSeparator - request := d.c.NewRequest(). - WithPath(acmLongPull.String(ip)). - WithFormURLEncodedBody(longPollRequest). - WithHeader(header). - Post() - response, err := d.c.Do(context.TODO(), request) - if err != nil { - return nil, "", err + longPollRequest.ProbeModifyRequest += strings.Join([]string{infoParam.DataID, infoParam.Group, infoParam.ContentMD5, d.option.tenant}, wordSeparator) + } + headerSetters := []headerSetter{ + d.withLongPollingTimeout(), + d.withSignature(d.option.tenant, ""), + } + header := make(http.Header) + for _, setter := range headerSetters { + if err := setter(header); err != nil { + return nil, err } - if !response.Success() { - return nil, "", errors.New(response.String()) + } + + request := d.c.NewRequest(). + WithPath(acmLongPull.String(ip)). + WithFormURLEncodedBody(longPollRequest). + WithHeader(header). + Post() + response, err := d.c.Do(context.TODO(), request) + if err != nil { + return nil, err + } + if !response.Success() { + return nil, errors.New(response.String()) + } + var ret []info.Info + + s, err := url.QueryUnescape(response.String()) + if err != nil { + return nil, errors.New("unescape fail") + } + + ss := strings.Split(s, lineSeparator) + for _, s := range ss { + tt := strings.Split(s, wordSeparator) + if len(tt) != 3 { + continue } - ret := url.QueryEscape(strings.Join([]string{info.DataID, info.Group, d.option.tenant}, wordSeparator) + lineSeparator) - if ret == strings.TrimSpace(response.String()) { - args := new(GetConfigRequest) - args.Tenant = d.option.tenant - args.Group = info.Group - args.DataID = info.DataID - content, err := d.GetConfig(args) - if err != nil { - return nil, "", err - } - contentMD5 := Md5(string(content)) - return content, contentMD5, nil + i := info.Info{ + DataID: tt[0], + Group: tt[1], } - return nil, "", noChangeErr + ret = append(ret, i) } + return ret, nil } diff --git a/publish_config.go b/publish_config.go index abb5f35..9172ff3 100644 --- a/publish_config.go +++ b/publish_config.go @@ -24,7 +24,7 @@ func (d *Diamond) PublishConfig(args *PublishConfigRequest) error { return err } header := make(http.Header) - if err := d.withUsual(args.Tenant, args.Group)(header); err != nil { + if err := d.withSignature(args.Tenant, args.Group)(header); err != nil { return err } request := d.c.NewRequest().