Skip to content

Commit

Permalink
priority config
Browse files Browse the repository at this point in the history
  • Loading branch information
realityone committed Dec 13, 2023
1 parent 11f0e18 commit 1f4f9dd
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 175 deletions.
363 changes: 224 additions & 139 deletions api/gateway/config/v1/gateway.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions api/gateway/config/v1/gateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ message Gateway {
repeated Middleware middlewares = 5;
}

message PriorityConfig {
string name = 1;
string version = 2;
repeated Endpoint endpoints = 3;
}

message Endpoint {
string path = 1;
string method = 2;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/gateway/middleware/cors/v1/cors.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/gateway/middleware/logging/v1/logging.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/gateway/middleware/rewrite/v1/rewrite.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/gateway/middleware/tracing/v1/tracing.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ import (
)

var (
ctrlName string
ctrlService string
discoveryDSN string
proxyAddrs = newSliceVar(":8080")
proxyConfig string
withDebug bool
ctrlName string
ctrlService string
discoveryDSN string
proxyAddrs = newSliceVar(":8080")
proxyConfig string
priorityConfigDir string
withDebug bool
)

type sliceVar struct {
Expand Down Expand Up @@ -71,6 +72,7 @@ func init() {
flag.BoolVar(&withDebug, "debug", false, "enable debug handlers")
flag.Var(&proxyAddrs, "addr", "proxy address, eg: -addr 0.0.0.0:8080")
flag.StringVar(&proxyConfig, "conf", "config.yaml", "config path, eg: -conf config.yaml")
flag.StringVar(&priorityConfigDir, "conf.priority", "", "priority config directory, eg: -conf.priority ./canary")
flag.StringVar(&ctrlName, "ctrl.name", os.Getenv("ADVERTISE_NAME"), "control gateway name, eg: gateway")
flag.StringVar(&ctrlService, "ctrl.service", "", "control service host, eg: http://127.0.0.1:8000")
flag.StringVar(&discoveryDSN, "discovery.dsn", "", "discovery dsn, eg: consul://127.0.0.1:7070?token=secret&datacenter=prod")
Expand Down Expand Up @@ -111,7 +113,7 @@ func main() {
go ctrlLoader.Run(ctx)
}

confLoader, err := config.NewFileLoader(proxyConfig)
confLoader, err := config.NewFileLoader(proxyConfig, priorityConfigDir)
if err != nil {
log.Fatalf("failed to create config file loader: %v", err)
}
Expand Down
17 changes: 14 additions & 3 deletions config/config-loader/ctrl-loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/url"
"os"
"path"
"sync"

"strings"
"time"
Expand All @@ -35,12 +36,19 @@ type CtrlConfigLoader struct {
advertiseName string
advertiseAddr string

lastVersion atomic.String
lastVersion atomic.String
canaryVersions sync.Map
}

type LoadResponse struct {
Config string `json:"config"`
Version string `json:"version"`
Config string `json:"config"`
Version string `json:"version"`
CanaryConfigs map[string]struct {
GroupID int64 `json:"group_id"`
Service string `json:"service"`
Config string `json:"config"`
Version string `json:"version"`
} `json:"canary_configs"`
}

type LoadFeatureResponse struct {
Expand Down Expand Up @@ -123,6 +131,8 @@ func (c *CtrlConfigLoader) Load(ctx context.Context) (err error) {
return err
}

// merge canary configs

tmpPath := fmt.Sprintf("%s.%s.tmp", c.dstPath, uuid.New().String())
if err := os.WriteFile(tmpPath, yamlBytes, 0644); err != nil {
return err
Expand Down Expand Up @@ -212,6 +222,7 @@ func (c *CtrlConfigLoader) load(ctx context.Context) ([]byte, error) {
params.Set("gateway", c.advertiseName)
params.Set("ip_addr", c.advertiseAddr)
params.Set("last_version", c.lastVersion.Load())
params.Set("canary_versions", "a=1,b=2,c=3")
log.Infof("%s is requesting config from %s with params: %+v", c.advertiseName, c.ctrlService, params)
api, err := c.urlfor("/v1/control/gateway/release", params)
if err != nil {
Expand Down
145 changes: 124 additions & 21 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"os"
"net/http"
"os"
"path/filepath"
"reflect"
"sync"
"time"

Expand All @@ -26,18 +28,21 @@ type ConfigLoader interface {
}

type FileLoader struct {
confPath string
confSHA256 string
watchCancel context.CancelFunc
lock sync.RWMutex
onChangeHandlers []OnChange
confPath string
confSHA256 string
priorityDirectory string
priorityConfigHash map[string]string
watchCancel context.CancelFunc
lock sync.RWMutex
onChangeHandlers []OnChange
}

var _jsonOptions = &protojson.UnmarshalOptions{DiscardUnknown: true}

func NewFileLoader(confPath string) (*FileLoader, error) {
func NewFileLoader(confPath string, priorityDirectory string) (*FileLoader, error) {
fl := &FileLoader{
confPath: confPath,
confPath: confPath,
priorityDirectory: priorityDirectory,
}
if err := fl.initialize(); err != nil {
return nil, err
Expand All @@ -46,12 +51,19 @@ func NewFileLoader(confPath string) (*FileLoader, error) {
}

func (f *FileLoader) initialize() error {
sha256hex, err := f.configSHA256()
if f.priorityDirectory != "" {
if err := os.MkdirAll(f.priorityDirectory, 0755); err != nil {
return err
}
}
sha256hex, pfHash, err := f.configSHA256()
if err != nil {
return err
}
f.confSHA256 = sha256hex
log.Infof("the initial config file sha256: %s", sha256hex)
f.priorityConfigHash = pfHash
log.Infof("the initial priority config file sha256 map: %+v", f.priorityConfigHash)

watchCtx, cancel := context.WithCancel(context.Background())
f.watchCancel = cancel
Expand All @@ -64,12 +76,42 @@ func sha256sum(in []byte) string {
return hex.EncodeToString(sum[:])
}

func (f *FileLoader) configSHA256() (string, error) {
func (f *FileLoader) configSHA256() (string, map[string]string, error) {
configData, err := os.ReadFile(f.confPath)
if err != nil {
return "", err
return "", nil, err
}
return sha256sum(configData), nil
hash := sha256sum(configData)
phHash, err := f.priorityConfigSHA256()
if err != nil {
log.Warnf("failed to get priority config sha256: %+v", err)
}
return hash, phHash, nil
}

func (f *FileLoader) priorityConfigSHA256() (map[string]string, error) {
if f.priorityDirectory == "" {
return map[string]string{}, nil
}
entrys, err := os.ReadDir(f.priorityDirectory)
if err != nil {
return nil, err
}
out := map[string]string{}
for _, e := range entrys {
if e.IsDir() {
continue
}
if filepath.Ext(e.Name()) != ".yaml" {
continue
}
configData, err := os.ReadFile(filepath.Join(f.priorityDirectory, e.Name()))
if err != nil {
return nil, err
}
out[e.Name()] = sha256sum(configData)
}
return out, nil
}

func (f *FileLoader) Load(_ context.Context) (*configv1.Gateway, error) {
Expand All @@ -88,9 +130,67 @@ func (f *FileLoader) Load(_ context.Context) (*configv1.Gateway, error) {
if err := _jsonOptions.Unmarshal(jsonData, out); err != nil {
return nil, err
}
if err := f.mergePriorityConfig(out); err != nil {
log.Warnf("failed to merge priority config: %+v", err)
}
return out, nil
}

func (f *FileLoader) mergePriorityConfig(dst *configv1.Gateway) error {
if f.priorityDirectory == "" {
return nil
}
entrys, err := os.ReadDir(f.priorityDirectory)
if err != nil {
return err
}
for _, e := range entrys {
if e.IsDir() {
continue
}
if filepath.Ext(e.Name()) != ".yaml" {
continue
}
cfgPath := filepath.Join(f.priorityDirectory, e.Name())
pCfg, err := f.parsePriorityConfig(cfgPath)
if err != nil {
log.Warnf("failed to parse priority config: %s: %+v, skip merge this file", cfgPath, err)
continue
}
for _, e := range pCfg.Endpoints {
dst.Endpoints = ReplaceOrAppendEndpoint(dst.Endpoints, e)
}
log.Infof("succeeded to merge priority config: %s, %d endpoints effected", cfgPath, len(pCfg.Endpoints))
}
return nil
}

func (f *FileLoader) parsePriorityConfig(cfgPath string) (*configv1.PriorityConfig, error) {
configData, err := os.ReadFile(cfgPath)
if err != nil {
return nil, err
}
jsonData, err := yaml.YAMLToJSON(configData)
if err != nil {
return nil, err
}
out := &configv1.PriorityConfig{}
if err := _jsonOptions.Unmarshal(jsonData, out); err != nil {
return nil, err
}
return out, nil
}

func ReplaceOrAppendEndpoint(dst []*configv1.Endpoint, item *configv1.Endpoint) []*configv1.Endpoint {
for i, e := range dst {
if e.Path == item.Path && e.Method == item.Method {
dst[i] = item
return dst
}
}
return append(dst, item)
}

func (f *FileLoader) Watch(fn OnChange) {
log.Info("add config file change event handler")
f.lock.Lock()
Expand Down Expand Up @@ -122,18 +222,19 @@ func (f *FileLoader) watchproc(ctx context.Context) {
case <-time.After(time.Second * 5):
}
func() {
sha256hex, err := f.configSHA256()
sha256hex, pfHash, err := f.configSHA256()
if err != nil {
log.Errorf("watch config file error: %+v", err)
return
}
if sha256hex != f.confSHA256 {
log.Infof("config file changed, reload config, last sha256: %s, new sha256: %s", f.confSHA256, sha256hex)
if sha256hex != f.confSHA256 || !reflect.DeepEqual(pfHash, f.priorityConfigHash) {
log.Infof("config file changed, reload config, last sha256: %s, new sha256: %s, last pfHash: %+v, new pfHash: %+v", f.confSHA256, sha256hex, f.priorityConfigHash, pfHash)
if err := f.executeLoader(); err != nil {
log.Errorf("execute config loader error with new sha256: %s: %+v, config digest will not be changed until all loaders are succeeded", sha256hex, err)
return
}
f.confSHA256 = sha256hex
f.priorityConfigHash = pfHash
return
}
}()
Expand All @@ -145,18 +246,20 @@ func (f *FileLoader) Close() {
}

type InspectFileLoader struct {
ConfPath string `json:"confPath"`
ConfSHA256 string `json:"confSha256"`
OnChangeHandlers int64 `json:"onChangeHandlers"`
ConfPath string `json:"confPath"`
ConfSHA256 string `json:"confSha256"`
PriorityConfigHash map[string]string `json:"priorityConfigHash"`
OnChangeHandlers int64 `json:"onChangeHandlers"`
}

func (f *FileLoader) DebugHandler() http.Handler {
debugMux := http.NewServeMux()
debugMux.HandleFunc("/debug/config/inspect", func(rw http.ResponseWriter, r *http.Request) {
out := &InspectFileLoader{
ConfPath: f.confPath,
ConfSHA256: f.confSHA256,
OnChangeHandlers: int64(len(f.onChangeHandlers)),
ConfPath: f.confPath,
ConfSHA256: f.confSHA256,
PriorityConfigHash: f.priorityConfigHash,
OnChangeHandlers: int64(len(f.onChangeHandlers)),
}
rw.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(rw).Encode(out)
Expand Down

0 comments on commit 1f4f9dd

Please sign in to comment.