Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition between service account availability and webhook invocation #236

Merged
merged 9 commits into from
Sep 12, 2024
63 changes: 32 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,37 +143,38 @@ When running a container with a non-root user, you need to give the container ac

```
Usage of amazon-eks-pod-identity-webhook:
--add_dir_header If true, adds the file directory to the header
--alsologtostderr log to standard error as well as files
--annotation-prefix string The Service Account annotation to look for (default "eks.amazonaws.com")
--aws-default-region string If set, AWS_DEFAULT_REGION and AWS_REGION will be set to this value in mutated containers
--enable-debugging-handlers Enable debugging handlers. Currently /debug/alpha/cache is supported
--in-cluster Use in-cluster authentication and certificate request API (default true)
--kube-api string (out-of-cluster) The url to the API server
--kubeconfig string (out-of-cluster) Absolute path to the API server kubeconfig file
--log_backtrace_at traceLocation when logging hits line file:N, emit a stack trace (default :0)
--log_dir string If non-empty, write log files in this directory
--log_file string If non-empty, use this log file
--log_file_max_size uint Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. (default 1800)
--logtostderr log to standard error instead of files (default true)
--metrics-port int Port to listen on for metrics (http) (default 9999)
--namespace string (in-cluster) The namespace name this webhook, the TLS secret, and configmap resides in (default "eks")
--port int Port to listen on (default 443)
--service-name string (in-cluster) The service name fronting this webhook (default "pod-identity-webhook")
--skip_headers If true, avoid header prefixes in the log messages
--skip_log_headers If true, avoid headers when opening log files
--stderrthreshold severity logs at or above this threshold go to stderr (default 2)
--sts-regional-endpoint false Whether to inject the AWS_STS_REGIONAL_ENDPOINTS=regional env var in mutated pods. Defaults to false.
--tls-cert string (out-of-cluster) TLS certificate file path (default "/etc/webhook/certs/tls.crt")
--tls-key string (out-of-cluster) TLS key file path (default "/etc/webhook/certs/tls.key")
--tls-secret string (in-cluster) The secret name for storing the TLS serving cert (default "pod-identity-webhook")
--token-audience string The default audience for tokens. Can be overridden by annotation (default "sts.amazonaws.com")
--token-expiration int The token expiration (default 86400)
--token-mount-path string The path to mount tokens (default "/var/run/secrets/eks.amazonaws.com/serviceaccount")
-v, --v Level number for the log level verbosity
--version Display the version and exit
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
--watch-config-map Enables watching serviceaccounts that are configured through the pod-identity-webhook configmap instead of using annotations
--add_dir_header If true, adds the file directory to the header
--alsologtostderr log to standard error as well as files
--annotation-prefix string The Service Account annotation to look for (default "eks.amazonaws.com")
--aws-default-region string If set, AWS_DEFAULT_REGION and AWS_REGION will be set to this value in mutated containers
--enable-debugging-handlers Enable debugging handlers. Currently /debug/alpha/cache is supported
--in-cluster Use in-cluster authentication and certificate request API (default true)
--kube-api string (out-of-cluster) The url to the API server
--kubeconfig string (out-of-cluster) Absolute path to the API server kubeconfig file
--log_backtrace_at traceLocation when logging hits line file:N, emit a stack trace (default :0)
--log_dir string If non-empty, write log files in this directory
--log_file string If non-empty, use this log file
--log_file_max_size uint Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. (default 1800)
--logtostderr log to standard error instead of files (default true)
--metrics-port int Port to listen on for metrics (http) (default 9999)
--namespace string (in-cluster) The namespace name this webhook, the TLS secret, and configmap resides in (default "eks")
--port int Port to listen on (default 443)
--service-name string (in-cluster) The service name fronting this webhook (default "pod-identity-webhook")
--service-account-lookup-grace-period The grace period for service account to be available in cache before not mutating a pod. Set to 0 to deactivate waiting. Carefully use higher values as it may have significant impact on Kubernetes' pod scheduling performance. (default 100ms)
--skip_headers If true, avoid header prefixes in the log messages
--skip_log_headers If true, avoid headers when opening log files
--stderrthreshold severity logs at or above this threshold go to stderr (default 2)
--sts-regional-endpoint false Whether to inject the AWS_STS_REGIONAL_ENDPOINTS=regional env var in mutated pods. Defaults to false.
--tls-cert string (out-of-cluster) TLS certificate file path (default "/etc/webhook/certs/tls.crt")
--tls-key string (out-of-cluster) TLS key file path (default "/etc/webhook/certs/tls.key")
--tls-secret string (in-cluster) The secret name for storing the TLS serving cert (default "pod-identity-webhook")
--token-audience string The default audience for tokens. Can be overridden by annotation (default "sts.amazonaws.com")
--token-expiration int The token expiration (default 86400)
--token-mount-path string The path to mount tokens (default "/var/run/secrets/eks.amazonaws.com/serviceaccount")
-v, --v Level number for the log level verbosity
--version Display the version and exit
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
--watch-config-map Enables watching serviceaccounts that are configured through the pod-identity-webhook configmap instead of using annotations
```

