Skip to content

Commit

Permalink
feat: refactor observer
Browse files Browse the repository at this point in the history
  • Loading branch information
zenghur committed Mar 31, 2020
1 parent fbad9af commit 8a1034f
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 81 deletions.
54 changes: 37 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,54 @@ package main

import (
"fmt"
"github.com/xiaojiaoyu100/aliyun-acm"
aliacm "github.com/xiaojiaoyu100/aliyun-acm/v2"
"github.com/xiaojiaoyu100/aliyun-acm/v2/config"
"github.com/xiaojiaoyu100/aliyun-acm/v2/info"
"github.com/xiaojiaoyu100/aliyun-acm/v2/observer"
)


func Handle(config aliacm.Config) {
fmt.Println(string(config.Content))
func handle(coll map[info.Info]*config.Config) {
for _, o := range coll {
fmt.Println(string(o.Content))
}
}

func main() {
d, err := aliacm.New(
"your_addr",
"your_tenant",
"your_access_key",
"your_secret_key")
addr,
tenant,
accessKey,
secretKey)
if err != nil {
fmt.Println(err)
return
}

o1, err := observer.New(
observer.WithInfo(
info.Info{Group: "YourGroup", DataID: "YourDataID"},
),
observer.WithHandler(handle))
if err != nil {
return
}
var f = func(h aliacm.Unit, err error) {
o2, err := observer.New(
observer.WithInfo(
info.Info{Group: "YourGroup", DataID: "YourDataID"},
info.Info{Group: "YourAnotherGroup", DataID: "YourAnotherDataID"},
),
observer.WithHandler(handle))
if err != nil {
return
}

var f = func(h info.Info, err error) {
fmt.Println(err)
}
d.SetHook(f)
unit := aliacm.Unit{
Group: "your_group",
DataID: "your_data_id",
FetchOnce: true, // 有且仅拉取一次
OnChange: Handle,
}
d.Add(unit)

d.Register(o1, o2)

select{}
}
```
```
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package config

// Config 返回配置
type Config struct {
Content []byte
Content []byte
ContentMD5 string
Pulled bool
Pulled bool
}
129 changes: 89 additions & 40 deletions diamond.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package aliacm

import (
"fmt"
"github.com/xiaojiaoyu100/aliyun-acm/v2/config"
"github.com/xiaojiaoyu100/aliyun-acm/v2/info"
"github.com/xiaojiaoyu100/aliyun-acm/v2/observer"
"github.com/xiaojiaoyu100/curlew"
"math/rand"
"time"

"context"
"github.com/sirupsen/logrus"

"github.com/xiaojiaoyu100/cast"
)

Expand Down Expand Up @@ -40,13 +41,6 @@ const (
ShanghaiFinance1Addr = "addr-cn-shanghai-finance-1-internal.edas.aliyun.com"
)

// Unit 配置基本单位
type Unit struct {
Config
Group string
DataID string
}

// Option 参数设置
type Option struct {
addr string
Expand All @@ -55,17 +49,16 @@ type Option struct {
secretKey string
}



// Diamond 提供了操作阿里云ACM的能力
type Diamond struct {
option Option
c *cast.Cast
errHook Hook
r *rand.Rand
oo []*observer.Observer
all map[info.Info]*config.Config
c curlew.Worker
option Option
c *cast.Cast
errHook Hook
r *rand.Rand
oo []*observer.Observer
filter map[*observer.Observer]struct{}
all map[info.Info]*config.Config
dispatcher *curlew.Dispatcher
}

// New 产生Diamond实例
Expand All @@ -91,11 +84,24 @@ func New(addr, tenant, accessKey, secretKey string, setters ...Setter) (*Diamond
s := rand.NewSource(time.Now().UnixNano())
r := rand.New(s)

var monitor = func(err error) {

}

dispatcher, err := curlew.New(
curlew.WithMaxWorkerNum(100),
curlew.WithMonitor(monitor))
if err != nil {
return nil, err
}

d := &Diamond{
option: option,
c: c,
r: r,
all: make(map[info.Info]*config.Config),
option: option,
c: c,
r: r,
filter: make(map[*observer.Observer]struct{}),
all: make(map[info.Info]*config.Config),
dispatcher: dispatcher,
}

for _, setter := range setters {
Expand All @@ -111,54 +117,86 @@ func randomIntInRange(min, max int) int {
return rand.Intn(max-min) + min
}

func (d *Diamond) Register(oo ...*observer.Observer) {
d.oo = append(d.oo, oo...)
// Register registers an observer list.
func (d *Diamond) Register(oo ...*observer.Observer) int64 {
start := time.Now()
for _, o := range oo {
_, ok := d.filter[o]
if ok {
continue
}
d.filter[o] = struct{}{}
d.oo = append(d.oo, o)
}
for _, o := range oo {
for _, i := range o.Info() {
d.all[i] = nil
d.all[i] = &config.Config{}
}
}
for i, _ := range d.all {
for i := range d.all {
req := &GetConfigRequest{
Tenant: d.option.tenant,
Group: i.Group,
Group: i.Group,
DataID: i.DataID,
}
b, err := d.GetConfig(req)
d.hang(i)
if err != nil {
d.checkErr(i, err)
continue
}
d.all[i] = &config.Config{
Content: b,
conf := &config.Config{
Content: b,
ContentMD5: Md5(string(b)),
Pulled: true,
Pulled: true,
}
d.all[i] = conf
for _, o := range d.oo {
o.UpdateInfo(i, conf)
}
}
d.trigger()
d.notify()
return time.Now().Sub(start).Milliseconds()
}

func (d *Diamond) trigger() {
for _, o := range d.oo {
func (d *Diamond) notify() {
for _, o := range d.oo {
if o.Ready() {
o.Handle()
j := curlew.NewJob()
j.Arg = o
j.Fn = func(ctx context.Context, arg interface{}) error {
o := arg.(*observer.Observer)
o.Handle()
return nil
}
d.dispatcher.Submit(j)
}
}
}

func (d *Diamond) hang(i info.Info) {
go func() {
for {
// 防止网络差的时候没挂住
time.Sleep(time.Duration(randomIntInRange(20, 100)) * time.Millisecond)

content, newContentMD5, err := d.LongPull(i, d.all[i].ContentMD5)
d.checkErr(i, err)
// 防止网络较差情景下MD5被重置,重新请求配置,造成阿里云限流
if newContentMD5 != "" {
d.all[i].Content = content
d.all[i].ContentMD5 = newContentMD5
d.all[i].Pulled = true
d.trigger()

// 防止MD5被重置,重新请求
if newContentMD5 == "" {
continue
}
conf := &config.Config{
Content: content,
ContentMD5: newContentMD5,
Pulled: true,
}
d.all[i] = conf
for _, o := range d.oo {
o.UpdateInfo(i, conf)
}
d.notify()
}
}()
}
Expand All @@ -169,11 +207,22 @@ func (d *Diamond) SetHook(h Hook) {
}

func (d *Diamond) checkErr(i info.Info, err error) {
if err == nil {
if shouldIgnore(err) {
return
}
if d.errHook == nil {
return
}
d.errHook(i, err)
}

// LoadingProgress shows the current load progress roughly.
func (d *Diamond) LoadingProgress() string {
var ret = 0
for _, conf := range d.all {
if conf.Pulled {
ret++
}
}
return fmt.Sprintf("%d/%d", ret, len(d.all))
}
14 changes: 7 additions & 7 deletions error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package aliacm

import "context"
import (
"errors"
)

// Error ACM错误
type Error string
Expand All @@ -10,15 +12,13 @@ func (e Error) Error() string {
}

const (
serviceUnavailableErr = Error("ServiceUnavailable")
internalServerErr = Error("InternalServerError")
noChangeErr = Error("NoChangeError")
)

// ShouldIgnore 忽略一些不想关心的错误
func ShouldIgnore(err error) bool {
if err == serviceUnavailableErr ||
err == internalServerErr ||
err == context.Canceled {
func shouldIgnore(err error) bool {
if err == nil ||
errors.Is(err, noChangeErr) {
return true
}
return false
Expand Down
4 changes: 0 additions & 4 deletions handler.go

This file was deleted.

2 changes: 1 addition & 1 deletion info/info.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package info

type Info struct {
Group string
Group string
DataID string
}
8 changes: 1 addition & 7 deletions long_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ func (d *Diamond) LongPull(info info.Info, contentMD5 string) ([]byte, string, e
if err != nil {
return nil, "", err
}
switch response.StatusCode() {
case http.StatusServiceUnavailable:
return nil, "", serviceUnavailableErr
case http.StatusInternalServerError:
return nil, "", internalServerErr
}
if !response.Success() {
return nil, "", errors.New(response.String())
}
Expand All @@ -78,6 +72,6 @@ func (d *Diamond) LongPull(info info.Info, contentMD5 string) ([]byte, string, e
contentMD5 := Md5(string(content))
return content, contentMD5, nil
}
return nil, "", errors.New("long pull unexpected error")
return nil, "", noChangeErr
}
}
15 changes: 12 additions & 3 deletions observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
)

type Observer struct {
coll map[info.Info]*config.Config
h Handler
coll map[info.Info]*config.Config
h Handler
consumed bool
}

func New(ss ...Setting) (*Observer, error) {
Expand All @@ -34,13 +35,21 @@ func (o *Observer) Ready() bool {

func (o *Observer) Info() []info.Info {
var ii []info.Info
for i, _ := range o.coll {
for i := range o.coll {
ii = append(ii, i)
}
return ii
}

func (o *Observer) Handle() {
if o.consumed {
return
}
o.consumed = true
o.h(o.coll)
}

func (o *Observer) UpdateInfo(i info.Info, conf *config.Config) {
o.consumed = false
o.coll[i] = conf
}

0 comments on commit 8a1034f

Please sign in to comment.