Skip to content

Commit

Permalink
Retry on kubernetes connection refused errors (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Apr 11, 2022
1 parent 3ea1743 commit 1e9abad
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
10 changes: 5 additions & 5 deletions actions/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestActions(t *testing.T) {
log := logrus.New()
log.SetLevel(logrus.DebugLevel)
cfg := Config{
PollWaitInterval: 1 * time.Millisecond,
PollWaitInterval: 10 * time.Millisecond,
PollTimeout: 100 * time.Millisecond,
AckTimeout: 1 * time.Second,
AckRetriesCount: 3,
Expand All @@ -49,7 +49,7 @@ func TestActions(t *testing.T) {
return svc
}

t.Run("poll, handle and ack", func(t *testing.T) {
t.Run("poll handle and ack", func(t *testing.T) {
r := require.New(t)

apiActions := []*castai.ClusterAction{
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestActions(t *testing.T) {
client := mock.NewMockAPIClient(apiActions)
handler := &mockAgentActionHandler{handleDelay: 2 * time.Millisecond}
svc := newTestService(handler, client)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer func() {
cancel()
svc.startedActionsWg.Wait()
Expand All @@ -103,7 +103,7 @@ func TestActions(t *testing.T) {
client.GetActionsErr = errors.New("ups")
handler := &mockAgentActionHandler{err: errors.New("ups")}
svc := newTestService(handler, client)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer func() {
cancel()
svc.startedActionsWg.Wait()
Expand All @@ -128,7 +128,7 @@ func TestActions(t *testing.T) {
client := mock.NewMockAPIClient(apiActions)
handler := &mockAgentActionHandler{err: errors.New("ups")}
svc := newTestService(handler, client)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer func() {
cancel()
svc.startedActionsWg.Wait()
Expand Down
39 changes: 39 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"os"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -261,10 +263,47 @@ func retrieveKubeConfig(log logrus.FieldLogger) (*rest.Config, error) {
if err != nil {
return nil, err
}
inClusterConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &kubeRetryTransport{
log: log,
next: rt,
maxRetries: 10,
retryInterval: 3 * time.Second,
}
})
log.Debug("using in cluster kubeconfig")
return inClusterConfig, nil
}

// kubeRetryTransport is an http.RoundTripper decorator that re-sends a request
// through the wrapped transport when an attempt fails with a
// "connection refused" error.
type kubeRetryTransport struct {
	// log receives a warning each time a failed attempt is about to be retried.
	log logrus.FieldLogger
	// next is the wrapped transport that actually performs the request.
	next http.RoundTripper
	// maxRetries is the number of retries allowed after the first attempt.
	maxRetries uint64
	// retryInterval is the constant pause between consecutive attempts.
	retryInterval time.Duration
}

// RoundTrip sends req through the wrapped transport and retries the call when
// it fails with a "connection refused" error, waiting rt.retryInterval between
// attempts and giving up after rt.maxRetries retries. Any other error is
// returned immediately without retrying.
//
// The waits between attempts are bound to the request's context: once that
// context is cancelled or times out, retrying stops instead of sleeping through
// the remaining attempts, and the context's error is returned.
//
// NOTE(review): the same req is handed to every attempt. The http.RoundTripper
// contract lets a failed attempt close req.Body, so requests carrying a body
// may need to be rebuilt (e.g. via req.GetBody) before a retry — confirm
// against the transports this wraps.
func (rt *kubeRetryTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	var policy backoff.BackOff = backoff.WithMaxRetries(backoff.NewConstantBackOff(rt.retryInterval), rt.maxRetries)
	if req != nil {
		// A nil request has no context to consult; it is passed through as-is
		// and left for the wrapped transport to reject.
		policy = backoff.WithContext(policy, req.Context())
	}

	var resp *http.Response
	err := backoff.RetryNotify(func() error {
		var err error
		resp, err = rt.next.RoundTrip(req)
		if err != nil {
			// Previously client-go contained logic to retry connection refused errors. See https://github.com/kubernetes/kubernetes/pull/88267/files
			if net.IsConnectionRefused(err) {
				return err
			}
			// Anything else is not retryable; Permanent stops the retry loop.
			return backoff.Permanent(err)
		}
		return nil
	}, policy, func(err error, _ time.Duration) {
		// Only invoked for retryable failures, i.e. connection refused errors.
		rt.log.Warnf("kube api server connection refused, will retry: %v", err)
	})
	return resp, err
}

type logContextErr struct {
err error
fields logrus.Fields
Expand Down
62 changes: 62 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"errors"
"net/http"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

// TestKubeRetryTransport checks that kubeRetryTransport retries connection
// refused errors up to the configured limit and passes every other error
// through after a single attempt.
func TestKubeRetryTransport(t *testing.T) {
	logger := logrus.New()
	logger.SetLevel(logrus.DebugLevel)

	cases := []struct {
		name      string
		err       error  // error returned by the wrapped transport on every call
		wantErr   string // message of the error RoundTrip is expected to return
		wantCalls int32  // expected number of calls into the wrapped transport
	}{
		{
			name:      "retry connection refused error",
			err:       syscall.ECONNREFUSED,
			wantErr:   "connection refused",
			wantCalls: 4, // first attempt plus three retries
		},
		{
			name:      "do not retry non connection refused errors",
			err:       errors.New("ups"),
			wantErr:   "ups",
			wantCalls: 1,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			r := require.New(t)

			next := &mockRoundTripper{err: tc.err}
			rt := kubeRetryTransport{
				log:           logger,
				next:          next,
				maxRetries:    3,
				retryInterval: 100 * time.Millisecond,
			}

			_, err := rt.RoundTrip(nil)
			r.EqualError(err, tc.wantErr)
			r.Equal(tc.wantCalls, next.calls)
		})
	}
}

// mockRoundTripper is a test double for http.RoundTripper that always fails
// with a preset error and records how many times it was invoked.
type mockRoundTripper struct {
	// err is returned from every RoundTrip call.
	err error
	// calls counts RoundTrip invocations; it is incremented atomically.
	calls int32
}

// RoundTrip bumps the call counter and returns a nil response together with
// the configured error. The request itself is never inspected.
func (m *mockRoundTripper) RoundTrip(_ *http.Request) (*http.Response, error) {
	atomic.AddInt32(&m.calls, 1)
	return nil, m.err
}

0 comments on commit 1e9abad

Please sign in to comment.