Skip to content

Commit

Permalink
Merge pull request rancher#45510 from vatsalparekh/re-sync-fix
Browse files Browse the repository at this point in the history
[WIP] Use Mutex & Lease to stop concurrent secret creation
  • Loading branch information
bigkevmcd authored Jul 5, 2024
2 parents 1d34dbc + 278fd7a commit 75f1e7e
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 0 deletions.
79 changes: 79 additions & 0 deletions pkg/serviceaccounttoken/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,39 @@ package serviceaccounttoken
import (
"context"
"fmt"
"sync"
"time"

corecontrollers "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
coordinationv1 "k8s.io/api/coordination/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/utils/ptr"
)

const (
// ServiceAccountSecretLabel is the label used to search for the secret belonging to a service account.
ServiceAccountSecretLabel = "cattle.io/service-account.name"

serviceAccountSecretAnnotation = "kubernetes.io/service-account.name"

// LeasePrefix is the name of the lease used to manage concurrency
LeasePrefix = "sa-token-lease-"
)

var lockMap sync.Map

func getLock(key string) *sync.Mutex {
actual, _ := lockMap.LoadOrStore(key, &sync.Mutex{})
return actual.(*sync.Mutex)
}

// secretLister is an abstraction over any kind of secret lister.
// The caller can use any cache or client it has available, whether that is from norman, wrangler, or client-go,
// as long as it can wrap it in a simplified lambda with this signature.
Expand All @@ -33,6 +47,24 @@ func EnsureSecretForServiceAccount(ctx context.Context, secretsCache corecontrol
if sa == nil {
return nil, fmt.Errorf("could not ensure secret for invalid service account")
}

// Lock avoids multiple calls to this func at the same time and creation of same resources multiple times
// Mutex is a addition to the Lease, it helps sync within the pod and avoid multiple Lease waits from the same pod
mutex := getLock(fmt.Sprintf("%v-%v", sa.Namespace, sa.Name))
mutex.Lock()
defer mutex.Unlock()

// Acquire lease
// acquireLease has a wait.Backoff until the lease is acquired, so this will be a blocking func call
if err := acquireLease(ctx, clientSet, sa.Namespace, sa.Name); err != nil {
return nil, fmt.Errorf("error acquiring lease: %w", err)
}
defer func() {
if err := releaseLease(ctx, clientSet, sa.Namespace, sa.Name); err != nil {
logrus.Errorf("error releasing lease: %v", err)
}
}()

secretClient := clientSet.CoreV1().Secrets(sa.Namespace)
var secretLister secretLister
if secretsCache != nil {
Expand Down Expand Up @@ -164,3 +196,50 @@ func isSecretForServiceAccount(secret *v1.Secret, sa *v1.ServiceAccount) bool {
annotation := annotations[serviceAccountSecretAnnotation]
return sa.Name == annotation
}

func acquireLease(ctx context.Context, clientSet kubernetes.Interface, namespace, name string) error {
leaseClient := clientSet.CoordinationV1().Leases(namespace)
leaseDuration := int32(30)
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprint(LeasePrefix + name),
Namespace: namespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("serviceaccounttoken-controller"),
LeaseDurationSeconds: ptr.To(leaseDuration),
},
}
// Wait for the Lease to be granted
backoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 1.0,
Jitter: 1.0,
Steps: 50,
}
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
_, err := leaseClient.Create(ctx, lease, metav1.CreateOptions{})
// if create was success, meaning we got the lease
if err == nil {
return true, nil
}
// if lease already exists, another request has this lease, continue to wait
if errors.IsAlreadyExists(err) {
return false, nil
}
return false, err
})
if err != nil {
return fmt.Errorf("error acquiring the lease for %v: %w", name, err)
}
return nil
}

func releaseLease(ctx context.Context, clientSet kubernetes.Interface, namespace, name string) error {
leaseClient := clientSet.CoordinationV1().Leases(namespace)
err := leaseClient.Delete(ctx, fmt.Sprint(LeasePrefix+name), metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("error deleting lease: %w", err)
}
return nil
}
67 changes: 67 additions & 0 deletions pkg/serviceaccounttoken/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package serviceaccounttoken
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/rand"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -339,6 +341,71 @@ func TestServiceAccountSecret(t *testing.T) {
}
}