### AWS_DEFAULT_REGION Injection
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func main() {

debug := flag.Bool("enable-debugging-handlers", false, "Enable debugging handlers. Currently /debug/alpha/cache is supported")

saLookupGracePeriod := flag.Duration("service-account-lookup-grace-period", 100*time.Millisecond, "The grace period for service account to be available in cache before not mutating a pod. Defaults to 100ms. Set to 0 to deactivate waiting. Carefully use higher values as it may have significant impact on Kubernetes' pod scheduling performance.")
modulitos marked this conversation as resolved.
Show resolved Hide resolved

klog.InitFlags(goflag.CommandLine)
// Add klog CommandLine flags to pflag CommandLine
goflag.CommandLine.VisitAll(func(f *goflag.Flag) {
Expand Down Expand Up @@ -208,6 +210,7 @@ func main() {
handler.WithServiceAccountCache(saCache),
handler.WithContainerCredentialsConfig(containerCredentialsConfig),
handler.WithRegion(*region),
handler.WithSALookupGraceTime(*saLookupGracePeriod),
)

addr := fmt.Sprintf(":%d", *port)
Expand Down
165 changes: 113 additions & 52 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,48 @@ import (
"k8s.io/klog/v2"
)

type CacheResponse struct {
type Entry struct {
RoleARN string
Audience string
UseRegionalSTS bool
TokenExpiration int64
}

type Request struct {
Name string
Namespace string
RequestNotification bool
}

func (r Request) CacheKey() string {
return r.Namespace + "/" + r.Name
}
func (r Request) WithNotification() Request {
modulitos marked this conversation as resolved.
Show resolved Hide resolved
r.RequestNotification = true
return r
}

type Response struct {
RoleARN string
Audience string
UseRegionalSTS bool
TokenExpiration int64
modulitos marked this conversation as resolved.
Show resolved Hide resolved
FoundInCache bool
Notifier <-chan struct{}
}
roehrijn marked this conversation as resolved.
Show resolved Hide resolved

type ServiceAccountCache interface {
Start(stop chan struct{})
Get(name, namespace string) (role, aud string, useRegionalSTS bool, tokenExpiration int64)
Get(request Request) Response
GetCommonConfigurations(name, namespace string) (useRegionalSTS bool, tokenExpiration int64)
// ToJSON returns cache contents as JSON string
ToJSON() string
}

type serviceAccountCache struct {
mu sync.RWMutex // guards cache
saCache map[string]*CacheResponse
cmCache map[string]*CacheResponse
saCache map[string]*Entry
cmCache map[string]*Entry
hasSynced cache.InformerSynced
clientset kubernetes.Interface
annotationPrefix string
Expand All @@ -60,6 +83,8 @@ type serviceAccountCache struct {
composeRoleArn ComposeRoleArn
defaultTokenExpiration int64
webhookUsage prometheus.Gauge
notificationHandlers map[string]chan struct{}
handlerMu sync.Mutex
}

type ComposeRoleArn struct {
Expand All @@ -85,56 +110,81 @@ func init() {
}

// Get will return the cached configuration of the given ServiceAccount.
// It will first look at the set of ServiceAccounts configured using annotations. If none are found, it will look for any
// ServiceAccount configured through the pod-identity-webhook ConfigMap.
func (c *serviceAccountCache) Get(name, namespace string) (role, aud string, useRegionalSTS bool, tokenExpiration int64) {
klog.V(5).Infof("Fetching sa %s/%s from cache", namespace, name)
// It will first look at the set of ServiceAccounts configured using annotations. If none is found and a notifier is
// requested, it will register a handler to be notified as soon as a ServiceAccount with given key is populated to the
// cache. Afterward it will check for a ServiceAccount configured through the pod-identity-webhook ConfigMap.
func (c *serviceAccountCache) Get(req Request) Response {
result := Response{
TokenExpiration: pkg.DefaultTokenExpiration,
}
klog.V(5).Infof("Fetching sa %s from cache", req.CacheKey())
{
resp := c.getSA(name, namespace)
if resp != nil && resp.RoleARN != "" {
return resp.RoleARN, resp.Audience, resp.UseRegionalSTS, resp.TokenExpiration
var entry *Entry
entry, result.Notifier = c.getSA(req)
if entry != nil {
result.FoundInCache = true
}
if entry != nil && entry.RoleARN != "" {
result.RoleARN = entry.RoleARN
result.Audience = entry.Audience
result.UseRegionalSTS = entry.UseRegionalSTS
result.TokenExpiration = entry.TokenExpiration
return result
}
}
{
resp := c.getCM(name, namespace)
if resp != nil {
return resp.RoleARN, resp.Audience, resp.UseRegionalSTS, resp.TokenExpiration
entry := c.getCM(req.Name, req.Namespace)
if entry != nil {
result.FoundInCache = true
result.RoleARN = entry.RoleARN
result.Audience = entry.Audience
result.UseRegionalSTS = entry.UseRegionalSTS
result.TokenExpiration = entry.TokenExpiration
return result
}
}
klog.V(5).Infof("Service account %s/%s not found in cache", namespace, name)
return "", "", false, pkg.DefaultTokenExpiration
klog.V(5).Infof("Service account %s not found in cache", req.CacheKey())
return result
}

// GetCommonConfigurations returns the common configurations that also applies to the new mutation method(i.e Container Credentials).
// The config file for the container credentials does not contain "TokenExpiration" or "UseRegionalSTS". For backward compatibility,
// Use these fields if they are set in the sa annotations or config map.
func (c *serviceAccountCache) GetCommonConfigurations(name, namespace string) (useRegionalSTS bool, tokenExpiration int64) {
if resp := c.getSA(name, namespace); resp != nil {
return resp.UseRegionalSTS, resp.TokenExpiration
} else if resp := c.getCM(name, namespace); resp != nil {
return resp.UseRegionalSTS, resp.TokenExpiration
if entry, _ := c.getSA(Request{Name: name, Namespace: namespace, RequestNotification: false}); entry != nil {
return entry.UseRegionalSTS, entry.TokenExpiration
} else if entry := c.getCM(name, namespace); entry != nil {
return entry.UseRegionalSTS, entry.TokenExpiration
}
return false, pkg.DefaultTokenExpiration
}

func (c *serviceAccountCache) getSA(name, namespace string) *CacheResponse {
func (c *serviceAccountCache) getSA(req Request) (*Entry, chan struct{}) {
c.mu.RLock()
defer c.mu.RUnlock()
resp, ok := c.saCache[namespace+"/"+name]
if !ok {
return nil
entry, ok := c.saCache[req.CacheKey()]
if !ok && req.RequestNotification {
klog.V(5).Infof("Service Account %s not found in cache, adding notification handler", req.CacheKey())
c.handlerMu.Lock()
defer c.handlerMu.Unlock()
notifier, found := c.notificationHandlers[req.CacheKey()]
if !found {
notifier = make(chan struct{})
c.notificationHandlers[req.CacheKey()] = notifier
}
return nil, notifier
}
return resp
return entry, nil
}

func (c *serviceAccountCache) getCM(name, namespace string) *CacheResponse {
func (c *serviceAccountCache) getCM(name, namespace string) *Entry {
c.mu.RLock()
defer c.mu.RUnlock()
resp, ok := c.cmCache[namespace+"/"+name]
entry, ok := c.cmCache[namespace+"/"+name]
if !ok {
return nil
}
return resp
return entry
}

func (c *serviceAccountCache) popSA(name, namespace string) {
Expand Down Expand Up @@ -164,7 +214,7 @@ func (c *serviceAccountCache) ToJSON() string {
}

func (c *serviceAccountCache) addSA(sa *v1.ServiceAccount) {
resp := &CacheResponse{}
entry := &Entry{}

arn, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.RoleARNAnnotation]
if ok {
Expand All @@ -178,49 +228,59 @@ func (c *serviceAccountCache) addSA(sa *v1.ServiceAccount) {
} else if !matched {
klog.Warningf("arn is invalid: %s", arn)
}
resp.RoleARN = arn
entry.RoleARN = arn
}

resp.Audience = c.defaultAudience
entry.Audience = c.defaultAudience
if audience, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.AudienceAnnotation]; ok {
resp.Audience = audience
entry.Audience = audience
}

resp.UseRegionalSTS = c.defaultRegionalSTS
entry.UseRegionalSTS = c.defaultRegionalSTS
if useRegionalStr, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.UseRegionalSTSAnnotation]; ok {
useRegional, err := strconv.ParseBool(useRegionalStr)
if err != nil {
klog.V(4).Infof("Ignoring service account %s/%s invalid value for disable-regional-sts annotation", sa.Namespace, sa.Name)
} else {
resp.UseRegionalSTS = useRegional
entry.UseRegionalSTS = useRegional
}
}

resp.TokenExpiration = c.defaultTokenExpiration
entry.TokenExpiration = c.defaultTokenExpiration
if tokenExpirationStr, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.TokenExpirationAnnotation]; ok {
if tokenExpiration, err := strconv.ParseInt(tokenExpirationStr, 10, 64); err != nil {
klog.V(4).Infof("Found invalid value for token expiration, using %d seconds as default: %v", resp.TokenExpiration, err)
klog.V(4).Infof("Found invalid value for token expiration, using %d seconds as default: %v", entry.TokenExpiration, err)
} else {
resp.TokenExpiration = pkg.ValidateMinTokenExpiration(tokenExpiration)
entry.TokenExpiration = pkg.ValidateMinTokenExpiration(tokenExpiration)
}
}
c.webhookUsage.Set(1)

c.setSA(sa.Name, sa.Namespace, resp)
c.setSA(sa.Name, sa.Namespace, entry)
}

func (c *serviceAccountCache) setSA(name, namespace string, resp *CacheResponse) {
func (c *serviceAccountCache) setSA(name, namespace string, entry *Entry) {
c.mu.Lock()
defer c.mu.Unlock()
klog.V(5).Infof("Adding SA %s/%s to SA cache: %+v", namespace, name, resp)
c.saCache[namespace+"/"+name] = resp

key := namespace + "/" + name
klog.V(5).Infof("Adding SA %q to SA cache: %+v", key, entry)
c.saCache[key] = entry

c.handlerMu.Lock()
defer c.handlerMu.Unlock()
if handler, found := c.notificationHandlers[key]; found {
klog.V(5).Infof("Notifying handlers for %q", key)
close(handler)
delete(c.notificationHandlers, key)
}
}

func (c *serviceAccountCache) setCM(name, namespace string, resp *CacheResponse) {
func (c *serviceAccountCache) setCM(name, namespace string, entry *Entry) {
c.mu.Lock()
defer c.mu.Unlock()
klog.V(5).Infof("Adding SA %s/%s to CM cache: %+v", namespace, name, resp)
c.cmCache[namespace+"/"+name] = resp
klog.V(5).Infof("Adding SA %s/%s to CM cache: %+v", namespace, name, entry)
c.cmCache[namespace+"/"+name] = entry
}

func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenExpiration int64, saInformer coreinformers.ServiceAccountInformer, cmInformer coreinformers.ConfigMapInformer, composeRoleArn ComposeRoleArn) ServiceAccountCache {
Expand All @@ -233,15 +293,16 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
}

c := &serviceAccountCache{
saCache: map[string]*CacheResponse{},
cmCache: map[string]*CacheResponse{},
saCache: map[string]*Entry{},
cmCache: map[string]*Entry{},
defaultAudience: defaultAudience,
annotationPrefix: prefix,
defaultRegionalSTS: defaultRegionalSTS,
composeRoleArn: composeRoleArn,
defaultTokenExpiration: defaultTokenExpiration,
hasSynced: hasSynced,
webhookUsage: webhookUsage,
notificationHandlers: map[string]chan struct{}{},
}

saInformer.Informer().AddEventHandler(
Expand Down Expand Up @@ -298,22 +359,22 @@ func (c *serviceAccountCache) populateCacheFromCM(oldCM, newCM *v1.ConfigMap) er
return nil
}
newConfig := newCM.Data["config"]
sas := make(map[string]*CacheResponse)
sas := make(map[string]*Entry)
err := json.Unmarshal([]byte(newConfig), &sas)
if err != nil {
return fmt.Errorf("failed to unmarshal new config %q: %v", newConfig, err)
}
for key, resp := range sas {
for key, entry := range sas {
parts := strings.Split(key, "/")
if resp.TokenExpiration == 0 {
resp.TokenExpiration = c.defaultTokenExpiration
if entry.TokenExpiration == 0 {
entry.TokenExpiration = c.defaultTokenExpiration
}
c.setCM(parts[1], parts[0], resp)
c.setCM(parts[1], parts[0], entry)
}

if oldCM != nil {
oldConfig := oldCM.Data["config"]
oldCache := make(map[string]*CacheResponse)
oldCache := make(map[string]*Entry)
err := json.Unmarshal([]byte(oldConfig), &oldCache)
if err != nil {
return fmt.Errorf("failed to unmarshal old config %q: %v", oldConfig, err)
Expand Down
Loading
Loading