From 1e9abad0d746424824cc68c3c9f556274412a6c5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?And=C5=BEej=20Maciusovi=C4=8D?=
Date: Mon, 11 Apr 2022 15:15:39 +0300
Subject: [PATCH] Retry on kubernetes connection refused errors (#28)

---
 actions/actions_test.go | 10 +++----
 main.go                 | 39 ++++++++++++++++++++++++++
 main_test.go            | 62 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 106 insertions(+), 5 deletions(-)
 create mode 100644 main_test.go

diff --git a/actions/actions_test.go b/actions/actions_test.go
index 71bdd515..59bf6cdf 100644
--- a/actions/actions_test.go
+++ b/actions/actions_test.go
@@ -24,7 +24,7 @@ func TestActions(t *testing.T) {
 	log := logrus.New()
 	log.SetLevel(logrus.DebugLevel)
 	cfg := Config{
-		PollWaitInterval: 1 * time.Millisecond,
+		PollWaitInterval: 10 * time.Millisecond,
 		PollTimeout:      100 * time.Millisecond,
 		AckTimeout:       1 * time.Second,
 		AckRetriesCount:  3,
@@ -49,7 +49,7 @@ func TestActions(t *testing.T) {
 		return svc
 	}
 
-	t.Run("poll, handle and ack", func(t *testing.T) {
+	t.Run("poll handle and ack", func(t *testing.T) {
 		r := require.New(t)
 
 		apiActions := []*castai.ClusterAction{
@@ -78,7 +78,7 @@ func TestActions(t *testing.T) {
 		client := mock.NewMockAPIClient(apiActions)
 		handler := &mockAgentActionHandler{handleDelay: 2 * time.Millisecond}
 		svc := newTestService(handler, client)
-		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
+		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
 		defer func() {
 			cancel()
 			svc.startedActionsWg.Wait()
@@ -103,7 +103,7 @@ func TestActions(t *testing.T) {
 		client.GetActionsErr = errors.New("ups")
 		handler := &mockAgentActionHandler{err: errors.New("ups")}
 		svc := newTestService(handler, client)
-		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
+		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
 		defer func() {
 			cancel()
 			svc.startedActionsWg.Wait()
@@ -128,7 +128,7 @@ func TestActions(t *testing.T) {
 		client := mock.NewMockAPIClient(apiActions)
 		handler := &mockAgentActionHandler{err: errors.New("ups")}
 		svc := newTestService(handler, client)
-		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
+		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
 		defer func() {
 			cancel()
 			svc.startedActionsWg.Wait()
diff --git a/main.go b/main.go
index c58bb079..8b02de15 100644
--- a/main.go
+++ b/main.go
@@ -10,9 +10,11 @@ import (
 	"os"
 	"time"
 
+	"github.com/cenkalti/backoff/v4"
 	"github.com/google/uuid"
 	"github.com/sirupsen/logrus"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apiserver/pkg/server/healthz"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -261,10 +263,47 @@ func retrieveKubeConfig(log logrus.FieldLogger) (*rest.Config, error) {
 	if err != nil {
 		return nil, err
 	}
+	inClusterConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
+		return &kubeRetryTransport{
+			log:           log,
+			next:          rt,
+			maxRetries:    10,
+			retryInterval: 3 * time.Second,
+		}
+	})
 	log.Debug("using in cluster kubeconfig")
 	return inClusterConfig, nil
 }
 
+type kubeRetryTransport struct {
+	log           logrus.FieldLogger
+	next          http.RoundTripper
+	maxRetries    uint64
+	retryInterval time.Duration
+}
+
+func (rt *kubeRetryTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	var resp *http.Response
+	err := backoff.RetryNotify(func() error {
+		var err error
+		resp, err = rt.next.RoundTrip(req)
+		if err != nil {
+			// Previously client-go contained logic to retry connection refused errors. See https://github.com/kubernetes/kubernetes/pull/88267/files
+			if net.IsConnectionRefused(err) {
+				return err
+			}
+			return backoff.Permanent(err)
+		}
+		return nil
+	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(rt.retryInterval), rt.maxRetries),
+		func(err error, duration time.Duration) {
+			if err != nil {
+				rt.log.Warnf("kube api server connection refused, will retry: %v", err)
+			}
+		})
+	return resp, err
+}
+
 type logContextErr struct {
 	err    error
 	fields logrus.Fields
diff --git a/main_test.go b/main_test.go
new file mode 100644
index 00000000..6947e9e1
--- /dev/null
+++ b/main_test.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+	"errors"
+	"net/http"
+	"sync/atomic"
+	"syscall"
+	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/require"
+)
+
+func TestKubeRetryTransport(t *testing.T) {
+	log := logrus.New()
+	log.SetLevel(logrus.DebugLevel)
+
+	t.Run("retry connection refused error", func(t *testing.T) {
+		r := require.New(t)
+
+		next := &mockRoundTripper{
+			err: syscall.ECONNREFUSED,
+		}
+		rt := kubeRetryTransport{
+			log:           log,
+			next:          next,
+			maxRetries:    3,
+			retryInterval: 100 * time.Millisecond,
+		}
+		_, err := rt.RoundTrip(nil)
+		r.EqualError(err, "connection refused")
+		r.Equal(int32(4), next.calls)
+	})
+
+	t.Run("do not retry non connection refused errors", func(t *testing.T) {
+		r := require.New(t)
+
+		next := &mockRoundTripper{
+			err: errors.New("ups"),
+		}
+		rt := kubeRetryTransport{
+			log:           log,
+			next:          next,
+			maxRetries:    3,
+			retryInterval: 100 * time.Millisecond,
+		}
+		_, err := rt.RoundTrip(nil)
+		r.EqualError(err, "ups")
+		r.Equal(int32(1), next.calls)
+	})
+}
+
+type mockRoundTripper struct {
+	err   error
+	calls int32
+}
+
+func (m *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	atomic.AddInt32(&m.calls, 1)
+	return nil, m.err
+}