Skip to content

Commit

Permalink
feat: observer
Browse files Browse the repository at this point in the history
  • Loading branch information
zenghur committed Mar 27, 2020
1 parent e1ddb4e commit fbad9af
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 67 deletions.
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package config

// Config 返回配置
type Config struct {
Content []byte
ContentMD5 string
Pulled bool
}
93 changes: 54 additions & 39 deletions diamond.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package aliacm

import (
"github.com/xiaojiaoyu100/aliyun-acm/v2/config"
"github.com/xiaojiaoyu100/aliyun-acm/v2/info"
"github.com/xiaojiaoyu100/aliyun-acm/v2/observer"
"github.com/xiaojiaoyu100/curlew"
"math/rand"
"time"

Expand Down Expand Up @@ -38,11 +42,9 @@ const (

// Unit 配置基本单位
type Unit struct {
Config
Group string
DataID string
FetchOnce bool
OnChange Handler
ch chan Config
}

// Option 参数设置
Expand All @@ -53,18 +55,17 @@ type Option struct {
secretKey string
}

// Config 返回配置
type Config struct {
Content []byte
}


// Diamond 提供了操作阿里云ACM的能力
type Diamond struct {
option Option
c *cast.Cast
units []Unit
errHook Hook
r *rand.Rand
oo []*observer.Observer
all map[info.Info]*config.Config
c curlew.Worker
}

// New 产生Diamond实例
Expand Down Expand Up @@ -94,6 +95,7 @@ func New(addr, tenant, accessKey, secretKey string, setters ...Setter) (*Diamond
option: option,
c: c,
r: r,
all: make(map[info.Info]*config.Config),
}

for _, setter := range setters {
Expand All @@ -109,40 +111,53 @@ func randomIntInRange(min, max int) int {
return rand.Intn(max-min) + min
}

// Add 添加想要关心的配置单元
func (d *Diamond) Add(unit Unit) {
unit.ch = make(chan Config)
d.units = append(d.units, unit)
var (
contentMD5 string
)
func (d *Diamond) Register(oo ...*observer.Observer) {
d.oo = append(d.oo, oo...)
for _, o := range oo {
for _, i := range o.Info() {
d.all[i] = nil
}
}
for i, _ := range d.all {
req := &GetConfigRequest{
Tenant: d.option.tenant,
Group: i.Group,
DataID: i.DataID,
}
b, err := d.GetConfig(req)
d.hang(i)
if err != nil {
continue
}
d.all[i] = &config.Config{
Content: b,
ContentMD5: Md5(string(b)),
Pulled: true,
}
}
d.trigger()
}

func (d *Diamond) trigger() {
for _, o := range d.oo {
if o.Ready() {
o.Handle()
}
}
}

func (d *Diamond) hang(i info.Info) {
go func() {
for {
time.Sleep(time.Duration(randomIntInRange(20, 100)) * time.Millisecond)
newContentMD5, err := d.LongPull(unit, contentMD5)
d.checkErr(unit, err)
if contentMD5 == "" &&
newContentMD5 != "" && unit.FetchOnce {
return
}
content, newContentMD5, err := d.LongPull(i, d.all[i].ContentMD5)
d.checkErr(i, err)
// 防止网络较差情景下MD5被重置,重新请求配置,造成阿里云限流
if newContentMD5 != "" {
contentMD5 = newContentMD5
}
}
}()

go func() {
for {
select {
case config := <-unit.ch:
var err error
config.Content, err = GbkToUtf8(config.Content)
d.checkErr(unit, err)
unit.OnChange(config)
if unit.FetchOnce {
return
}
d.all[i].Content = content
d.all[i].ContentMD5 = newContentMD5
d.all[i].Pulled = true
d.trigger()
}
}
}()
Expand All @@ -153,12 +168,12 @@ func (d *Diamond) SetHook(h Hook) {
d.errHook = h
}

func (d *Diamond) checkErr(unit Unit, err error) {
func (d *Diamond) checkErr(i info.Info, err error) {
if err == nil {
return
}
if d.errHook == nil {
return
}
d.errHook(unit, err)
d.errHook(i, err)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ go 1.14
require (
github.com/sirupsen/logrus v1.5.0
github.com/xiaojiaoyu100/cast v1.3.0
github.com/xiaojiaoyu100/curlew v0.2.3
golang.org/x/text v0.3.2
)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xiaojiaoyu100/cast v1.3.0 h1:XoMMGSKB/jdfQ8Y83p3rf3lQjdDV5Ai1bdXP+pXbjr0=
github.com/xiaojiaoyu100/cast v1.3.0/go.mod h1:smc6oedQStn3NkaLOySSku1MDWcZHKA+hZDijVyyNcs=
github.com/xiaojiaoyu100/curlew v0.2.3 h1:i3S6MU3TEsR77vN34zDTYHuhGHXg7i/yDBXL0mkamGY=
github.com/xiaojiaoyu100/curlew v0.2.3/go.mod h1:T1E4wpTXciXHO0YJPhHz8sQB/MGPLt+M7efYFKK36Hg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190924062700-2aa67d56cdd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c h1:S/FtSvpNLtFBgjTqcKsRpsa6aVsI6iztaz1bQd9BJwE=
golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
4 changes: 3 additions & 1 deletion hook.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package aliacm

import "github.com/xiaojiaoyu100/aliyun-acm/v2/info"

// Hook 提供了长轮询失败发生的回调
type Hook func(unit Unit, err error)
type Hook func(i info.Info, err error)
6 changes: 6 additions & 0 deletions info/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package info

type Info struct {
Group string
DataID string
}
47 changes: 20 additions & 27 deletions long_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package aliacm
import (
"context"
"errors"
"github.com/xiaojiaoyu100/aliyun-acm/v2/info"
"net/http"
"net/url"
"strings"
Expand All @@ -14,77 +15,69 @@ const (
)

// LongPull 监听配置
func (d *Diamond) LongPull(unit Unit, contentMD5 string) (string, error) {
func (d *Diamond) LongPull(info info.Info, contentMD5 string) ([]byte, string, error) {
ip, err := d.QueryIP()
if err != nil {
return "", err
return nil, "", err
}
switch contentMD5 {
case "":
args := new(GetConfigRequest)
args.Tenant = d.option.tenant
args.Group = unit.Group
args.DataID = unit.DataID
args.Group = info.Group
args.DataID = info.DataID
content, err := d.GetConfig(args)
if err != nil {
return "", err
return nil, "", err
}
contentMD5 = Md5(string(content))
config := Config{
Content: content,
}
unit.ch <- config
return contentMD5, nil
return content, contentMD5, nil
default:
headerSetters := []headerSetter{
d.withLongPollingTimeout(),
d.withUsual(d.option.tenant, unit.Group),
d.withUsual(d.option.tenant, info.Group),
}
header := make(http.Header)
for _, setter := range headerSetters {
if err := setter(header); err != nil {
return "", err
return nil, "", err
}
}
var longPollRequest struct {
ProbeModifyRequest string `url:"Probe-Modify-Request"`
}
longPollRequest.ProbeModifyRequest = strings.Join([]string{unit.DataID, unit.Group, contentMD5, d.option.tenant}, wordSeparator) + lineSeparator
longPollRequest.ProbeModifyRequest = strings.Join([]string{info.DataID, info.Group, contentMD5, d.option.tenant}, wordSeparator) + lineSeparator
request := d.c.NewRequest().
WithPath(acmLongPull.String(ip)).
WithFormURLEncodedBody(longPollRequest).
WithHeader(header).
Post()
response, err := d.c.Do(context.TODO(), request)
if err != nil {
return "", err
return nil, "", err
}
switch response.StatusCode() {
case http.StatusServiceUnavailable:
return "", serviceUnavailableErr
return nil, "", serviceUnavailableErr
case http.StatusInternalServerError:
return "", internalServerErr
return nil, "", internalServerErr
}
if !response.Success() {
return "", errors.New(response.String())
return nil, "", errors.New(response.String())
}
ret := url.QueryEscape(strings.Join([]string{unit.DataID, unit.Group, d.option.tenant}, wordSeparator) + lineSeparator)
ret := url.QueryEscape(strings.Join([]string{info.DataID, info.Group, d.option.tenant}, wordSeparator) + lineSeparator)
if ret == strings.TrimSpace(response.String()) {
args := new(GetConfigRequest)
args.Tenant = d.option.tenant
args.Group = unit.Group
args.DataID = unit.DataID
args.Group = info.Group
args.DataID = info.DataID
content, err := d.GetConfig(args)
if err != nil {
return "", err
return nil, "", err
}
contentMD5 := Md5(string(content))
config := Config{
Content: content,
}
unit.ch <- config
return contentMD5, nil
return content, contentMD5, nil
}
return contentMD5, nil
return nil, "", errors.New("long pull unexpected error")
}
}
8 changes: 8 additions & 0 deletions observer/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package observer

import (
"github.com/xiaojiaoyu100/aliyun-acm/v2/config"
"github.com/xiaojiaoyu100/aliyun-acm/v2/info"
)

type Handler func(coll map[info.Info]*config.Config)
46 changes: 46 additions & 0 deletions observer/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package observer

import (
"github.com/xiaojiaoyu100/aliyun-acm/v2/config"
"github.com/xiaojiaoyu100/aliyun-acm/v2/info"
)

type Observer struct {
coll map[info.Info]*config.Config
h Handler
}

func New(ss ...Setting) (*Observer, error) {
o := &Observer{}
o.coll = make(map[info.Info]*config.Config)
for _, s := range ss {
if err := s(o); err != nil {
return nil, err
}
}
return o, nil
}

func (o *Observer) Ready() bool {
var ret = true
for _, c := range o.coll {
if !c.Pulled {
ret = false
break
}
}
return ret
}

func (o *Observer) Info() []info.Info {
var ii []info.Info
for i, _ := range o.coll {
ii = append(ii, i)
}
return ii
}

func (o *Observer) Handle() {
o.h(o.coll)
}

26 changes: 26 additions & 0 deletions observer/setting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package observer

import (
"github.com/xiaojiaoyu100/aliyun-acm/v2/config"
"github.com/xiaojiaoyu100/aliyun-acm/v2/info"
)

type Setting func(o *Observer) error

// WithInfo loads info.
func WithInfo(ii ...info.Info) Setting {
return func(o *Observer) error {
for _, i := range ii {
o.coll[i] = &config.Config{}
}
return nil
}
}

// WithHandler initializes a handler.
func WithHandler(h Handler) Setting {
return func(o *Observer) error {
o.h = h
return nil
}
}

0 comments on commit fbad9af

Please sign in to comment.