diff --git a/changelog/fragments/1701091034-add-cache-for-secrets.yaml b/changelog/fragments/1701091034-add-cache-for-secrets.yaml
new file mode 100644
index 00000000000..3c52bafa577
--- /dev/null
+++ b/changelog/fragments/1701091034-add-cache-for-secrets.yaml
@@ -0,0 +1,32 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: feature
+
+# Change summary; an 80ish-character description of the change.
+summary: Add a cache for secrets when using the Kubernetes secrets provider
+
+# Long description; in case the summary is not enough to describe the change
+# this field accommodates a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+#description:
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: elastic-agent
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+pr: https://github.com/elastic/elastic-agent/pull/3822
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
+issue: https://github.com/elastic/elastic-agent/issues/3594
diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go
index 95ff308c3aa..0f021a3aaae 100644
--- a/internal/pkg/composable/providers/kubernetessecrets/config.go
+++ b/internal/pkg/composable/providers/kubernetessecrets/config.go
@@ -4,10 +4,26 @@
 
 package kubernetessecrets
 
-import "github.com/elastic/elastic-agent-autodiscover/kubernetes"
+import (
+    "time"
+
+    "github.com/elastic/elastic-agent-autodiscover/kubernetes"
+)
 
 // Config for kubernetes provider
 type Config struct {
     KubeConfig        string                       `config:"kube_config"`
     KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"`
+
+    RefreshInterval time.Duration `config:"cache_refresh_interval"`
+    TTLDelete       time.Duration `config:"cache_ttl"`
+    RequestTimeout  time.Duration `config:"cache_request_timeout"`
+    DisableCache    bool          `config:"cache_disable"`
+}
+
+func (c *Config) InitDefaults() {
+    c.RefreshInterval = 60 * time.Second
+    c.TTLDelete = 1 * time.Hour
+    c.RequestTimeout = 5 * time.Second
+    c.DisableCache = false
 }
diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go
index 1537a232dd1..4bcf90470b3 100644
--- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go
+++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go
@@ -8,6 +8,7 @@ import (
     "context"
     "strings"
     "sync"
+    "time"
 
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     k8sclient "k8s.io/client-go/kubernetes"
@@ -33,6 +34,14 @@ type contextProviderK8sSecrets struct {
 
     clientMx sync.Mutex
     client   k8sclient.Interface
+
+    secretsCacheMx sync.RWMutex
+    secretsCache   map[string]*secretsData
+}
+
+type secretsData struct {
+    value      string
+    lastAccess time.Time
 }
 
 // ContextProviderBuilder builds the context provider.
@@ -46,22 +55,154 @@ func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed boo
         return nil, errors.New(err, "failed to unpack configuration")
     }
     return &contextProviderK8sSecrets{
-        logger: logger,
-        config: &cfg,
+        logger:       logger,
+        config:       &cfg,
+        secretsCache: make(map[string]*secretsData),
     }, nil
 }
 
 func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) {
-    // key = "kubernetes_secrets.somenamespace.somesecret.value"
+    if p.config.DisableCache {
+        valid := p.validateKey(key)
+        if valid {
+            return p.fetchSecretWithTimeout(key)
+        } else {
+            return "", false
+        }
+    } else {
+        return p.getFromCache(key)
+    }
+}
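With the cache enabled (the default), Fetch serves keys from the in-memory cache and only reaches the Kubernetes API on a miss or during the periodic refresh; with cache_disable: true every Fetch validates the key and performs a direct API read bounded by cache_request_timeout. The sketch below shows how the provider might be wired up with custom cache settings. It is modelled on the tests at the end of this diff and assumes the same package and imports as kubernetes_secrets_test.go (plus fmt); the 30s override, the namespace and the secret name are illustrative only.

// Sketch only: lives in the kubernetessecrets package, next to the tests below.
func exampleCacheEnabledFetch() {
    // Inject a fake clientset, exactly as the tests do, by swapping the
    // package-level getK8sClientFunc variable.
    client := k8sfake.NewSimpleClientset()
    getK8sClientFunc = func(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) {
        return client, nil
    }

    // Override only the refresh interval; the other fields keep the values
    // assigned by InitDefaults (1h cache_ttl, 5s cache_request_timeout, cache enabled).
    cfg, err := config.NewConfigFrom(map[string]interface{}{
        "cache_refresh_interval": 30 * time.Second,
    })
    if err != nil {
        panic(err)
    }

    p, err := ContextProviderBuilder(logp.NewLogger("k8s_secrets_example"), cfg, true)
    if err != nil {
        panic(err)
    }
    provider, _ := p.(*contextProviderK8sSecrets)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() { _ = provider.Run(ctx, ctesting.NewContextComm(ctx)) }()

    // Wait until Run has connected the client, as the tests below do.
    for {
        provider.clientMx.Lock()
        connected := provider.client != nil
        provider.clientMx.Unlock()
        if connected {
            break
        }
        <-time.After(10 * time.Millisecond)
    }

    // Keys always have the form "kubernetes_secrets.<namespace>.<secret>.<data key>".
    // The first Fetch populates the cache; later calls are served from memory until
    // the entry is refreshed (cache_refresh_interval) or evicted (cache_ttl).
    // With the empty fake clientset this returns ("", false); against a real
    // cluster it would return the secret's api_key value.
    value, found := provider.Fetch("kubernetes_secrets.default.my-secret.api_key")
    fmt.Println(value, found)
}

Any option left out of the map keeps the value set by InitDefaults, which is how the tests below rely on the default 5-second cache_request_timeout without setting it.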
+
+// Run initializes the k8s secrets context provider.
+func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {
+    client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions)
+    if err != nil {
+        p.logger.Debugf("kubernetes_secrets provider skipped, unable to connect: %s", err)
+        return nil
+    }
     p.clientMx.Lock()
-    client := p.client
+    p.client = client
     p.clientMx.Unlock()
-    if client == nil {
-        return "", false
+
+    if !p.config.DisableCache {
+        go p.updateSecrets(ctx)
     }
+
+    <-comm.Done()
+
+    p.clientMx.Lock()
+    p.client = nil
+    p.clientMx.Unlock()
+    return comm.Err()
+}
+
+func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) {
+    return kubernetes.GetKubernetesClient(kubeconfig, opt)
+}
+
+// updateSecrets refreshes the secrets in the cache every RefreshInterval.
+func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) {
+    timer := time.NewTimer(p.config.RefreshInterval)
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-timer.C:
+            p.updateCache()
+            timer.Reset(p.config.RefreshInterval)
+        }
+    }
+}
+
+// mergeWithCurrent merges the updated map with the cache map.
+// It must be called while holding the lock on the cache map.
+func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) map[string]*secretsData {
+    merged := make(map[string]*secretsData)
+
+    for name, data := range p.secretsCache {
+        diff := time.Since(data.lastAccess)
+        if diff < p.config.TTLDelete {
+            merged[name] = data
+        }
+    }
+
+    for name, data := range updatedMap {
+        // We need to check if the key is already in the new map. If it is, lastAccess cannot be overwritten since
+        // it could have been updated when trying to fetch the secret at the same time we are running update cache.
+        // In that case, we only update the value.
+        if _, ok := merged[name]; ok {
+            merged[name].value = data.value
+        }
+    }
+
+    return merged
+}
+
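updateSecrets and mergeWithCurrent together implement a refresh loop with an idle TTL: every cache_refresh_interval the provider re-reads the secrets it already holds, and an entry that has not been fetched for cache_ttl is dropped on the next pass. The standalone sketch below distills that strategy using only the standard library; the ttlCache type, its methods and the sample key are invented for illustration and are not part of the provider.

package main

import (
    "fmt"
    "sync"
    "time"
)

// entry mirrors the provider's secretsData: a value plus its last access time.
type entry struct {
    value      string
    lastAccess time.Time
}

type ttlCache struct {
    mx      sync.RWMutex
    entries map[string]*entry
    ttl     time.Duration
}

// refresh re-fetches every live entry and drops entries that have been idle for
// longer than ttl, the same bookkeeping updateCache/mergeWithCurrent perform.
func (c *ttlCache) refresh(fetch func(key string) (string, bool)) {
    c.mx.Lock()
    defer c.mx.Unlock()
    kept := make(map[string]*entry)
    for key, e := range c.entries {
        if time.Since(e.lastAccess) >= c.ttl {
            continue // idle for too long: evict
        }
        if v, ok := fetch(key); ok {
            kept[key] = &entry{value: v, lastAccess: e.lastAccess}
        }
    }
    c.entries = kept
}

func (c *ttlCache) get(key string) (string, bool) {
    c.mx.Lock()
    defer c.mx.Unlock()
    e, ok := c.entries[key]
    if !ok {
        return "", false
    }
    e.lastAccess = time.Now() // reading an entry keeps it alive
    return e.value, true
}

func main() {
    cache := &ttlCache{
        entries: map[string]*entry{
            "kubernetes_secrets.default.my-secret.api_key": {value: "v1", lastAccess: time.Now()},
        },
        ttl: time.Hour,
    }

    ticker := time.NewTicker(100 * time.Millisecond) // stands in for cache_refresh_interval
    defer ticker.Stop()
    for i := 0; i < 3; i++ {
        <-ticker.C
        cache.refresh(func(string) (string, bool) { return fmt.Sprintf("v%d", i+2), true })
    }
    fmt.Println(cache.get("kubernetes_secrets.default.my-secret.api_key")) // v4 true
}

Unlike this simplified sketch, the provider deliberately does not hold the lock while it talks to the API: updateCache below works on a copy of the map and then reconciles the result through mergeWithCurrent, so concurrent Fetch calls are never blocked behind network requests.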
+func (p *contextProviderK8sSecrets) updateCache() {
+    // Deleting entries does not free the memory, so we need to create a new map
+    // to place the secrets we want to keep.
+    cacheTmp := make(map[string]*secretsData)
+
+    // To avoid holding the lock for long, we copy the current state of the cache map.
+    copyMap := make(map[string]secretsData)
+    p.secretsCacheMx.RLock()
+    for name, data := range p.secretsCache {
+        copyMap[name] = *data
+    }
+    p.secretsCacheMx.RUnlock()
+
+    for name, data := range copyMap {
+        diff := time.Since(data.lastAccess)
+        if diff < p.config.TTLDelete {
+            value, ok := p.fetchSecretWithTimeout(name)
+            if ok {
+                newData := &secretsData{
+                    value:      value,
+                    lastAccess: data.lastAccess,
+                }
+                cacheTmp[name] = newData
+            }
+
+        }
+    }
+
+    // While the cache was being updated, it is possible that some secret was added through another goroutine.
+    // We need to merge the updated map with the current cache map to catch the new entries and avoid
+    // losing data.
+    p.secretsCacheMx.Lock()
+    p.secretsCache = p.mergeWithCurrent(cacheTmp)
+    p.secretsCacheMx.Unlock()
+}
+
+func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) {
+    p.secretsCacheMx.RLock()
+    _, ok := p.secretsCache[key]
+    p.secretsCacheMx.RUnlock()
+
+    // If the value is still not present in the cache, it is possible we haven't tried to fetch it yet.
+    if !ok {
+        value, ok := p.addToCache(key)
+        // If it was not possible to fetch the secret, return.
+        if !ok {
+            return value, ok
+        }
+    }
+
+    p.secretsCacheMx.Lock()
+    data, ok := p.secretsCache[key]
+    data.lastAccess = time.Now()
+    pass := data.value
+    p.secretsCacheMx.Unlock()
+
+    return pass, ok
+}
+
+func (p *contextProviderK8sSecrets) validateKey(key string) bool {
+    // Make sure the key has the expected format "kubernetes_secrets.somenamespace.somesecret.value".
     tokens := strings.Split(key, ".")
     if len(tokens) > 0 && tokens[0] != "kubernetes_secrets" {
-        return "", false
+        return false
     }
     if len(tokens) != 4 {
         p.logger.Debugf(
@@ -69,44 +210,80 @@ func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) {
             key,
             "kubernetes_secrets.somenamespace.somesecret.value",
         )
+        return false
+    }
+    return true
+}
+
+func (p *contextProviderK8sSecrets) addToCache(key string) (string, bool) {
+    valid := p.validateKey(key)
+    if !valid {
         return "", false
     }
+
+    value, ok := p.fetchSecretWithTimeout(key)
+    if ok {
+        p.secretsCacheMx.Lock()
+        p.secretsCache[key] = &secretsData{value: value}
+        p.secretsCacheMx.Unlock()
+    }
+    return value, ok
+}
+
+type Result struct {
+    value string
+    ok    bool
+}
+
+func (p *contextProviderK8sSecrets) fetchSecretWithTimeout(key string) (string, bool) {
+    ctxTimeout, cancel := context.WithTimeout(context.Background(), p.config.RequestTimeout)
+    defer cancel()
+
+    resultCh := make(chan Result, 1)
+    p.fetchSecret(ctxTimeout, key, resultCh)
+
+    select {
+    case <-ctxTimeout.Done():
+        p.logger.Errorf("Could not retrieve value for key %v: %v", key, ctxTimeout.Err())
+        return "", false
+    case result := <-resultCh:
+        return result.value, result.ok
+    }
+}
+
+func (p *contextProviderK8sSecrets) fetchSecret(context context.Context, key string, resultCh chan Result) {
+    p.clientMx.Lock()
+    client := p.client
+    p.clientMx.Unlock()
+    if client == nil {
+        resultCh <- Result{value: "", ok: false}
+        return
+    }
+
+    tokens := strings.Split(key, ".")
+    // key has the format "kubernetes_secrets.somenamespace.somesecret.value".
+    // This function is only called from:
+    // - addToCache, where we already validated that the key has the right format.
+    // - updateCache, where the results are only added to the cache through addToCache.
+    // Because of this we no longer need to validate the key.
     ns := tokens[1]
     secretName := tokens[2]
     secretVar := tokens[3]
 
-    secretIntefrace := client.CoreV1().Secrets(ns)
-    ctx := context.TODO()
-    secret, err := secretIntefrace.Get(ctx, secretName, metav1.GetOptions{})
+    secretInterface := client.CoreV1().Secrets(ns)
+    secret, err := secretInterface.Get(context, secretName, metav1.GetOptions{})
+
     if err != nil {
         p.logger.Errorf("Could not retrieve secret from k8s API: %v", err)
-        return "", false
+        resultCh <- Result{value: "", ok: false}
+        return
     }
     if _, ok := secret.Data[secretVar]; !ok {
         p.logger.Errorf("Could not retrieve value %v for secret %v", secretVar, secretName)
-        return "", false
+        resultCh <- Result{value: "", ok: false}
+        return
     }
-    secretString := secret.Data[secretVar]
-    return string(secretString), true
-}
 
-// Run initializes the k8s secrets context provider.
-func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {
-    client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions)
-    if err != nil {
-        p.logger.Debugf("Kubernetes_secrets provider skipped, unable to connect: %s", err)
-        return nil
-    }
-    p.clientMx.Lock()
-    p.client = client
-    p.clientMx.Unlock()
-    <-comm.Done()
-    p.clientMx.Lock()
-    p.client = nil
-    p.clientMx.Unlock()
-    return comm.Err()
-}
-
-func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) {
-    return kubernetes.GetKubernetesClient(kubeconfig, opt)
+    secretString := secret.Data[secretVar]
+    resultCh <- Result{value: string(secretString), ok: true}
 }
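fetchSecretWithTimeout bounds every read with cache_request_timeout by passing a context.WithTimeout context down to the client-go Get call, so a slow or unreachable API server turns into a failed lookup instead of blocking a Fetch or the refresh loop indefinitely; because resultCh is buffered with capacity 1, fetchSecret can always deliver its result even after the select has given up. Below is a minimal, self-contained sketch of the same timeout pattern against the fake clientset used by the tests; the namespace, secret name, data key and the 5-second timeout are illustrative only.

package main

import (
    "context"
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    k8sfake "k8s.io/client-go/kubernetes/fake"
)

func main() {
    client := k8sfake.NewSimpleClientset()

    // Equivalent of cache_request_timeout: the Get call is cancelled once the
    // deadline expires, and ctx.Err() reports context.DeadlineExceeded.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    secret, err := client.CoreV1().Secrets("default").Get(ctx, "my-secret", metav1.GetOptions{})
    if err != nil {
        // With the empty fake clientset this is a NotFound error; against a real
        // API server it could also be a deadline error propagated from ctx.
        fmt.Println("lookup failed:", err)
        return
    }
    fmt.Println(string(secret.Data["api_key"]))
}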
diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go
index 9924c84e6bc..78d632d6437 100644
--- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go
+++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go
@@ -29,7 +29,7 @@ const (
     pass = "testing_passpass"
 )
 
-func Test_K8sSecretsProvider_Fetch(t *testing.T) {
+func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) {
     client := k8sfake.NewSimpleClientset()
     secret := &v1.Secret{
         TypeMeta: metav1.TypeMeta{
             Kind:       "Secret",
             APIVersion: "apps/v1beta1",
@@ -79,13 +79,131 @@ func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) {
         <-time.After(10 * time.Millisecond)
     }
 
-    val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secret.secret_value")
+    val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secretHACK.secret_value")
+    assert.False(t, found)
+    assert.EqualValues(t, val, "")
+}
+
+func Test_K8sSecretsProvider_Fetch_Cache_Enabled(t *testing.T) {
+    client := k8sfake.NewSimpleClientset()
+
+    ttlDelete, err := time.ParseDuration("1s")
+    require.NoError(t, err)
+
+    refreshInterval, err := time.ParseDuration("100ms")
+    require.NoError(t, err)
+
+    secret := &v1.Secret{
+        TypeMeta: metav1.TypeMeta{
+            Kind:       "Secret",
+            APIVersion: "apps/v1beta1",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "testing_secret",
+            Namespace: ns,
+        },
+        Data: map[string][]byte{
+            "secret_value": []byte(pass),
+        },
+    }
+    _, err = client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{})
+    require.NoError(t, err)
+
+    logger := logp.NewLogger("test_k8s_secrets")
+
+    c := map[string]interface{}{
+        "cache_refresh_interval": refreshInterval,
+        "cache_ttl":              ttlDelete,
+        "cache_disable":          false,
+    }
+    cfg, err := config.NewConfigFrom(c)
+    require.NoError(t, err)
+
+    p, err := ContextProviderBuilder(logger, cfg, true)
+    require.NoError(t, err)
+
+    fp, _ := p.(*contextProviderK8sSecrets)
+
+    getK8sClientFunc = func(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) {
+        return client, nil
+    }
+    require.NoError(t, err)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    comm := ctesting.NewContextComm(ctx)
+
+    go func() {
+        _ = fp.Run(ctx, comm)
+    }()
+
+    for {
+        fp.clientMx.Lock()
+        client := fp.client
+        fp.clientMx.Unlock()
+        if client != nil {
+            break
+        }
+        <-time.After(10 * time.Millisecond)
+    }
+
+    // Secret cache should be empty at start
+    fp.secretsCacheMx.Lock()
+    assert.Equal(t, 0, len(fp.secretsCache))
+    fp.secretsCacheMx.Unlock()
+
+    key := "kubernetes_secrets.test_namespace.testing_secret.secret_value"
+
+    // Secret should be in the cache after this call
+    val, found := fp.Fetch(key)
     assert.True(t, found)
     assert.Equal(t, val, pass)
+    fp.secretsCacheMx.RLock()
+    assert.Equal(t, len(fp.secretsCache), 1)
+    assert.NotNil(t, fp.secretsCache[key])
+    assert.NotZero(t, fp.secretsCache[key].lastAccess)
+    fp.secretsCacheMx.RUnlock()
+
+    // Update the secret and check that the cached value is correct after the refresh interval
+    newPass := "new-pass"
+    secret = &v1.Secret{
+        TypeMeta: metav1.TypeMeta{
+            Kind:       "Secret",
+            APIVersion: "apps/v1beta1",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "testing_secret",
+            Namespace: ns,
+        },
+        Data: map[string][]byte{
+            "secret_value": []byte(newPass),
+        },
+    }
+    _, err = client.CoreV1().Secrets(ns).Update(context.Background(), secret, metav1.UpdateOptions{})
+    require.NoError(t, err)
+
+    // wait for the cache to refresh
+    <-time.After(refreshInterval)
+    assert.Eventuallyf(t, func() bool {
+        val, found = fp.Fetch(key)
+        return found && val == newPass
+    }, refreshInterval*3, refreshInterval, "Failed to update the secret value after TTL update has passed.")
+
+    // After TTL delete, secret should no longer be found in cache since it was never
+    // fetched during that time
+    <-time.After(ttlDelete)
+    assert.Eventuallyf(t, func() bool {
+        fp.secretsCacheMx.RLock()
+        size := len(fp.secretsCache)
+        fp.secretsCacheMx.RUnlock()
+        return size == 0
+    }, ttlDelete*3, ttlDelete, "Failed to delete the secret after TTL delete has passed.")
+
 }
 
-func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) {
+func Test_K8sSecretsProvider_Fetch_Cache_Disabled(t *testing.T) {
     client := k8sfake.NewSimpleClientset()
+
     secret := &v1.Secret{
         TypeMeta: metav1.TypeMeta{
             Kind:       "Secret",
@@ -103,7 +221,11 @@ func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) {
     require.NoError(t, err)
 
     logger := logp.NewLogger("test_k8s_secrets")
-    cfg, err := config.NewConfigFrom(map[string]string{"a": "b"})
+
+    c := map[string]interface{}{
+        "cache_disable": true,
+    }
+    cfg, err := config.NewConfigFrom(c)
     require.NoError(t, err)
 
     p, err := ContextProviderBuilder(logger, cfg, true)
@@ -134,7 +256,37 @@ func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) {
         <-time.After(10 * time.Millisecond)
     }
 
-    val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secretHACK.secret_value")
+    key := "kubernetes_secrets.test_namespace.testing_secret.secret_value"
+
+    // With the cache disabled, Fetch reads the secret directly from the API
+    val, found := fp.Fetch(key)
+    assert.True(t, found)
+    assert.Equal(t, val, pass)
+
+    // Update the secret and check the result
+    newPass := "new-pass"
+    secret = &v1.Secret{
+        TypeMeta: metav1.TypeMeta{
+            Kind:       "Secret",
+            APIVersion: "apps/v1beta1",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "testing_secret",
+            Namespace: ns,
+        },
+        Data: map[string][]byte{
+            "secret_value": []byte(newPass),
+        },
+    }
+    _, err = client.CoreV1().Secrets(ns).Update(context.Background(), secret, metav1.UpdateOptions{})
+    require.NoError(t, err)
+
+    val, found = fp.Fetch(key)
+    assert.True(t, found)
+    assert.Equal(t, val, newPass)
+
+    // Check a key that does not exist
+    val, found = fp.Fetch(key + "doesnotexist")
     assert.False(t, found)
-    assert.EqualValues(t, val, "")
+    assert.Equal(t, "", val)
 }