func TestEnsureSecretForServiceAccount_in_parallel(t *testing.T) {
k8sClient := fake.NewSimpleClientset()
var m sync.Mutex
var created []*v1.Secret

k8sClient.PrependReactor("*", "leases",
func(a k8stesting.Action) (bool, runtime.Object, error) {
switch action := a.(type) {
case k8stesting.CreateAction:
ret := action.GetObject()
return true, ret, nil
case k8stesting.DeleteAction:
return true, nil, nil
}
return false, nil, nil
})

k8sClient.PrependReactor("list", "secrets",
func(a k8stesting.Action) (bool, runtime.Object, error) {
m.Lock()
defer m.Unlock()

secrets := &v1.SecretList{}
for _, v := range created {
secrets.Items = append(secrets.Items, *v)
}

return true, secrets, nil
},
)

k8sClient.PrependReactor("create", "secrets",
func(action k8stesting.Action) (bool, runtime.Object, error) {
ret := action.(k8stesting.CreateAction).GetObject().(*v1.Secret)
ret.ObjectMeta.Name = ret.GenerateName + rand.String(5)
ret.Data = map[string][]byte{
"token": []byte("abcde"),
}
created = append(created, ret)
return true, ret, nil
},
)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := EnsureSecretForServiceAccount(context.Background(), nil, k8sClient, &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
})
assert.NoError(t, err)
}()
}

wg.Wait()

if l := len(created); l != 1 {
t.Fatalf("EnsureSecretForServiceAccount() created %d secrets, want 1", l)
}
}

type fakeSecretLister struct {
secrets []*v1.Secret
err error
Expand Down
96 changes: 96 additions & 0 deletions tests/v2/integration/serviceaccount/serviceaccounttoken_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package integration

import (
"context"
"sync"
"testing"
"time"

"github.com/rancher/rancher/pkg/serviceaccounttoken"
"github.com/rancher/shepherd/clients/rancher"
"github.com/rancher/shepherd/pkg/session"
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

type ServiceAccountSuite struct {
suite.Suite
client *rancher.Client
session *session.Session
}

func (s *ServiceAccountSuite) TearDownSuite() {
s.session.Cleanup()
}

func (s *ServiceAccountSuite) SetupSuite() {
testSession := session.NewSession()
s.session = testSession

client, err := rancher.NewClient("", testSession)
s.Require().NoError(err)
s.client = client
}

func (s *ServiceAccountSuite) TestSingleSecretForServiceAccount() {
localCluster, err := s.client.Management.Cluster.ByID("local")
s.Require().NoError(err)
s.Require().NotEmpty(localCluster)
localClusterKubeconfig, err := s.client.Management.Cluster.ActionGenerateKubeconfig(localCluster)
s.Require().NoError(err)
c, err := clientcmd.NewClientConfigFromBytes([]byte(localClusterKubeconfig.Config))
s.Require().NoError(err)
cc, err := c.ClientConfig()
s.Require().NoError(err)
clientset, err := kubernetes.NewForConfig(cc)
s.Require().NoError(err)

testNS := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "test-ns",
},
}
testNS, err = clientset.CoreV1().Namespaces().Create(context.Background(), testNS, metav1.CreateOptions{})
s.Require().NoError(err)

serviceAccount := &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: testNS.Name,
},
}
serviceAccount, err = clientset.CoreV1().ServiceAccounts(testNS.Name).Create(context.Background(), serviceAccount, metav1.CreateOptions{})
s.Require().NoError(err)

// mimic a scenario where multiple func calls for the same SA, and check the resulting Secrets
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := serviceaccounttoken.EnsureSecretForServiceAccount(context.Background(), nil, clientset, serviceAccount)
s.Require().NoError(err)
}()
}

// Define the interval for checking.
interval := 500 * time.Millisecond
s.Assert().Eventually(func() bool {
leases, err := clientset.CoordinationV1().Leases(testNS.Name).List(context.Background(), metav1.ListOptions{})
s.Require().NoError(err)
return len(leases.Items) <= 1
}, time.Second*50, interval)

wg.Wait()

secrets, err := clientset.CoreV1().Secrets(testNS.Name).List(context.Background(), metav1.ListOptions{})
s.Require().NoError(err)
s.Assert().Equal(len(secrets.Items), 1)
}

func TestSATestSuite(t *testing.T) {
suite.Run(t, new(ServiceAccountSuite))
}

0 comments on commit 75f1e7e

Please sign in to comment.