From 278fd7a816c0e4e02b9eae063d17da0d063f831b Mon Sep 17 00:00:00 2001 From: Vatsal Parekh Date: Thu, 16 May 2024 17:50:04 +0530 Subject: [PATCH] Use Mutex & Lease to stop concurrent secret creation Signed-off-by: Vatsal Parekh --- pkg/serviceaccounttoken/secret.go | 79 +++++++++++++++ pkg/serviceaccounttoken/secret_test.go | 67 +++++++++++++ .../serviceaccounttoken_test.go | 96 +++++++++++++++++++ 3 files changed, 242 insertions(+) create mode 100644 tests/v2/integration/serviceaccount/serviceaccounttoken_test.go diff --git a/pkg/serviceaccounttoken/secret.go b/pkg/serviceaccounttoken/secret.go index b9f461f2ed5..198cd6e5f6f 100644 --- a/pkg/serviceaccounttoken/secret.go +++ b/pkg/serviceaccounttoken/secret.go @@ -3,16 +3,20 @@ package serviceaccounttoken import ( "context" "fmt" + "sync" "time" corecontrollers "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1" "github.com/sirupsen/logrus" + coordinationv1 "k8s.io/api/coordination/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/utils/ptr" ) const ( @@ -20,8 +24,18 @@ const ( ServiceAccountSecretLabel = "cattle.io/service-account.name" serviceAccountSecretAnnotation = "kubernetes.io/service-account.name" + + // LeasePrefix is the name of the lease used to manage concurrency + LeasePrefix = "sa-token-lease-" ) +var lockMap sync.Map + +func getLock(key string) *sync.Mutex { + actual, _ := lockMap.LoadOrStore(key, &sync.Mutex{}) + return actual.(*sync.Mutex) +} + // secretLister is an abstraction over any kind of secret lister. // The caller can use any cache or client it has available, whether that is from norman, wrangler, or client-go, // as long as it can wrap it in a simplified lambda with this signature. 
@@ -33,6 +47,24 @@ func EnsureSecretForServiceAccount(ctx context.Context, secretsCache corecontrol if sa == nil { return nil, fmt.Errorf("could not ensure secret for invalid service account") } + + // Lock avoids multiple calls to this func at the same time and creation of the same resources multiple times + // Mutex is an addition to the Lease; it helps sync within the pod and avoids multiple Lease waits from the same pod + mutex := getLock(fmt.Sprintf("%v-%v", sa.Namespace, sa.Name)) + mutex.Lock() + defer mutex.Unlock() + + // Acquire lease + // acquireLease has a wait.Backoff until the lease is acquired, so this will be a blocking func call + if err := acquireLease(ctx, clientSet, sa.Namespace, sa.Name); err != nil { + return nil, fmt.Errorf("error acquiring lease: %w", err) + } + defer func() { + if err := releaseLease(ctx, clientSet, sa.Namespace, sa.Name); err != nil { + logrus.Errorf("error releasing lease: %v", err) + } + }() + secretClient := clientSet.CoreV1().Secrets(sa.Namespace) var secretLister secretLister if secretsCache != nil { @@ -164,3 +196,50 @@ func isSecretForServiceAccount(secret *v1.Secret, sa *v1.ServiceAccount) bool { annotation := annotations[serviceAccountSecretAnnotation] return sa.Name == annotation } + +func acquireLease(ctx context.Context, clientSet kubernetes.Interface, namespace, name string) error { + leaseClient := clientSet.CoordinationV1().Leases(namespace) + leaseDuration := int32(30) + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprint(LeasePrefix + name), + Namespace: namespace, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: ptr.To("serviceaccounttoken-controller"), + LeaseDurationSeconds: ptr.To(leaseDuration), + }, + } + // Wait for the Lease to be granted + backoff := wait.Backoff{ + Duration: 500 * time.Millisecond, + Factor: 1.0, + Jitter: 1.0, + Steps: 50, + } + err := wait.ExponentialBackoff(backoff, func() (bool, error) { + _, err := leaseClient.Create(ctx, lease, 
metav1.CreateOptions{}) + // if create was success, meaning we got the lease + if err == nil { + return true, nil + } + // if lease already exists, another request has this lease, continue to wait + if errors.IsAlreadyExists(err) { + return false, nil + } + return false, err + }) + if err != nil { + return fmt.Errorf("error acquiring the lease for %v: %w", name, err) + } + return nil +} + +func releaseLease(ctx context.Context, clientSet kubernetes.Interface, namespace, name string) error { + leaseClient := clientSet.CoordinationV1().Leases(namespace) + err := leaseClient.Delete(ctx, fmt.Sprint(LeasePrefix+name), metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("error deleting lease: %w", err) + } + return nil +} diff --git a/pkg/serviceaccounttoken/secret_test.go b/pkg/serviceaccounttoken/secret_test.go index 527ebd26b01..c493ae7ee8e 100644 --- a/pkg/serviceaccounttoken/secret_test.go +++ b/pkg/serviceaccounttoken/secret_test.go @@ -3,6 +3,8 @@ package serviceaccounttoken import ( "context" "fmt" + "k8s.io/apimachinery/pkg/util/rand" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -339,6 +341,71 @@ func TestServiceAccountSecret(t *testing.T) { } } +func TestEnsureSecretForServiceAccount_in_parallel(t *testing.T) { + k8sClient := fake.NewSimpleClientset() + var m sync.Mutex + var created []*v1.Secret + + k8sClient.PrependReactor("*", "leases", + func(a k8stesting.Action) (bool, runtime.Object, error) { + switch action := a.(type) { + case k8stesting.CreateAction: + ret := action.GetObject() + return true, ret, nil + case k8stesting.DeleteAction: + return true, nil, nil + } + return false, nil, nil + }) + + k8sClient.PrependReactor("list", "secrets", + func(a k8stesting.Action) (bool, runtime.Object, error) { + m.Lock() + defer m.Unlock() + + secrets := &v1.SecretList{} + for _, v := range created { + secrets.Items = append(secrets.Items, *v) + } + + return true, secrets, nil + }, + ) + + k8sClient.PrependReactor("create", "secrets", + 
func(action k8stesting.Action) (bool, runtime.Object, error) { + ret := action.(k8stesting.CreateAction).GetObject().(*v1.Secret) + ret.ObjectMeta.Name = ret.GenerateName + rand.String(5) + ret.Data = map[string][]byte{ + "token": []byte("abcde"), + } + created = append(created, ret) + return true, ret, nil + }, + ) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := EnsureSecretForServiceAccount(context.Background(), nil, k8sClient, &v1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + }) + assert.NoError(t, err) + }() + } + + wg.Wait() + + if l := len(created); l != 1 { + t.Fatalf("EnsureSecretForServiceAccount() created %d secrets, want 1", l) + } +} + type fakeSecretLister struct { secrets []*v1.Secret err error diff --git a/tests/v2/integration/serviceaccount/serviceaccounttoken_test.go b/tests/v2/integration/serviceaccount/serviceaccounttoken_test.go new file mode 100644 index 00000000000..1d06547f0d6 --- /dev/null +++ b/tests/v2/integration/serviceaccount/serviceaccounttoken_test.go @@ -0,0 +1,96 @@ +package integration + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/rancher/rancher/pkg/serviceaccounttoken" + "github.com/rancher/shepherd/clients/rancher" + "github.com/rancher/shepherd/pkg/session" + "github.com/stretchr/testify/suite" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +type ServiceAccountSuite struct { + suite.Suite + client *rancher.Client + session *session.Session +} + +func (s *ServiceAccountSuite) TearDownSuite() { + s.session.Cleanup() +} + +func (s *ServiceAccountSuite) SetupSuite() { + testSession := session.NewSession() + s.session = testSession + + client, err := rancher.NewClient("", testSession) + s.Require().NoError(err) + s.client = client +} + +func (s *ServiceAccountSuite) 
TestSingleSecretForServiceAccount() { + localCluster, err := s.client.Management.Cluster.ByID("local") + s.Require().NoError(err) + s.Require().NotEmpty(localCluster) + localClusterKubeconfig, err := s.client.Management.Cluster.ActionGenerateKubeconfig(localCluster) + s.Require().NoError(err) + c, err := clientcmd.NewClientConfigFromBytes([]byte(localClusterKubeconfig.Config)) + s.Require().NoError(err) + cc, err := c.ClientConfig() + s.Require().NoError(err) + clientset, err := kubernetes.NewForConfig(cc) + s.Require().NoError(err) + + testNS := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ns", + }, + } + testNS, err = clientset.CoreV1().Namespaces().Create(context.Background(), testNS, metav1.CreateOptions{}) + s.Require().NoError(err) + + serviceAccount := &v1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: testNS.Name, + }, + } + serviceAccount, err = clientset.CoreV1().ServiceAccounts(testNS.Name).Create(context.Background(), serviceAccount, metav1.CreateOptions{}) + s.Require().NoError(err) + + // mimic a scenario where multiple func calls for the same SA, and check the resulting Secrets + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := serviceaccounttoken.EnsureSecretForServiceAccount(context.Background(), nil, clientset, serviceAccount) + s.Require().NoError(err) + }() + } + + // Define the interval for checking. 
+ interval := 500 * time.Millisecond + s.Assert().Eventually(func() bool { + leases, err := clientset.CoordinationV1().Leases(testNS.Name).List(context.Background(), metav1.ListOptions{}) + s.Require().NoError(err) + return len(leases.Items) <= 1 + }, time.Second*50, interval) + + wg.Wait() + + secrets, err := clientset.CoreV1().Secrets(testNS.Name).List(context.Background(), metav1.ListOptions{}) + s.Require().NoError(err) + s.Assert().Equal(len(secrets.Items), 1) +} + +func TestSATestSuite(t *testing.T) { + suite.Run(t, new(ServiceAccountSuite)) +}