Skip to content

Commit

Permalink
refactor: long pull
Browse files Browse the repository at this point in the history
  • Loading branch information
zenghur committed Jun 9, 2020
1 parent 3c72bfa commit 845fb17
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 87 deletions.
2 changes: 1 addition & 1 deletion delete_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (d *Diamond) DeleteConfig(args *DeleteConfigRequest) error {

header := make(http.Header)

if err := d.withUsual(args.Tenant, args.Group)(header); err != nil {
if err := d.withSignature(args.Tenant, args.Group)(header); err != nil {
return err
}

Expand Down
77 changes: 51 additions & 26 deletions diamond.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,8 @@ func (d *Diamond) Register(oo ...*observer.Observer) {
DataID: i.DataID,
}
b, err := d.GetConfig(req)
d.hang(i)
if err != nil {
d.checkErr(i, err)
d.checkErr(fmt.Errorf("DataID: %s, Group: %s, err: %+v", i.DataID, i.Group, err))
continue
}
conf = &config.Config{
Expand All @@ -187,6 +186,7 @@ func (d *Diamond) Register(oo ...*observer.Observer) {

// NotifyAll is called after Register.
func (d *Diamond) NotifyAll() {
d.hang()
oo := make([]*observer.Observer, 0)
for o := range d.filter {
oo = append(oo, o)
Expand All @@ -209,34 +209,59 @@ func (d *Diamond) notify(oo ...*observer.Observer) {
}
}

func (d *Diamond) hang(i info.Info) {
func (d *Diamond) hang() {
go func() {
for {
// 不要在同一时间启动long pull
time.Sleep(time.Duration(randomIntInRange(20, 100)) * time.Millisecond)

content, newContentMD5, err := d.LongPull(i, d.all[i].ContentMD5)
d.checkErr(i, err)

// 防止MD5被重置,重新请求
if newContentMD5 == "" {
time.Sleep(time.Duration(randomIntInRange(1000, 1500)) * time.Millisecond)
continue
}
conf := &config.Config{
Content: content,
ContentMD5: newContentMD5,
Pulled: true,
var infoParams []InfoParam
for i, conf := range d.all {
infoParams = append(infoParams, InfoParam{
i,
conf.ContentMD5,
})
}
d.all[i] = conf
oo, ok := d.infoColl[i]
if !ok {
ret, err := d.LongPull(infoParams...)
if err != nil {
d.checkErr(fmt.Errorf("acm long pull failed: %+v", err))
time.Sleep(time.Duration(randomIntInRange(1000, 1500)) * time.Millisecond)
continue
}
for _, o := range oo {
o.HotUpdateInfo(i, conf)
for _, i := range ret {
j := curlew.NewJob()
j.Arg = i
j.Fn = func(ctx context.Context, arg interface{}) error {
i, ok := arg.(info.Info)
if !ok {
return nil
}
req := &GetConfigRequest{
Tenant: d.option.tenant,
DataID: i.DataID,
Group: i.Group,
}
b, err := d.GetConfig(req)
if err != nil {
d.checkErr(fmt.Errorf("DataID: %s, Group: %s, err: %+v", i.DataID, i.Group, err))
return nil
}
newContentMD5 := Md5(string(b))
conf := &config.Config{
Content: b,
ContentMD5: newContentMD5,
Pulled: true,
}
d.all[i] = conf
oo, ok := d.infoColl[i]
if !ok {
return nil
}
for _, o := range oo {
o.HotUpdateInfo(i, conf)
}
d.notify(oo...)
return nil
}
d.dispatcher.Submit(j)
}
d.notify(oo...)
}
}()
}
Expand All @@ -246,14 +271,14 @@ func (d *Diamond) SetHook(h Hook) {
d.errHook = h
}

func (d *Diamond) checkErr(i info.Info, err error) {
func (d *Diamond) checkErr(err error) {
if shouldIgnore(err) {
return
}
if d.errHook == nil {
return
}
d.errHook(i, err)
d.errHook(err)
}

// LoadingProgress shows the current load progress roughly.
Expand Down
2 changes: 1 addition & 1 deletion get_all_config_by_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (d *Diamond) GetAllConfigByTenant(args *GetAllConfigByTenantRequest) (*GetA
return nil, err
}
header := make(http.Header)
if err := d.withUsual(args.Tenant, "")(header); err != nil {
if err := d.withSignature(args.Tenant, "")(header); err != nil {
return nil, err
}
request := d.c.NewRequest().
Expand Down
2 changes: 1 addition & 1 deletion get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (d *Diamond) GetConfig(args *GetConfigRequest) ([]byte, error) {
return nil, err
}
header := make(http.Header)
if err := d.withUsual(args.Tenant, args.Group)(header); err != nil {
if err := d.withSignature(args.Tenant, args.Group)(header); err != nil {
return nil, err
}
request := d.c.NewRequest().
Expand Down
9 changes: 7 additions & 2 deletions header.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ func (d *Diamond) withLongPollingTimeout() headerSetter {
}
}

func (d *Diamond) withUsual(tenant, group string) headerSetter {
func (d *Diamond) withSignature(tenant, group string) headerSetter {
return func(header http.Header) error {
if header == nil {
header = make(http.Header)
}
now := timeInMilli()
var toSignList []string
toSignList = append(toSignList, tenant, group, strconv.FormatInt(now, 10))
toSignList = append(toSignList, tenant)
if group != "" {
toSignList = append(toSignList, group)
}
toSignList = append(toSignList, strconv.FormatInt(now, 10))

str := strings.Join(toSignList, "+")
signature, err := HMACSHA1Encrypt(str, d.option.secretKey)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions hook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
package aliacm

import "github.com/xiaojiaoyu100/aliyun-acm/v2/info"

// Hook 提供了长轮询失败发生的回调
type Hook func(i info.Info, err error)
// Hook 重要错误
type Hook func(err error)
107 changes: 56 additions & 51 deletions long_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,69 @@ const (
lineSeparator = string(rune(1))
)

// InfoParam long pull 参数
type InfoParam struct {
info.Info
ContentMD5 string
}

// LongPull 监听配置
func (d *Diamond) LongPull(info info.Info, contentMD5 string) ([]byte, string, error) {
func (d *Diamond) LongPull(infoParams ...InfoParam) ([]info.Info, error) {
ip, err := d.QueryIP()
if err != nil {
return nil, "", err
return nil, err
}
switch contentMD5 {
case "":
args := new(GetConfigRequest)
args.Tenant = d.option.tenant
args.Group = info.Group
args.DataID = info.DataID
content, err := d.GetConfig(args)
if err != nil {
return nil, "", err
}
contentMD5 = Md5(string(content))
return content, contentMD5, nil
default:
headerSetters := []headerSetter{
d.withLongPollingTimeout(),
d.withUsual(d.option.tenant, info.Group),
}
header := make(http.Header)
for _, setter := range headerSetters {
if err := setter(header); err != nil {
return nil, "", err
}
}
var longPollRequest struct {
ProbeModifyRequest string `url:"Probe-Modify-Request"`

var longPollRequest struct {
ProbeModifyRequest string `url:"Probe-Modify-Request"`
}
for _, infoParam := range infoParams {
if longPollRequest.ProbeModifyRequest != "" {
longPollRequest.ProbeModifyRequest += lineSeparator
}
longPollRequest.ProbeModifyRequest = strings.Join([]string{info.DataID, info.Group, contentMD5, d.option.tenant}, wordSeparator) + lineSeparator
request := d.c.NewRequest().
WithPath(acmLongPull.String(ip)).
WithFormURLEncodedBody(longPollRequest).
WithHeader(header).
Post()
response, err := d.c.Do(context.TODO(), request)
if err != nil {
return nil, "", err
longPollRequest.ProbeModifyRequest += strings.Join([]string{infoParam.DataID, infoParam.Group, infoParam.ContentMD5, d.option.tenant}, wordSeparator)
}
headerSetters := []headerSetter{
d.withLongPollingTimeout(),
d.withSignature(d.option.tenant, ""),
}
header := make(http.Header)
for _, setter := range headerSetters {
if err := setter(header); err != nil {
return nil, err
}
if !response.Success() {
return nil, "", errors.New(response.String())
}

request := d.c.NewRequest().
WithPath(acmLongPull.String(ip)).
WithFormURLEncodedBody(longPollRequest).
WithHeader(header).
Post()
response, err := d.c.Do(context.TODO(), request)
if err != nil {
return nil, err
}
if !response.Success() {
return nil, errors.New(response.String())
}
var ret []info.Info

s, err := url.QueryUnescape(response.String())
if err != nil {
return nil, errors.New("unescape fail")
}

ss := strings.Split(s, lineSeparator)
for _, s := range ss {
tt := strings.Split(s, wordSeparator)
if len(tt) != 3 {
continue
}
ret := url.QueryEscape(strings.Join([]string{info.DataID, info.Group, d.option.tenant}, wordSeparator) + lineSeparator)
if ret == strings.TrimSpace(response.String()) {
args := new(GetConfigRequest)
args.Tenant = d.option.tenant
args.Group = info.Group
args.DataID = info.DataID
content, err := d.GetConfig(args)
if err != nil {
return nil, "", err
}
contentMD5 := Md5(string(content))
return content, contentMD5, nil
i := info.Info{
DataID: tt[0],
Group: tt[1],
}
return nil, "", noChangeErr
ret = append(ret, i)
}
return ret, nil
}
2 changes: 1 addition & 1 deletion publish_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (d *Diamond) PublishConfig(args *PublishConfigRequest) error {
return err
}
header := make(http.Header)
if err := d.withUsual(args.Tenant, args.Group)(header); err != nil {
if err := d.withSignature(args.Tenant, args.Group)(header); err != nil {
return err
}
request := d.c.NewRequest().
Expand Down

0 comments on commit 845fb17

Please sign in to comment.