From dadc79b3d338bd2f4b37e3488c453d145450e436 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 14:03:08 +0100 Subject: [PATCH 01/29] Add secret cache --- .../providers/kubernetessecrets/config.go | 6 + .../kubernetessecrets/kubernetes_secrets.go | 115 ++++++++++++++---- 2 files changed, 97 insertions(+), 24 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index 95ff308c3aa..6875ccf1711 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -10,4 +10,10 @@ import "github.com/elastic/elastic-agent-autodiscover/kubernetes" type Config struct { KubeConfig string `config:"kube_config"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` + + TTL int `config:"ttl"` +} + +func (c *Config) InitDefaults() { + c.TTL = 10 } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 1537a232dd1..652f4d83de3 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -8,6 +8,7 @@ import ( "context" "strings" "sync" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sclient "k8s.io/client-go/kubernetes" @@ -33,6 +34,9 @@ type contextProviderK8sSecrets struct { clientMx sync.Mutex client k8sclient.Interface + + secretsCacheMx sync.Mutex + secretsCache map[string]string } // ContextProviderBuilder builds the context provider. @@ -46,19 +50,102 @@ func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed boo return nil, errors.New(err, "failed to unpack configuration") } return &contextProviderK8sSecrets{ - logger: logger, - config: &cfg, + logger: logger, + config: &cfg, + secretsCache: make(map[string]string), }, nil } func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) { - // key = "kubernetes_secrets.somenamespace.somesecret.value" + return p.getFromCache(key) +} + +// Run initializes the k8s secrets context provider. +func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { + client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions) + if err != nil { + p.logger.Debugf("Kubernetes_secrets provider skipped, unable to connect: %s", err) + return nil + } + p.clientMx.Lock() + p.client = client + p.clientMx.Unlock() + go p.updateSecrets(ctx) + + <-comm.Done() + + p.clientMx.Lock() + p.client = nil + p.clientMx.Unlock() + return comm.Err() +} + +func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) { + return kubernetes.GetKubernetesClient(kubeconfig, opt) +} + +// Update the secrets in the cache every TTL minutes +func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { + d := time.Duration(p.config.TTL) * time.Minute + timer := time.NewTimer(d) + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + p.updateCache() + timer.Reset(d) + } + } +} + +func (p *contextProviderK8sSecrets) updateCache() { + p.secretsCacheMx.Lock() + for name, _ := range p.secretsCache { + newValue, ok := p.fetchSecret(name) + + // remove the secret from the cache + if !ok { + delete(p.secretsCache, name) + } else { + p.secretsCache[name] = newValue + } + } + p.secretsCacheMx.Unlock() +} + +func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { + p.secretsCacheMx.Lock() + value, ok := p.secretsCache[key] + p.secretsCacheMx.Unlock() + + // if value is still not present in cache, it is possible we haven't tried to fetch it yet + if !ok { + value, ok = p.addToCache(key) + } + return value, ok +} + +func (p *contextProviderK8sSecrets) addToCache(key string) (string, bool) { + value, ok := p.fetchSecret(key) + if ok { + p.secretsCacheMx.Lock() + p.secretsCache[key] = value + p.secretsCacheMx.Unlock() + } + return value, ok +} + +func (p *contextProviderK8sSecrets) fetchSecret(key string) (string, bool) { p.clientMx.Lock() client := p.client p.clientMx.Unlock() if client == nil { return "", false } + + // key = "kubernetes_secrets.somenamespace.somesecret.value" tokens := strings.Split(key, ".") if len(tokens) > 0 && tokens[0] != "kubernetes_secrets" { return "", false @@ -87,26 +174,6 @@ func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) { return "", false } secretString := secret.Data[secretVar] - return string(secretString), true -} -// Run initializes the k8s secrets context provider. -func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { - client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions) - if err != nil { - p.logger.Debugf("Kubernetes_secrets provider skipped, unable to connect: %s", err) - return nil - } - p.clientMx.Lock() - p.client = client - p.clientMx.Unlock() - <-comm.Done() - p.clientMx.Lock() - p.client = nil - p.clientMx.Unlock() - return comm.Err() -} - -func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) { - return kubernetes.GetKubernetesClient(kubeconfig, opt) + return string(secretString), true } From ea3a100b94aec0906b4c8547ce092a176c123cb4 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 14:18:29 +0100 Subject: [PATCH 02/29] Add changelog --- .../1701091034-add-cache-for-secrets.yaml | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 changelog/fragments/1701091034-add-cache-for-secrets.yaml diff --git a/changelog/fragments/1701091034-add-cache-for-secrets.yaml b/changelog/fragments/1701091034-add-cache-for-secrets.yaml new file mode 100644 index 00000000000..3c52bafa577 --- /dev/null +++ b/changelog/fragments/1701091034-add-cache-for-secrets.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: add cache for secrets when using kubernetes secret provider + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/3822 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/3594 From a71101b03e33b711aa6bc6b7f69148bb26b85259 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 14:21:04 +0100 Subject: [PATCH 03/29] Remove assignment --- .../providers/kubernetessecrets/kubernetes_secrets.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 652f4d83de3..fa2033fc46e 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -102,7 +102,7 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { func (p *contextProviderK8sSecrets) updateCache() { p.secretsCacheMx.Lock() - for name, _ := range p.secretsCache { + for name := range p.secretsCache { newValue, ok := p.fetchSecret(name) // remove the secret from the cache From 125330fb9199e25ad278b69ba1d367bd9cefba02 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 15:18:46 +0100 Subject: [PATCH 04/29] Change TTL default to 1 min --- internal/pkg/composable/providers/kubernetessecrets/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index 6875ccf1711..c7e73987924 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -15,5 +15,5 @@ type Config struct { } func (c *Config) InitDefaults() { - c.TTL = 10 + c.TTL = 1 } From 09ba2b62d323ced51c7415b82243e3fff4f40bdd Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 16:16:52 +0100 Subject: [PATCH 05/29] - Remove secrets based on last access - Update duration config format --- .../providers/kubernetessecrets/config.go | 6 +- .../kubernetessecrets/kubernetes_secrets.go | 56 +++++++++++++++---- 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index c7e73987924..6bebda6fd6a 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -11,9 +11,11 @@ type Config struct { KubeConfig string `config:"kube_config"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` - TTL int `config:"ttl"` + TTL string `config:"ttl"` } +var defaultTTL = "60s" + func (c *Config) InitDefaults() { - c.TTL = 1 + c.TTL = defaultTTL } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index fa2033fc46e..eee899211ff 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -36,7 +36,12 @@ type contextProviderK8sSecrets struct { client k8sclient.Interface secretsCacheMx sync.Mutex - secretsCache map[string]string + secretsCache map[string]secretsData +} + +type secretsData struct { + value string + lastAccess time.Time } // ContextProviderBuilder builds the context provider. @@ -52,7 +57,7 @@ func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed boo return &contextProviderK8sSecrets{ logger: logger, config: &cfg, - secretsCache: make(map[string]string), + secretsCache: make(map[string]secretsData), }, nil } @@ -86,7 +91,16 @@ func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclien // Update the secrets in the cache every TTL minutes func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { - d := time.Duration(p.config.TTL) * time.Minute + d, err := time.ParseDuration(p.config.TTL) + if err != nil { + p.logger.Debugf( + "TTL %v is not valid: %v. Default of %v will be used.", + p.config.TTL, + err, + defaultTTL, + ) + d, _ = time.ParseDuration(defaultTTL) + } timer := time.NewTimer(d) for { @@ -102,14 +116,21 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { func (p *contextProviderK8sSecrets) updateCache() { p.secretsCacheMx.Lock() - for name := range p.secretsCache { + for name, data := range p.secretsCache { newValue, ok := p.fetchSecret(name) // remove the secret from the cache if !ok { delete(p.secretsCache, name) } else { - p.secretsCache[name] = newValue + // if the secret has not been accessed in over 1h, delete it + // to avoid keeping secrets in cache that are no longer in use + diff := time.Now().Sub(data.lastAccess).Hours() + if diff > 1 { + delete(p.secretsCache, name) + } else { + data.value = newValue + } } } p.secretsCacheMx.Unlock() @@ -117,24 +138,37 @@ func (p *contextProviderK8sSecrets) updateCache() { func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { p.secretsCacheMx.Lock() - value, ok := p.secretsCache[key] + _, ok := p.secretsCache[key] p.secretsCacheMx.Unlock() // if value is still not present in cache, it is possible we haven't tried to fetch it yet if !ok { - value, ok = p.addToCache(key) + data, ok := p.addToCache(key) + // if it was not possible to fetch the secret, return + if !ok { + return data.value, ok + } } - return value, ok + + p.secretsCacheMx.Lock() + data, ok := p.secretsCache[key] + data.lastAccess = time.Now() + p.secretsCacheMx.Unlock() + + return data.value, ok } -func (p *contextProviderK8sSecrets) addToCache(key string) (string, bool) { +func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { value, ok := p.fetchSecret(key) + data := secretsData{ + value: value, + } if ok { p.secretsCacheMx.Lock() - p.secretsCache[key] = value + p.secretsCache[key] = data p.secretsCacheMx.Unlock() } - return value, ok + return data, ok } func (p *contextProviderK8sSecrets) fetchSecret(key string) (string, bool) { From 4908655a0e3958875ba1af679ab5b89afe2d03de Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 16:19:40 +0100 Subject: [PATCH 06/29] - Remove secrets based on last access - Update duration config format --- .../providers/kubernetessecrets/kubernetes_secrets.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index eee899211ff..9269578a435 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -125,7 +125,7 @@ func (p *contextProviderK8sSecrets) updateCache() { } else { // if the secret has not been accessed in over 1h, delete it // to avoid keeping secrets in cache that are no longer in use - diff := time.Now().Sub(data.lastAccess).Hours() + diff := time.Since(data.lastAccess).Hours() if diff > 1 { delete(p.secretsCache, name) } else { From a291bae7edbadb2c340d6a1c8af0985e08b873c8 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 17:24:16 +0100 Subject: [PATCH 07/29] - Update duration config format - Add ttl for both update and delete --- .../providers/kubernetessecrets/config.go | 14 +++++++++----- .../kubernetessecrets/kubernetes_secrets.go | 19 ++++--------------- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index 6bebda6fd6a..af7a1370fc5 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -4,18 +4,22 @@ package kubernetessecrets -import "github.com/elastic/elastic-agent-autodiscover/kubernetes" +import ( + "time" + + "github.com/elastic/elastic-agent-autodiscover/kubernetes" +) // Config for kubernetes provider type Config struct { KubeConfig string `config:"kube_config"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` - TTL string `config:"ttl"` + TTLUpdate time.Duration `config:"ttl_update"` + TTLDelete time.Duration `config:"ttl_delete"` } -var defaultTTL = "60s" - func (c *Config) InitDefaults() { - c.TTL = defaultTTL + c.TTLUpdate = 60 * time.Second + c.TTLDelete = 1 * time.Hour } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 9269578a435..f4b0e5860cb 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -91,25 +91,14 @@ func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclien // Update the secrets in the cache every TTL minutes func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { - d, err := time.ParseDuration(p.config.TTL) - if err != nil { - p.logger.Debugf( - "TTL %v is not valid: %v. Default of %v will be used.", - p.config.TTL, - err, - defaultTTL, - ) - d, _ = time.ParseDuration(defaultTTL) - } - timer := time.NewTimer(d) - + timer := time.NewTimer(p.config.TTLUpdate) for { select { case <-ctx.Done(): return case <-timer.C: p.updateCache() - timer.Reset(d) + timer.Reset(p.config.TTLUpdate) } } } @@ -125,8 +114,8 @@ func (p *contextProviderK8sSecrets) updateCache() { } else { // if the secret has not been accessed in over 1h, delete it // to avoid keeping secrets in cache that are no longer in use - diff := time.Since(data.lastAccess).Hours() - if diff > 1 { + diff := time.Since(data.lastAccess) + if diff > p.config.TTLDelete { delete(p.secretsCache, name) } else { data.value = newValue From 9f5b92c261d5cca13023dc6208a4f24ce5628683 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 18:19:10 +0100 Subject: [PATCH 08/29] Add unit test --- .../kubernetessecrets/kubernetes_secrets.go | 12 +- .../kubernetes_secrets_test.go | 107 ++++++++++++++++++ 2 files changed, 111 insertions(+), 8 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index f4b0e5860cb..ee3669eaa57 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -106,16 +106,12 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { func (p *contextProviderK8sSecrets) updateCache() { p.secretsCacheMx.Lock() for name, data := range p.secretsCache { - newValue, ok := p.fetchSecret(name) - - // remove the secret from the cache - if !ok { + diff := time.Since(data.lastAccess) + if diff > p.config.TTLDelete { delete(p.secretsCache, name) } else { - // if the secret has not been accessed in over 1h, delete it - // to avoid keeping secrets in cache that are no longer in use - diff := time.Since(data.lastAccess) - if diff > p.config.TTLDelete { + newValue, ok := p.fetchSecret(name) + if !ok { delete(p.secretsCache, name) } else { data.value = newValue diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 9924c84e6bc..e92d9e38bfe 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -138,3 +138,110 @@ func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) { assert.False(t, found) assert.EqualValues(t, val, "") } + +func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { + client := k8sfake.NewSimpleClientset() + + ttlDelete, err := time.ParseDuration("1s") + require.NoError(t, err) + + ttlUpdate, err := time.ParseDuration("100ms") + require.NoError(t, err) + + secret := &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "apps/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "testing_secret", + Namespace: ns, + }, + Data: map[string][]byte{ + "secret_value": []byte(pass), + }, + } + _, err = client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{}) + require.NoError(t, err) + + logger := logp.NewLogger("test_k8s_secrets") + + c := map[string]interface{}{ + "ttl_update": ttlUpdate, + "ttl_delete": ttlDelete, + } + cfg, err := config.NewConfigFrom(c) + require.NoError(t, err) + + p, err := ContextProviderBuilder(logger, cfg, true) + require.NoError(t, err) + + fp, _ := p.(*contextProviderK8sSecrets) + + getK8sClientFunc = func(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) { + return client, nil + } + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + comm := ctesting.NewContextComm(ctx) + + go func() { + _ = fp.Run(ctx, comm) + }() + + for { + fp.clientMx.Lock() + client := fp.client + fp.clientMx.Unlock() + if client != nil { + break + } + <-time.After(10 * time.Millisecond) + } + + // Secret cache should be empty at start + fp.secretsCacheMx.Lock() + assert.Equal(t, len(fp.secretsCache), 0) + fp.secretsCacheMx.Unlock() + + // Secret should be in the cache after this call + val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") + assert.True(t, found) + assert.Equal(t, val, pass) + fp.secretsCacheMx.Lock() + assert.Equal(t, len(fp.secretsCache), 1) + fp.secretsCacheMx.Unlock() + + // Update the secret and check after TTL time, the secret value is correct + newPass := "new-pass" + secret = &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "apps/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "testing_secret", + Namespace: ns, + }, + Data: map[string][]byte{ + "secret_value": []byte(newPass), + }, + } + _, err = client.CoreV1().Secrets(ns).Update(context.Background(), secret, metav1.UpdateOptions{}) + require.NoError(t, err) + + // wait for ttl update + <-time.After(ttlUpdate) + val, found = fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") + assert.True(t, found) + assert.Equal(t, val, newPass) + + // After TTL delete, secret should no longer be found in cache since it was never + // fetched during that time + <-time.After(ttlDelete) + fp.secretsCacheMx.Lock() + assert.Equal(t, 0, len(fp.secretsCache)) + fp.secretsCacheMx.Unlock() +} From 9d948597bbd3448e3ff66693cf2431a89d05bb5f Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 27 Nov 2023 18:20:33 +0100 Subject: [PATCH 09/29] Add unit test --- .../providers/kubernetessecrets/kubernetes_secrets_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index e92d9e38bfe..b9e42b8b4df 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -203,7 +203,7 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { // Secret cache should be empty at start fp.secretsCacheMx.Lock() - assert.Equal(t, len(fp.secretsCache), 0) + assert.Equal(t, 0, len(fp.secretsCache)) fp.secretsCacheMx.Unlock() // Secret should be in the cache after this call From 6d30148af47e1cd07c23aac69cf6b03aa3cd147b Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 28 Nov 2023 09:18:23 +0100 Subject: [PATCH 10/29] - Use assert.eventually - Switch to RWMutex - Create new map --- .../kubernetessecrets/kubernetes_secrets.go | 17 +++++++---- .../kubernetes_secrets_test.go | 29 ++++++++++++------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index ee3669eaa57..de45261b112 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -35,8 +35,8 @@ type contextProviderK8sSecrets struct { clientMx sync.Mutex client k8sclient.Interface - secretsCacheMx sync.Mutex - secretsCache map[string]secretsData + secretsCacheMx sync.RWMutex + secretsCache map[string]*secretsData } type secretsData struct { @@ -57,7 +57,7 @@ func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed boo return &contextProviderK8sSecrets{ logger: logger, config: &cfg, - secretsCache: make(map[string]secretsData), + secretsCache: make(map[string]*secretsData), }, nil } @@ -105,6 +105,9 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { func (p *contextProviderK8sSecrets) updateCache() { p.secretsCacheMx.Lock() + // deleting entries does not free the memory, so we need to create a new map + // to place the ones the secrets we want to keep + cacheTmp := make(map[string]*secretsData) for name, data := range p.secretsCache { diff := time.Since(data.lastAccess) if diff > p.config.TTLDelete { @@ -115,16 +118,18 @@ func (p *contextProviderK8sSecrets) updateCache() { delete(p.secretsCache, name) } else { data.value = newValue + cacheTmp[name] = data } } } + p.secretsCache = cacheTmp p.secretsCacheMx.Unlock() } func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { - p.secretsCacheMx.Lock() + p.secretsCacheMx.RLock() _, ok := p.secretsCache[key] - p.secretsCacheMx.Unlock() + p.secretsCacheMx.RUnlock() // if value is still not present in cache, it is possible we haven't tried to fetch it yet if !ok { @@ -150,7 +155,7 @@ func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { } if ok { p.secretsCacheMx.Lock() - p.secretsCache[key] = data + p.secretsCache[key] = &data p.secretsCacheMx.Unlock() } return data, ok diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index b9e42b8b4df..3590fba371c 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -206,13 +206,17 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { assert.Equal(t, 0, len(fp.secretsCache)) fp.secretsCacheMx.Unlock() + key := "kubernetes_secrets.test_namespace.testing_secret.secret_value" + // Secret should be in the cache after this call - val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") + val, found := fp.Fetch(key) assert.True(t, found) assert.Equal(t, val, pass) - fp.secretsCacheMx.Lock() + fp.secretsCacheMx.RLock() assert.Equal(t, len(fp.secretsCache), 1) - fp.secretsCacheMx.Unlock() + assert.NotNil(t, fp.secretsCache[key]) + assert.NotZero(t, fp.secretsCache[key].lastAccess) + fp.secretsCacheMx.RUnlock() // Update the secret and check after TTL time, the secret value is correct newPass := "new-pass" @@ -233,15 +237,18 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { require.NoError(t, err) // wait for ttl update - <-time.After(ttlUpdate) - val, found = fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") - assert.True(t, found) - assert.Equal(t, val, newPass) + assert.Eventuallyf(t, func() bool { + val, found = fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") + return found && val == newPass + }, 3*ttlUpdate, ttlUpdate, "Failed to update secret in time.") // After TTL delete, secret should no longer be found in cache since it was never // fetched during that time - <-time.After(ttlDelete) - fp.secretsCacheMx.Lock() - assert.Equal(t, 0, len(fp.secretsCache)) - fp.secretsCacheMx.Unlock() + assert.Eventuallyf(t, func() bool { + fp.secretsCacheMx.Lock() + size := len(fp.secretsCache) + fp.secretsCacheMx.Unlock() + return size == 0 + }, 3*ttlDelete, ttlDelete, "Failed to remove secret from the cache after TTL Delete has passed.") + } From d7c8756dc613610b3745d243a82b18f3b32f86fa Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 28 Nov 2023 09:30:59 +0100 Subject: [PATCH 11/29] - Fix typos --- .../providers/kubernetessecrets/kubernetes_secrets.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index de45261b112..faac1450657 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -106,7 +106,7 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { func (p *contextProviderK8sSecrets) updateCache() { p.secretsCacheMx.Lock() // deleting entries does not free the memory, so we need to create a new map - // to place the ones the secrets we want to keep + // to place the secrets we want to keep cacheTmp := make(map[string]*secretsData) for name, data := range p.secretsCache { diff := time.Since(data.lastAccess) @@ -186,9 +186,9 @@ func (p *contextProviderK8sSecrets) fetchSecret(key string) (string, bool) { secretName := tokens[2] secretVar := tokens[3] - secretIntefrace := client.CoreV1().Secrets(ns) + secretInterface := client.CoreV1().Secrets(ns) ctx := context.TODO() - secret, err := secretIntefrace.Get(ctx, secretName, metav1.GetOptions{}) + secret, err := secretInterface.Get(ctx, secretName, metav1.GetOptions{}) if err != nil { p.logger.Errorf("Could not retrieve secret from k8s API: %v", err) return "", false From b3ba2192a841d4e353ac625620a88643b87144ef Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 28 Nov 2023 10:10:52 +0100 Subject: [PATCH 12/29] - Move key validation to addToCache --- .../kubernetessecrets/kubernetes_secrets.go | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index faac1450657..5d216d4b6f8 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -149,6 +149,24 @@ func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { } func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { + // Make sure the key has the expected format "kubernetes_secrets.somenamespace.somesecret.value" + tokens := strings.Split(key, ".") + if len(tokens) > 0 && tokens[0] != "kubernetes_secrets" { + return secretsData{ + value: "", + }, false + } + if len(tokens) != 4 { + p.logger.Debugf( + "not valid secret key: %v. Secrets should be of the following format %v", + key, + "kubernetes_secrets.somenamespace.somesecret.value", + ) + return secretsData{ + value: "", + }, false + } + value, ok := p.fetchSecret(key) data := secretsData{ value: value, @@ -169,19 +187,12 @@ func (p *contextProviderK8sSecrets) fetchSecret(key string) (string, bool) { return "", false } - // key = "kubernetes_secrets.somenamespace.somesecret.value" tokens := strings.Split(key, ".") - if len(tokens) > 0 && tokens[0] != "kubernetes_secrets" { - return "", false - } - if len(tokens) != 4 { - p.logger.Debugf( - "not valid secret key: %v. Secrets should be of the following format %v", - key, - "kubernetes_secrets.somenamespace.somesecret.value", - ) - return "", false - } + // key has the format "kubernetes_secrets.somenamespace.somesecret.value" + // This function is only called from: + // - addToCache, where we already validated that the key has the right format. + // - updateCache, where the results are only added to the cache through addToCache + // Because of this we no longer need to validate the key ns := tokens[1] secretName := tokens[2] secretVar := tokens[3] From 8913d04c2cdb0de918020de1c712097edc924c66 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 28 Nov 2023 10:35:56 +0100 Subject: [PATCH 13/29] fix race in tests --- .../providers/kubernetessecrets/kubernetes_secrets_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 3590fba371c..879e97680b8 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -237,6 +237,7 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { require.NoError(t, err) // wait for ttl update + <-time.After(ttlUpdate) assert.Eventuallyf(t, func() bool { val, found = fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") return found && val == newPass @@ -244,6 +245,7 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { // After TTL delete, secret should no longer be found in cache since it was never // fetched during that time + <-time.After(ttlDelete) assert.Eventuallyf(t, func() bool { fp.secretsCacheMx.Lock() size := len(fp.secretsCache) From 0fcd65a7cababfd2807ff39d5eda725d71f3edd0 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 28 Nov 2023 11:15:12 +0100 Subject: [PATCH 14/29] fix race in tests --- .../kubernetes_secrets_test.go | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 879e97680b8..d22159114e9 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -237,20 +237,17 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { require.NoError(t, err) // wait for ttl update - <-time.After(ttlUpdate) - assert.Eventuallyf(t, func() bool { - val, found = fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") - return found && val == newPass - }, 3*ttlUpdate, ttlUpdate, "Failed to update secret in time.") + <-time.After(ttlUpdate * 2) + val, found = fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") + assert.True(t, found) + assert.Equal(t, newPass, val) // After TTL delete, secret should no longer be found in cache since it was never // fetched during that time - <-time.After(ttlDelete) - assert.Eventuallyf(t, func() bool { - fp.secretsCacheMx.Lock() - size := len(fp.secretsCache) - fp.secretsCacheMx.Unlock() - return size == 0 - }, 3*ttlDelete, ttlDelete, "Failed to remove secret from the cache after TTL Delete has passed.") + <-time.After(ttlDelete * 2) + fp.secretsCacheMx.Lock() + size := len(fp.secretsCache) + fp.secretsCacheMx.Unlock() + assert.Equal(t, 0, size) } From 57ad12c03ace4c9ec2d786cd9c5ab24774003ae9 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 28 Nov 2023 11:17:04 +0100 Subject: [PATCH 15/29] fix race in tests --- .../providers/kubernetessecrets/kubernetes_secrets_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index d22159114e9..61ecfcfa98e 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -238,7 +238,7 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { // wait for ttl update <-time.After(ttlUpdate * 2) - val, found = fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") + val, found = fp.Fetch(key) assert.True(t, found) assert.Equal(t, newPass, val) From 8533ec4617031f1829b72b4e683cb8410d15c802 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 28 Nov 2023 11:53:20 +0100 Subject: [PATCH 16/29] fix race in tests --- .../kubernetessecrets/kubernetes_secrets.go | 5 +++-- .../kubernetes_secrets_test.go | 21 +++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 5d216d4b6f8..efcf81383fd 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -140,12 +140,13 @@ func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { } } + var pass string p.secretsCacheMx.Lock() data, ok := p.secretsCache[key] data.lastAccess = time.Now() + pass = data.value p.secretsCacheMx.Unlock() - - return data.value, ok + return pass, ok } func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 61ecfcfa98e..77c27df19a8 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -237,17 +237,20 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { require.NoError(t, err) // wait for ttl update - <-time.After(ttlUpdate * 2) - val, found = fp.Fetch(key) - assert.True(t, found) - assert.Equal(t, newPass, val) + <-time.After(ttlUpdate) + assert.Eventuallyf(t, func() bool { + val, found = fp.Fetch(key) + return found && val == newPass + }, ttlUpdate*3, ttlUpdate, "Failed to update the secret value after TTL update has passed.") // After TTL delete, secret should no longer be found in cache since it was never // fetched during that time - <-time.After(ttlDelete * 2) - fp.secretsCacheMx.Lock() - size := len(fp.secretsCache) - fp.secretsCacheMx.Unlock() - assert.Equal(t, 0, size) + <-time.After(ttlDelete) + assert.Eventuallyf(t, func() bool { + fp.secretsCacheMx.RLock() + size := len(fp.secretsCache) + fp.secretsCacheMx.RUnlock() + return size == 0 + }, ttlDelete*3, ttlDelete, "Failed to delete the secret after TTL delete has passed.") } From c65ae201dc938f74140f8d9e349570ac868bc9ef Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 4 Dec 2023 18:17:48 +0100 Subject: [PATCH 17/29] Rename TTLUpdate to refresh interval. --- .../composable/providers/kubernetessecrets/config.go | 6 +++--- .../providers/kubernetessecrets/kubernetes_secrets.go | 6 +++--- .../kubernetessecrets/kubernetes_secrets_test.go | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index af7a1370fc5..c00c35e8999 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -15,11 +15,11 @@ type Config struct { KubeConfig string `config:"kube_config"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` - TTLUpdate time.Duration `config:"ttl_update"` - TTLDelete time.Duration `config:"ttl_delete"` + RefreshInterval time.Duration `config:"refresh_interval"` + TTLDelete time.Duration `config:"ttl_delete"` } func (c *Config) InitDefaults() { - c.TTLUpdate = 60 * time.Second + c.RefreshInterval = 60 * time.Second c.TTLDelete = 1 * time.Hour } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index efcf81383fd..901584a35d8 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -89,16 +89,16 @@ func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclien return kubernetes.GetKubernetesClient(kubeconfig, opt) } -// Update the secrets in the cache every TTL minutes +// Update the secrets in the cache every RefreshInterval func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { - timer := time.NewTimer(p.config.TTLUpdate) + timer := time.NewTimer(p.config.RefreshInterval) for { select { case <-ctx.Done(): return case <-timer.C: p.updateCache() - timer.Reset(p.config.TTLUpdate) + timer.Reset(p.config.RefreshInterval) } } } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 77c27df19a8..3fea2dc4555 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -145,7 +145,7 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { ttlDelete, err := time.ParseDuration("1s") require.NoError(t, err) - ttlUpdate, err := time.ParseDuration("100ms") + refreshInterval, err := time.ParseDuration("100ms") require.NoError(t, err) secret := &v1.Secret{ @@ -167,8 +167,8 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { logger := logp.NewLogger("test_k8s_secrets") c := map[string]interface{}{ - "ttl_update": ttlUpdate, - "ttl_delete": ttlDelete, + "refresh_interval": refreshInterval, + "ttl_delete": ttlDelete, } cfg, err := config.NewConfigFrom(c) require.NoError(t, err) @@ -237,11 +237,11 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { require.NoError(t, err) // wait for ttl update - <-time.After(ttlUpdate) + <-time.After(refreshInterval) assert.Eventuallyf(t, func() bool { val, found = fp.Fetch(key) return found && val == newPass - }, ttlUpdate*3, ttlUpdate, "Failed to update the secret value after TTL update has passed.") + }, refreshInterval*3, refreshInterval, "Failed to update the secret value after TTL update has passed.") // After TTL delete, secret should no longer be found in cache since it was never // fetched during that time From 87f045372b59d0bc20b923a2f8c28c64f4749f8f Mon Sep 17 00:00:00 2001 From: constanca-m Date: Tue, 5 Dec 2023 09:44:48 +0100 Subject: [PATCH 18/29] Add context timeout. --- .../providers/kubernetessecrets/config.go | 2 + .../kubernetessecrets/kubernetes_secrets.go | 66 +++++++++++++------ 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index c00c35e8999..031a32f5144 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -17,9 +17,11 @@ type Config struct { RefreshInterval time.Duration `config:"refresh_interval"` TTLDelete time.Duration `config:"ttl_delete"` + Timeout time.Duration `config:"timeout"` } func (c *Config) InitDefaults() { c.RefreshInterval = 60 * time.Second c.TTLDelete = 1 * time.Hour + c.Timeout = 5 * time.Second } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 901584a35d8..c0b6ad9c2fd 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -104,24 +104,28 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { } func (p *contextProviderK8sSecrets) updateCache() { - p.secretsCacheMx.Lock() // deleting entries does not free the memory, so we need to create a new map // to place the secrets we want to keep cacheTmp := make(map[string]*secretsData) + + p.secretsCacheMx.RLock() for name, data := range p.secretsCache { diff := time.Since(data.lastAccess) - if diff > p.config.TTLDelete { - delete(p.secretsCache, name) - } else { - newValue, ok := p.fetchSecret(name) - if !ok { - delete(p.secretsCache, name) - } else { - data.value = newValue - cacheTmp[name] = data + if diff < p.config.TTLDelete { + value, ok := p.fetchSecretWithTimeout(name) + if ok { + newData := &secretsData{ + value: value, + lastAccess: data.lastAccess, + } + cacheTmp[name] = newData } + } } + p.secretsCacheMx.RUnlock() + + p.secretsCacheMx.Lock() p.secretsCache = cacheTmp p.secretsCacheMx.Unlock() } @@ -168,7 +172,7 @@ func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { }, false } - value, ok := p.fetchSecret(key) + value, ok := p.fetchSecretWithTimeout(key) data := secretsData{ value: value, } @@ -180,12 +184,34 @@ func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { return data, ok } -func (p *contextProviderK8sSecrets) fetchSecret(key string) (string, bool) { +type Result struct { + value string + ok bool +} + +func (p *contextProviderK8sSecrets) fetchSecretWithTimeout(key string) (string, bool) { + ctxTimeout, cancel := context.WithTimeout(context.Background(), p.config.Timeout) + defer cancel() + + resultCh := make(chan Result, 1) + go p.fetchSecret(ctxTimeout, key, resultCh) + + select { + case <-ctxTimeout.Done(): + p.logger.Errorf("Could not retrieve value for key %v: %v", key, ctxTimeout.Err()) + return "", false + case result := <-resultCh: + return result.value, result.ok + } +} + +func (p *contextProviderK8sSecrets) fetchSecret(context context.Context, key string, resultCh chan Result) { p.clientMx.Lock() client := p.client p.clientMx.Unlock() if client == nil { - return "", false + resultCh <- Result{value: "", ok: false} + return } tokens := strings.Split(key, ".") @@ -199,17 +225,19 @@ func (p *contextProviderK8sSecrets) fetchSecret(key string) (string, bool) { secretVar := tokens[3] secretInterface := client.CoreV1().Secrets(ns) - ctx := context.TODO() - secret, err := secretInterface.Get(ctx, secretName, metav1.GetOptions{}) + secret, err := secretInterface.Get(context, secretName, metav1.GetOptions{}) + if err != nil { p.logger.Errorf("Could not retrieve secret from k8s API: %v", err) - return "", false + resultCh <- Result{value: "", ok: false} + return } if _, ok := secret.Data[secretVar]; !ok { p.logger.Errorf("Could not retrieve value %v for secret %v", secretVar, secretName) - return "", false + resultCh <- Result{value: "", ok: false} + return } - secretString := secret.Data[secretVar] - return string(secretString), true + secretString := secret.Data[secretVar] + resultCh <- Result{value: string(secretString), ok: true} } From f02823e4b7b8dd50fc953c2eab4d900c077af56f Mon Sep 17 00:00:00 2001 From: constanca-m Date: Wed, 6 Dec 2023 08:34:45 +0100 Subject: [PATCH 19/29] Switch reading lock to writing lock. --- .../providers/kubernetessecrets/kubernetes_secrets.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index c0b6ad9c2fd..666ea78c965 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -6,6 +6,7 @@ package kubernetessecrets import ( "context" + "fmt" "strings" "sync" "time" @@ -108,7 +109,7 @@ func (p *contextProviderK8sSecrets) updateCache() { // to place the secrets we want to keep cacheTmp := make(map[string]*secretsData) - p.secretsCacheMx.RLock() + p.secretsCacheMx.Lock() for name, data := range p.secretsCache { diff := time.Since(data.lastAccess) if diff < p.config.TTLDelete { @@ -123,9 +124,6 @@ func (p *contextProviderK8sSecrets) updateCache() { } } - p.secretsCacheMx.RUnlock() - - p.secretsCacheMx.Lock() p.secretsCache = cacheTmp p.secretsCacheMx.Unlock() } @@ -148,6 +146,7 @@ func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { p.secretsCacheMx.Lock() data, ok := p.secretsCache[key] data.lastAccess = time.Now() + fmt.Println(data.lastAccess) pass = data.value p.secretsCacheMx.Unlock() return pass, ok From 97d6c8420259a335828409e1b67e4eac605bcafe Mon Sep 17 00:00:00 2001 From: constanca-m Date: Wed, 6 Dec 2023 08:38:59 +0100 Subject: [PATCH 20/29] Switch reading lock to writing lock. --- .../kubernetessecrets/kubernetes_secrets.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 666ea78c965..8129c6eb4b5 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -6,7 +6,6 @@ package kubernetessecrets import ( "context" - "fmt" "strings" "sync" "time" @@ -129,10 +128,10 @@ func (p *contextProviderK8sSecrets) updateCache() { } func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { - p.secretsCacheMx.RLock() - _, ok := p.secretsCache[key] - p.secretsCacheMx.RUnlock() + p.secretsCacheMx.Lock() + defer p.secretsCacheMx.Unlock() + _, ok := p.secretsCache[key] // if value is still not present in cache, it is possible we haven't tried to fetch it yet if !ok { data, ok := p.addToCache(key) @@ -142,16 +141,14 @@ func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { } } - var pass string - p.secretsCacheMx.Lock() data, ok := p.secretsCache[key] data.lastAccess = time.Now() - fmt.Println(data.lastAccess) - pass = data.value - p.secretsCacheMx.Unlock() + pass := data.value + return pass, ok } +// This is only called by getFromCache function, which is already locking the cache. func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { // Make sure the key has the expected format "kubernetes_secrets.somenamespace.somesecret.value" tokens := strings.Split(key, ".") @@ -176,9 +173,7 @@ func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { value: value, } if ok { - p.secretsCacheMx.Lock() p.secretsCache[key] = &data - p.secretsCacheMx.Unlock() } return data, ok } From 0f8d86a4955f6737e74b504e83f11a6188fcb3fb Mon Sep 17 00:00:00 2001 From: constanca-m Date: Fri, 8 Dec 2023 08:21:13 +0100 Subject: [PATCH 21/29] - Add disable cache option - Remove goroutine - Refactor timeout name to requestTimeout --- .../providers/kubernetessecrets/config.go | 6 +- .../kubernetessecrets/kubernetes_secrets.go | 36 +++-- .../kubernetes_secrets_test.go | 128 +++++++++++------- 3 files changed, 113 insertions(+), 57 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index 031a32f5144..8d0ca1bcd9a 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -17,11 +17,13 @@ type Config struct { RefreshInterval time.Duration `config:"refresh_interval"` TTLDelete time.Duration `config:"ttl_delete"` - Timeout time.Duration `config:"timeout"` + RequestTimeout time.Duration `config:"request_timeout"` + DisableCache bool `config:"disable_cache"` } func (c *Config) InitDefaults() { c.RefreshInterval = 60 * time.Second c.TTLDelete = 1 * time.Hour - c.Timeout = 5 * time.Second + c.RequestTimeout = 5 * time.Second + c.DisableCache = false } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 8129c6eb4b5..8bf6e53ae3d 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -62,7 +62,16 @@ func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed boo } func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) { - return p.getFromCache(key) + if p.config.DisableCache { + valid := p.validateKey(key) + if valid { + return p.fetchSecretWithTimeout(key) + } else { + return "", false + } + } else { + return p.getFromCache(key) + } } // Run initializes the k8s secrets context provider. @@ -75,7 +84,10 @@ func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.Conte p.clientMx.Lock() p.client = client p.clientMx.Unlock() - go p.updateSecrets(ctx) + + if !p.config.DisableCache { + go p.updateSecrets(ctx) + } <-comm.Done() @@ -148,14 +160,11 @@ func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { return pass, ok } -// This is only called by getFromCache function, which is already locking the cache. -func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { +func (p *contextProviderK8sSecrets) validateKey(key string) bool { // Make sure the key has the expected format "kubernetes_secrets.somenamespace.somesecret.value" tokens := strings.Split(key, ".") if len(tokens) > 0 && tokens[0] != "kubernetes_secrets" { - return secretsData{ - value: "", - }, false + return false } if len(tokens) != 4 { p.logger.Debugf( @@ -163,6 +172,15 @@ func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { key, "kubernetes_secrets.somenamespace.somesecret.value", ) + return false + } + return true +} + +// This is only called by getFromCache function, which is already locking the cache. +func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { + valid := p.validateKey(key) + if !valid { return secretsData{ value: "", }, false @@ -184,11 +202,11 @@ type Result struct { } func (p *contextProviderK8sSecrets) fetchSecretWithTimeout(key string) (string, bool) { - ctxTimeout, cancel := context.WithTimeout(context.Background(), p.config.Timeout) + ctxTimeout, cancel := context.WithTimeout(context.Background(), p.config.RequestTimeout) defer cancel() resultCh := make(chan Result, 1) - go p.fetchSecret(ctxTimeout, key, resultCh) + p.fetchSecret(ctxTimeout, key, resultCh) select { case <-ctxTimeout.Done(): diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 3fea2dc4555..1e7d93dbcce 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -29,7 +29,7 @@ const ( pass = "testing_passpass" ) -func Test_K8sSecretsProvider_Fetch(t *testing.T) { +func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) { client := k8sfake.NewSimpleClientset() secret := &v1.Secret{ TypeMeta: metav1.TypeMeta{ @@ -79,13 +79,20 @@ func Test_K8sSecretsProvider_Fetch(t *testing.T) { <-time.After(10 * time.Millisecond) } - val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value") - assert.True(t, found) - assert.Equal(t, val, pass) + val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secretHACK.secret_value") + assert.False(t, found) + assert.EqualValues(t, val, "") } -func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) { +func Test_K8sSecretsProvider_Fetch_Cache_Enabled(t *testing.T) { client := k8sfake.NewSimpleClientset() + + ttlDelete, err := time.ParseDuration("1s") + require.NoError(t, err) + + refreshInterval, err := time.ParseDuration("100ms") + require.NoError(t, err) + secret := &v1.Secret{ TypeMeta: metav1.TypeMeta{ Kind: "Secret", @@ -99,11 +106,17 @@ func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) { "secret_value": []byte(pass), }, } - _, err := client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{}) + _, err = client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{}) require.NoError(t, err) logger := logp.NewLogger("test_k8s_secrets") - cfg, err := config.NewConfigFrom(map[string]string{"a": "b"}) + + c := map[string]interface{}{ + "refresh_interval": refreshInterval, + "ttl_delete": ttlDelete, + "disable_cache": false, + } + cfg, err := config.NewConfigFrom(c) require.NoError(t, err) p, err := ContextProviderBuilder(logger, cfg, true) @@ -134,20 +147,63 @@ func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) { <-time.After(10 * time.Millisecond) } - val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secretHACK.secret_value") - assert.False(t, found) - assert.EqualValues(t, val, "") -} + // Secret cache should be empty at start + fp.secretsCacheMx.Lock() + assert.Equal(t, 0, len(fp.secretsCache)) + fp.secretsCacheMx.Unlock() -func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { - client := k8sfake.NewSimpleClientset() + key := "kubernetes_secrets.test_namespace.testing_secret.secret_value" - ttlDelete, err := time.ParseDuration("1s") - require.NoError(t, err) + // Secret should be in the cache after this call + val, found := fp.Fetch(key) + assert.True(t, found) + assert.Equal(t, val, pass) + fp.secretsCacheMx.RLock() + assert.Equal(t, len(fp.secretsCache), 1) + assert.NotNil(t, fp.secretsCache[key]) + assert.NotZero(t, fp.secretsCache[key].lastAccess) + fp.secretsCacheMx.RUnlock() - refreshInterval, err := time.ParseDuration("100ms") + // Update the secret and check after TTL time, the secret value is correct + newPass := "new-pass" + secret = &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "apps/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "testing_secret", + Namespace: ns, + }, + Data: map[string][]byte{ + "secret_value": []byte(newPass), + }, + } + _, err = client.CoreV1().Secrets(ns).Update(context.Background(), secret, metav1.UpdateOptions{}) require.NoError(t, err) + // wait for ttl update + <-time.After(refreshInterval) + assert.Eventuallyf(t, func() bool { + val, found = fp.Fetch(key) + return found && val == newPass + }, refreshInterval*3, refreshInterval, "Failed to update the secret value after TTL update has passed.") + + // After TTL delete, secret should no longer be found in cache since it was never + // fetched during that time + <-time.After(ttlDelete) + assert.Eventuallyf(t, func() bool { + fp.secretsCacheMx.RLock() + size := len(fp.secretsCache) + fp.secretsCacheMx.RUnlock() + return size == 0 + }, ttlDelete*3, ttlDelete, "Failed to delete the secret after TTL delete has passed.") + +} + +func Test_K8sSecretsProvider_Fetch_Cache_Disabled(t *testing.T) { + client := k8sfake.NewSimpleClientset() + secret := &v1.Secret{ TypeMeta: metav1.TypeMeta{ Kind: "Secret", @@ -161,14 +217,13 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { "secret_value": []byte(pass), }, } - _, err = client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{}) + _, err := client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{}) require.NoError(t, err) logger := logp.NewLogger("test_k8s_secrets") c := map[string]interface{}{ - "refresh_interval": refreshInterval, - "ttl_delete": ttlDelete, + "disable_cache": true, } cfg, err := config.NewConfigFrom(c) require.NoError(t, err) @@ -201,24 +256,14 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { <-time.After(10 * time.Millisecond) } - // Secret cache should be empty at start - fp.secretsCacheMx.Lock() - assert.Equal(t, 0, len(fp.secretsCache)) - fp.secretsCacheMx.Unlock() - key := "kubernetes_secrets.test_namespace.testing_secret.secret_value" // Secret should be in the cache after this call val, found := fp.Fetch(key) assert.True(t, found) assert.Equal(t, val, pass) - fp.secretsCacheMx.RLock() - assert.Equal(t, len(fp.secretsCache), 1) - assert.NotNil(t, fp.secretsCache[key]) - assert.NotZero(t, fp.secretsCache[key].lastAccess) - fp.secretsCacheMx.RUnlock() - // Update the secret and check after TTL time, the secret value is correct + // Update the secret and check the result newPass := "new-pass" secret = &v1.Secret{ TypeMeta: metav1.TypeMeta{ @@ -236,21 +281,12 @@ func Test_K8sSecretsProvider_Check_TTL(t *testing.T) { _, err = client.CoreV1().Secrets(ns).Update(context.Background(), secret, metav1.UpdateOptions{}) require.NoError(t, err) - // wait for ttl update - <-time.After(refreshInterval) - assert.Eventuallyf(t, func() bool { - val, found = fp.Fetch(key) - return found && val == newPass - }, refreshInterval*3, refreshInterval, "Failed to update the secret value after TTL update has passed.") - - // After TTL delete, secret should no longer be found in cache since it was never - // fetched during that time - <-time.After(ttlDelete) - assert.Eventuallyf(t, func() bool { - fp.secretsCacheMx.RLock() - size := len(fp.secretsCache) - fp.secretsCacheMx.RUnlock() - return size == 0 - }, ttlDelete*3, ttlDelete, "Failed to delete the secret after TTL delete has passed.") + val, found = fp.Fetch(key) + assert.True(t, found) + assert.Equal(t, val, newPass) + // Check key that does not exist + val, found = fp.Fetch(key + "doesnotexist") + assert.False(t, found) + assert.Equal(t, "", val) } From 70c16cacfdf302e30a7bdf58937bc2db11c75107 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Mon, 11 Dec 2023 08:20:30 +0100 Subject: [PATCH 22/29] Rename cache fields --- .../pkg/composable/providers/kubernetessecrets/config.go | 8 ++++---- .../providers/kubernetessecrets/kubernetes_secrets.go | 2 +- .../kubernetessecrets/kubernetes_secrets_test.go | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index 8d0ca1bcd9a..4e61c1d99bb 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -15,10 +15,10 @@ type Config struct { KubeConfig string `config:"kube_config"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` - RefreshInterval time.Duration `config:"refresh_interval"` - TTLDelete time.Duration `config:"ttl_delete"` - RequestTimeout time.Duration `config:"request_timeout"` - DisableCache bool `config:"disable_cache"` + RefreshInterval time.Duration `config:"cache_refresh_interval"` + TTLDelete time.Duration `config:"cache_ttl_delete"` + RequestTimeout time.Duration `config:"cache_request_timeout"` + DisableCache bool `config:"cache_disable_cache"` } func (c *Config) InitDefaults() { diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 8bf6e53ae3d..2a75b632d16 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -78,7 +78,7 @@ func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) { func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions) if err != nil { - p.logger.Debugf("Kubernetes_secrets provider skipped, unable to connect: %s", err) + p.logger.Debugf("kubernetes_secrets provider skipped, unable to connect: %s", err) return nil } p.clientMx.Lock() diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 1e7d93dbcce..b58b8b00bd2 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -112,9 +112,9 @@ func Test_K8sSecretsProvider_Fetch_Cache_Enabled(t *testing.T) { logger := logp.NewLogger("test_k8s_secrets") c := map[string]interface{}{ - "refresh_interval": refreshInterval, - "ttl_delete": ttlDelete, - "disable_cache": false, + "cache_refresh_interval": refreshInterval, + "cache_ttl_delete": ttlDelete, + "cache_disable_cache": false, } cfg, err := config.NewConfigFrom(c) require.NoError(t, err) @@ -223,7 +223,7 @@ func Test_K8sSecretsProvider_Fetch_Cache_Disabled(t *testing.T) { logger := logp.NewLogger("test_k8s_secrets") c := map[string]interface{}{ - "disable_cache": true, + "cache_disable_cache": true, } cfg, err := config.NewConfigFrom(c) require.NoError(t, err) From 7f40d9ca9c7dc9e18737080a14986032e2f5a3cb Mon Sep 17 00:00:00 2001 From: constanca-m Date: Wed, 13 Dec 2023 12:38:56 +0100 Subject: [PATCH 23/29] Changes config names --- .../pkg/composable/providers/kubernetessecrets/config.go | 4 ++-- .../providers/kubernetessecrets/kubernetes_secrets_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index 4e61c1d99bb..0f021a3aaae 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -16,9 +16,9 @@ type Config struct { KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` RefreshInterval time.Duration `config:"cache_refresh_interval"` - TTLDelete time.Duration `config:"cache_ttl_delete"` + TTLDelete time.Duration `config:"cache_ttl"` RequestTimeout time.Duration `config:"cache_request_timeout"` - DisableCache bool `config:"cache_disable_cache"` + DisableCache bool `config:"cache_disable"` } func (c *Config) InitDefaults() { diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index b58b8b00bd2..78d632d6437 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -113,8 +113,8 @@ func Test_K8sSecretsProvider_Fetch_Cache_Enabled(t *testing.T) { c := map[string]interface{}{ "cache_refresh_interval": refreshInterval, - "cache_ttl_delete": ttlDelete, - "cache_disable_cache": false, + "cache_ttl": ttlDelete, + "cache_disable": false, } cfg, err := config.NewConfigFrom(c) require.NoError(t, err) @@ -223,7 +223,7 @@ func Test_K8sSecretsProvider_Fetch_Cache_Disabled(t *testing.T) { logger := logp.NewLogger("test_k8s_secrets") c := map[string]interface{}{ - "cache_disable_cache": true, + "cache_disable": true, } cfg, err := config.NewConfigFrom(c) require.NoError(t, err) From 62862a32522b66371f9ee5587e9db1a3ca2d2515 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Wed, 13 Dec 2023 18:17:47 +0100 Subject: [PATCH 24/29] Merge maps --- .../kubernetessecrets/kubernetes_secrets.go | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 2a75b632d16..2700014db56 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -115,12 +115,29 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { } } +// mergeWithCurrent merges the updated map with the cache map. +// This function needs to be called between the mutex lock for the map. +func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) map[string]*secretsData { + merged := make(map[string]*secretsData) + for name, data := range p.secretsCache { + lastAccess := data.lastAccess + diff := time.Since(lastAccess) + if diff < p.config.TTLDelete { + merged[name] = data + } + } + for name, data := range updatedMap { + merged[name] = data + } + return merged +} + func (p *contextProviderK8sSecrets) updateCache() { // deleting entries does not free the memory, so we need to create a new map // to place the secrets we want to keep cacheTmp := make(map[string]*secretsData) - p.secretsCacheMx.Lock() + p.secretsCacheMx.RLock() for name, data := range p.secretsCache { diff := time.Since(data.lastAccess) if diff < p.config.TTLDelete { @@ -135,7 +152,13 @@ func (p *contextProviderK8sSecrets) updateCache() { } } - p.secretsCache = cacheTmp + p.secretsCacheMx.RUnlock() + + // While the cache was updated, it is possible that some secret was added through another go routine. + // We need to merge the updated map with the current cache map to catch the new entries and avoid + // loss of data. + p.secretsCacheMx.Lock() + p.secretsCache = p.mergeWithCurrent(cacheTmp) p.secretsCacheMx.Unlock() } From 7642f09580aef236b5cdc8fc27beae68a0d21e2f Mon Sep 17 00:00:00 2001 From: constanca-m Date: Wed, 13 Dec 2023 18:19:24 +0100 Subject: [PATCH 25/29] Merge maps --- .../providers/kubernetessecrets/kubernetes_secrets.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 2700014db56..cbdc50df6b4 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -120,8 +120,7 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) map[string]*secretsData { merged := make(map[string]*secretsData) for name, data := range p.secretsCache { - lastAccess := data.lastAccess - diff := time.Since(lastAccess) + diff := time.Since(data.lastAccess) if diff < p.config.TTLDelete { merged[name] = data } From ab768054e449150be7401b757b2a4773a070a93e Mon Sep 17 00:00:00 2001 From: constanca-m Date: Thu, 14 Dec 2023 11:54:38 +0100 Subject: [PATCH 26/29] Merge maps, fix mistake --- .../providers/kubernetessecrets/kubernetes_secrets.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index cbdc50df6b4..a20056b2acc 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -119,15 +119,23 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) { // This function needs to be called between the mutex lock for the map. func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) map[string]*secretsData { merged := make(map[string]*secretsData) + for name, data := range p.secretsCache { diff := time.Since(data.lastAccess) if diff < p.config.TTLDelete { merged[name] = data } } + for name, data := range updatedMap { - merged[name] = data + // We need to check if the key is already in the new map. If it is, lastAccess cannot be overwritten since + // it could have been updated when trying to fetch the secret at the same time we are running update cache. + // In that case, we only update the value. + if _, ok := merged[name]; ok { + merged[name].value = data.value + } } + return merged } From 4f3ab488e4e943ba474e772d7ec61eeaa4630c21 Mon Sep 17 00:00:00 2001 From: constanca-m Date: Thu, 14 Dec 2023 15:50:10 +0100 Subject: [PATCH 27/29] Change locks to reading locks --- .../kubernetessecrets/kubernetes_secrets.go | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index a20056b2acc..0037bf278dd 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -170,22 +170,24 @@ func (p *contextProviderK8sSecrets) updateCache() { } func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { - p.secretsCacheMx.Lock() - defer p.secretsCacheMx.Unlock() - + p.secretsCacheMx.RLock() _, ok := p.secretsCache[key] + p.secretsCacheMx.RUnlock() + // if value is still not present in cache, it is possible we haven't tried to fetch it yet if !ok { - data, ok := p.addToCache(key) + value, ok := p.addToCache(key) // if it was not possible to fetch the secret, return if !ok { - return data.value, ok + return value, ok } } + p.secretsCacheMx.Lock() data, ok := p.secretsCache[key] data.lastAccess = time.Now() pass := data.value + p.secretsCacheMx.Unlock() return pass, ok } @@ -208,22 +210,19 @@ func (p *contextProviderK8sSecrets) validateKey(key string) bool { } // This is only called by getFromCache function, which is already locking the cache. -func (p *contextProviderK8sSecrets) addToCache(key string) (secretsData, bool) { +func (p *contextProviderK8sSecrets) addToCache(key string) (string, bool) { valid := p.validateKey(key) if !valid { - return secretsData{ - value: "", - }, false + return "", false } value, ok := p.fetchSecretWithTimeout(key) - data := secretsData{ - value: value, - } if ok { - p.secretsCache[key] = &data + p.secretsCacheMx.Lock() + p.secretsCache[key] = &secretsData{value: value} + p.secretsCacheMx.Unlock() } - return data, ok + return value, ok } type Result struct { From 044e7d1bf40eb158a7e35e6a6fb370d1efcd700b Mon Sep 17 00:00:00 2001 From: constanca-m Date: Thu, 14 Dec 2023 15:52:04 +0100 Subject: [PATCH 28/29] Change locks to reading locks --- .../composable/providers/kubernetessecrets/kubernetes_secrets.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 0037bf278dd..280333d967b 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -209,7 +209,6 @@ func (p *contextProviderK8sSecrets) validateKey(key string) bool { return true } -// This is only called by getFromCache function, which is already locking the cache. func (p *contextProviderK8sSecrets) addToCache(key string) (string, bool) { valid := p.validateKey(key) if !valid { From 8b7da6c83d835d95326c5af6aabc7e268af4c14d Mon Sep 17 00:00:00 2001 From: constanca-m Date: Thu, 14 Dec 2023 18:15:08 +0100 Subject: [PATCH 29/29] Remove read lock for iteration --- .../providers/kubernetessecrets/kubernetes_secrets.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 280333d967b..4bcf90470b3 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -144,8 +144,15 @@ func (p *contextProviderK8sSecrets) updateCache() { // to place the secrets we want to keep cacheTmp := make(map[string]*secretsData) + // to not hold the lock for long, we copy the current state of the cache map + copyMap := make(map[string]secretsData) p.secretsCacheMx.RLock() for name, data := range p.secretsCache { + copyMap[name] = *data + } + p.secretsCacheMx.RUnlock() + + for name, data := range copyMap { diff := time.Since(data.lastAccess) if diff < p.config.TTLDelete { value, ok := p.fetchSecretWithTimeout(name) @@ -159,7 +166,6 @@ func (p *contextProviderK8sSecrets) updateCache() { } } - p.secretsCacheMx.RUnlock() // While the cache was updated, it is possible that some secret was added through another go routine. // We need to merge the updated map with the current cache map to catch the new entries and avoid