From 7226883c390b94fc3acc472f517d84b5fa027fab Mon Sep 17 00:00:00 2001 From: jikang_cui Date: Tue, 15 Nov 2022 22:17:08 +0800 Subject: [PATCH] add retry mechanism for pushing config (#8) --- control_plane.go | 50 +++++++++++++++++++++++++++++------------- go.mod | 4 +++- go.sum | 6 +++++ pkg/main/main.go | 7 +++++- pkg/options/options.go | 33 ++++++++++++++++++++++++++++ 5 files changed, 83 insertions(+), 17 deletions(-) create mode 100644 pkg/options/options.go diff --git a/control_plane.go b/control_plane.go index 1cedfec..292019d 100644 --- a/control_plane.go +++ b/control_plane.go @@ -15,14 +15,17 @@ package opensergo import ( - "os" - "sync" - + "github.com/avast/retry-go/v4" "github.com/opensergo/opensergo-control-plane/pkg/controller" "github.com/opensergo/opensergo-control-plane/pkg/model" + "github.com/opensergo/opensergo-control-plane/pkg/options" trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "os" + "sync" ) type ControlPlane struct { @@ -31,11 +34,14 @@ type ControlPlane struct { protoDesc *trpb.ControlPlaneDesc + opts *options.Options + mux sync.RWMutex } -func NewControlPlane() (*ControlPlane, error) { +func NewControlPlane(opts *options.Options) (*ControlPlane, error) { cp := &ControlPlane{} + cp.opts = opts operator, err := controller.NewKubernetesOperator(cp.sendMessage) if err != nil { @@ -89,20 +95,34 @@ func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion return nil } -func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error { +func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, trpbStatus *trpb.Status, respId string) error { if stream == nil { return nil } - return stream.SendMsg(&trpb.SubscribeResponse{ - Status: status, - Ack: "", - Namespace: namespace, - App: app, - Kind: kind, - DataWithVersion: dataWithVersion, - ControlPlane: c.protoDesc, - ResponseId: respId, - }) + + return retry.Do( + func() error { + error := stream.SendMsg(&trpb.SubscribeResponse{ + Status: trpbStatus, + Ack: "", + Namespace: namespace, + App: app, + Kind: kind, + DataWithVersion: dataWithVersion, + ControlPlane: c.protoDesc, + ResponseId: respId, + }) + + return error + }, + retry.Attempts(uint(c.opts.ConfigPushMaxAttempt)), + retry.RetryIf(func(err error) bool { + s, _ := status.FromError(err) + if s.Code() == codes.DeadlineExceeded { + return true + } + return false + })) } func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { diff --git a/go.mod b/go.mod index 8d430a9..bf8e953 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,15 @@ go 1.14 require ( github.com/alibaba/sentinel-golang v1.0.3 + github.com/avast/retry-go/v4 v4.3.0 github.com/envoyproxy/protoc-gen-validate v0.1.0 github.com/go-logr/logr v0.4.0 github.com/json-iterator/go v1.1.12 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/pkg/errors v0.9.1 github.com/rogpeppe/go-internal v1.8.0 // indirect - github.com/stretchr/testify v1.7.1 // indirect + github.com/spf13/pflag v1.0.5 + github.com/stretchr/testify v1.7.1 go.uber.org/atomic v1.7.0 golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368 // indirect diff --git a/go.sum b/go.sum index c7d8905..7bd9064 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= +github.com/avast/retry-go/v4 v4.3.0 h1:cqI48aXx0BExKoM7XPklDpoHAg7/srPPLAfWG5z62jo= +github.com/avast/retry-go/v4 v4.3.0/go.mod h1:bqOlT4nxk4phk9buiQFaghzjpqdchOSwPgjdfdQBtdg= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= @@ -476,6 +478,7 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -484,6 +487,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tklauser/go-sysconf v0.3.6/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI= github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM= @@ -853,6 +857,8 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/main/main.go b/pkg/main/main.go index d58df5a..d29f635 100644 --- a/pkg/main/main.go +++ b/pkg/main/main.go @@ -15,13 +15,18 @@ package main import ( + "github.com/opensergo/opensergo-control-plane/pkg/options" "log" "github.com/opensergo/opensergo-control-plane" ) func main() { - cp, err := opensergo.NewControlPlane() + opts, err := options.NewOption() + if err != nil { + log.Fatal(err) + } + cp, err := opensergo.NewControlPlane(opts) if err != nil { log.Fatal(err) } diff --git a/pkg/options/options.go b/pkg/options/options.go new file mode 100644 index 0000000..099d9d4 --- /dev/null +++ b/pkg/options/options.go @@ -0,0 +1,33 @@ +package options + +import ( + "github.com/spf13/pflag" + "log" + "os" +) + +type Options struct { + ConfigPushMaxAttempt int + flags *pflag.FlagSet +} + +func (o *Options) AddFlag() { + o.flags.IntVar(&o.ConfigPushMaxAttempt, "ConfigPushMaxAttempt", 3, "max times for pushing config after timeout error") +} + +func (o *Options) Parse() error { + err := o.flags.Parse(os.Args) + return err +} + +func NewOption() (*Options, error) { + o := &Options{ + flags: pflag.NewFlagSet("sergo-flag", pflag.ExitOnError), + } + o.AddFlag() + err := o.Parse() + if err != nil { + log.Fatalf("Parse flag failure: %s", err) + } + return o, err +}