From 2621555f1976957abf56300b8cebee60382594da Mon Sep 17 00:00:00 2001
From: Alex Angelini
Date: Wed, 23 Nov 2022 14:30:15 -0500
Subject: [PATCH 01/13] Force delete pods over concurrency

---
 pkg/executor/executortype/poolmgr/gpm.go | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/pkg/executor/executortype/poolmgr/gpm.go b/pkg/executor/executortype/poolmgr/gpm.go
index 9e2ea9400b..3c2b2a0d00 100644
--- a/pkg/executor/executortype/poolmgr/gpm.go
+++ b/pkg/executor/executortype/poolmgr/gpm.go
@@ -659,6 +659,11 @@ func (gpm *GenericPoolManager) doIdleObjectReaper(ctx context.Context) {
 			return
 		}
 
+		countByFunction := make(map[k8sTypes.UID]int)
+		for _, fsvc := range funcSvcs {
+			countByFunction[fsvc.Function.UID]++
+		}
+
 		for i := range funcSvcs {
 			fsvc := funcSvcs[i]
 
@@ -688,6 +693,20 @@ func (gpm *GenericPoolManager) doIdleObjectReaper(ctx context.Context) {
 				}
 			}
 
+			concurrency := 1
+			if fn, ok := fnList[fsvc.Function.UID]; ok {
+				concurrency = fn.Spec.Concurrency
+			}
+
+			// If more function services are running than the function's concurrency limit allows,
+			// set idlePodReapTime to 0 to force this service to be deleted and decrement the per-function count
+			if count, ok := countByFunction[fsvc.Function.UID]; ok {
+				if count > concurrency {
+					idlePodReapTime = 0
+					countByFunction[fsvc.Function.UID]--
+				}
+			}
+
 			if time.Since(fsvc.Atime) < idlePodReapTime {
 				continue
 			}

From 2ff27133bec29b2becef962f17b9eb84672b6d81 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Scott=20C=C3=B4t=C3=A9?=
Date: Thu, 9 Feb 2023 10:58:23 -0500
Subject: [PATCH 02/13] Use read-only mounts for the non-fetcher container

---
 pkg/fetcher/config/config.go | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/pkg/fetcher/config/config.go b/pkg/fetcher/config/config.go
index 0562e56d69..ec7f9f8ae2 100644
--- a/pkg/fetcher/config/config.go
+++ b/pkg/fetcher/config/config.go
@@ -297,7 +297,18 @@ func (cfg *Config) addFetcherToPodSpecWithCommand(podSpec *apiv1.PodSpec, mainCo
 		}
 		found = true
-		container.VolumeMounts = append(container.VolumeMounts, mounts...)
+
+		// use a read-only version of the mounts so that the fetcher is the only container with write access to the shared volumes
+		readOnlyMounts := make([]apiv1.VolumeMount, len(mounts))
+		for ix, mount := range mounts {
+			readOnlyMounts[ix] = apiv1.VolumeMount{
+				Name:      mount.Name,
+				MountPath: mount.MountPath,
+				ReadOnly:  true,
+			}
+		}
+
+		container.VolumeMounts = append(container.VolumeMounts, readOnlyMounts...)
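+		// write the modified container back into the pod spec, since the range loop iterates over copies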
podSpec.Containers[ix] = container } if !found { From 266859da8e710ad71245f499f294985a15ad1f98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Scott=20C=C3=B4t=C3=A9?= Date: Thu, 9 Feb 2023 10:58:53 -0500 Subject: [PATCH 03/13] Make the fetcher noop after it has specialized --- pkg/fetcher/fetcher.go | 56 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/pkg/fetcher/fetcher.go b/pkg/fetcher/fetcher.go index d7040443f0..29865168bc 100644 --- a/pkg/fetcher/fetcher.go +++ b/pkg/fetcher/fetcher.go @@ -26,6 +26,7 @@ import ( "os" "path/filepath" "strconv" + "sync/atomic" "time" "github.com/google/uuid" @@ -63,6 +64,7 @@ type ( fissionClient versioned.Interface kubeClient kubernetes.Interface httpClient *http.Client + isSpecialized *atomic.Bool Info PodInfo } PodInfo struct { @@ -110,6 +112,40 @@ func MakeFetcher(logger *zap.Logger, clientGen crd.ClientGeneratorInterface, sha return nil, errors.Wrap(err, "error reading pod namespace from downward volume") } + hasFiles := func(dir string) (bool, error) { + f, err := os.Open(dir) + if err != nil { + return false, err + } + defer f.Close() + + _, err = f.Readdirnames(1) + if err == io.EOF { + return false, nil + } + return true, err + } + + userFuncHasFiles, err := hasFiles(sharedVolumePath) + if err != nil { + return nil, err + } + + secretsHasFiles, err := hasFiles(sharedSecretPath) + if err != nil { + return nil, err + } + + configMapsHasFiles, err := hasFiles(sharedConfigPath) + if err != nil { + return nil, err + } + + // if any of the directories have files, then the pod has already been specialized. + // we made these volumes read only for the sandbox here: pkg/fetcher/config/config.go:323 + isSpecialized := &atomic.Bool{} + isSpecialized.Store(userFuncHasFiles || secretsHasFiles || configMapsHasFiles) + hc := &http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)} return &Fetcher{ logger: fLogger, @@ -118,6 +154,7 @@ func MakeFetcher(logger *zap.Logger, clientGen crd.ClientGeneratorInterface, sha sharedConfigPath: sharedConfigPath, fissionClient: fissionClient, kubeClient: kubeClient, + isSpecialized: isSpecialized, Info: PodInfo{ Name: string(name), Namespace: string(namespace), @@ -171,6 +208,12 @@ func (fetcher *Fetcher) FetchHandler(w http.ResponseWriter, r *http.Request) { logger.Info("fetch request done", zap.Duration("elapsed_time", elapsed)) }() + if fetcher.isSpecialized.Load() { + logger.Warn("fetch request received on specialized pod") + w.WriteHeader(http.StatusTeapot) + return + } + // parse request body, err := io.ReadAll(r.Body) if err != nil { @@ -222,6 +265,12 @@ func (fetcher *Fetcher) SpecializeHandler(w http.ResponseWriter, r *http.Request } logger := otelUtils.LoggerWithTraceID(ctx, fetcher.logger) + if fetcher.isSpecialized.Load() { + logger.Warn("specialize request received on specialized pod") + w.WriteHeader(http.StatusTeapot) + return + } + // parse request body, err := io.ReadAll(r.Body) if err != nil { @@ -504,6 +553,12 @@ func (fetcher *Fetcher) UploadHandler(w http.ResponseWriter, r *http.Request) { logger.Info("upload request done", zap.Duration("elapsed_time", elapsed)) }() + if fetcher.isSpecialized.Load() { + logger.Warn("upload request received on specialized pod") + w.WriteHeader(http.StatusTeapot) + return + } + // parse request body, err := io.ReadAll(r.Body) if err != nil { @@ -722,6 +777,7 @@ func (fetcher *Fetcher) SpecializePod(ctx context.Context, fetchReq FunctionFetc if err == nil && resp.StatusCode < 300 { // Success resp.Body.Close() + 
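// mark the pod as specialized so the fetch, specialize and upload handlers reject any further requests
+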
fetcher.isSpecialized.Store(true)
 		return nil
 	}

From cf714ab513164017e1e5c0eb7ea1ca35b2fbfd61 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Scott=20C=C3=B4t=C3=A9?=
Date: Mon, 17 Apr 2023 11:17:07 -0400
Subject: [PATCH 04/13] Check context after acquiring key

---
 pkg/executor/executortype/poolmgr/gp.go | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/pkg/executor/executortype/poolmgr/gp.go b/pkg/executor/executortype/poolmgr/gp.go
index a137456a16..f1bc046ab4 100644
--- a/pkg/executor/executortype/poolmgr/gp.go
+++ b/pkg/executor/executortype/poolmgr/gp.go
@@ -96,8 +96,8 @@ func MakeGenericPool(
 	fetcherConfig *fetcherConfig.Config,
 	instanceID string,
 	enableIstio bool,
-	podSpecPatch *apiv1.PodSpec) *GenericPool {
-
+	podSpecPatch *apiv1.PodSpec,
+) *GenericPool {
 	gpLogger := logger.Named("generic_pool")
 
 	podReadyTimeoutStr := os.Getenv("POD_READY_TIMEOUT")
@@ -272,6 +272,23 @@ func (gp *GenericPool) choosePod(ctx context.Context, newLabels map[string]strin
 		return "", nil, errors.New("readypod controller is not running")
 	}
 	key = item.(string)
+
+	select {
+	case <-ctx.Done():
+		gp.readyPodQueue.Done(key)
+		gp.readyPodQueue.Add(key)
+		logger.Error("context cancelled after getting key", zap.String("key", key))
+		return "", nil, errors.New("context cancelled after getting key")
+	default:
+		deadline, ok := ctx.Deadline()
+		if ok && time.Until(deadline) < 3*time.Second {
+			gp.readyPodQueue.Done(key)
+			gp.readyPodQueue.Add(key)
+			logger.Error("context is too close to deadline after getting key", zap.String("key", key), zap.Stringer("time_until_deadline", time.Until(deadline)))
+			return "", nil, fmt.Errorf("context is %s from deadline after getting key", time.Until(deadline))
+		}
+	}
+
 	logger.Debug("got key from the queue", zap.String("key", key))
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
@@ -443,7 +460,6 @@ func (gp *GenericPool) specializePod(ctx context.Context, pod *apiv1.Pod, fn *fv
 		} else {
 			return err
 		}
-
 	}
 
 	// specialize pod with service

From 31238cb5b392003187fed8f382fc0d238bca29a3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Scott=20C=C3%B4t=C3=A9?=
Date: Thu, 20 Apr 2023 16:45:37 -0400
Subject: [PATCH 05/13] Slow down pod terminations

---
 go.mod                                        |  2 +-
 .../executortype/poolmgr/poolpodcontroller.go | 15 ++++++++++++---
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/go.mod b/go.mod
index 3b6bf68b23..27f2609a34 100644
--- a/go.mod
+++ b/go.mod
@@ -42,6 +42,7 @@ require (
 	go.opentelemetry.io/otel/trace v1.22.0
 	go.uber.org/zap v1.26.0
 	golang.org/x/net v0.20.0
+	golang.org/x/time v0.3.0
 	google.golang.org/grpc v1.61.0
 	k8s.io/api v0.29.1
 	k8s.io/apiextensions-apiserver v0.29.1
@@ -157,7 +158,6 @@ require (
 	golang.org/x/sys v0.16.0 // indirect
 	golang.org/x/term v0.16.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
-	golang.org/x/time v0.3.0 // indirect
 	golang.org/x/tools v0.16.1 // indirect
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
diff --git a/pkg/executor/executortype/poolmgr/poolpodcontroller.go b/pkg/executor/executortype/poolmgr/poolpodcontroller.go
index 06b08a2ae4..56f133780f 100644
--- a/pkg/executor/executortype/poolmgr/poolpodcontroller.go
+++ b/pkg/executor/executortype/poolmgr/poolpodcontroller.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"go.uber.org/zap"
+	"golang.org/x/time/rate"
 	apps "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -69,11 +70,19 @@ type (
 	}
 )
 
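+// cleanupPodRateLimiter returns the rate limiter for the specialized-pod cleanup
+// queue: a per-key exponential backoff (1s base, 60s cap) combined with an overall
+// token bucket of about one event per second, so deletions are retried with backoff
+// and can never flood the Kubernetes API server.
+func cleanupPodRateLimiter() 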
workqueue.RateLimiter {
+	return workqueue.NewMaxOfRateLimiter(
+		workqueue.NewItemExponentialFailureRateLimiter(time.Second, 60*time.Second),
+		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 5)},
+	)
+}
+
 func NewPoolPodController(ctx context.Context, logger *zap.Logger,
 	kubernetesClient kubernetes.Interface,
 	enableIstio bool,
 	finformerFactory map[string]genInformer.SharedInformerFactory,
-	gpmInformerFactory map[string]k8sInformers.SharedInformerFactory) (*PoolPodController, error) {
+	gpmInformerFactory map[string]k8sInformers.SharedInformerFactory,
+) (*PoolPodController, error) {
 	logger = logger.Named("pool_pod_controller")
 	p := &PoolPodController{
 		logger: logger,
@@ -86,7 +95,7 @@ func NewPoolPodController(ctx context.Context, logger *zap.Logger,
 		podListerSynced: make(map[string]k8sCache.InformerSynced),
 		envCreateUpdateQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "EnvAddUpdateQueue"),
 		envDeleteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "EnvDeleteQueue"),
-		spCleanupPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SpecializedPodCleanupQueue"),
+		spCleanupPodQueue: workqueue.NewNamedRateLimitingQueue(cleanupPodRateLimiter(), "SpecializedPodCleanupQueue"),
 	}
 	if p.enableIstio {
 		for _, factory := range finformerFactory {
@@ -178,7 +187,7 @@ func (p *PoolPodController) processRS(rs *apps.ReplicaSet) {
 			logger.Error("Failed to get key for pod", zap.Error(err))
 			continue
 		}
-		p.spCleanupPodQueue.Add(key)
+		p.spCleanupPodQueue.AddRateLimited(key)
 	}
 }
 
From 88bae368d3dc50fbf4c087035813a6a74afcfdaa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Scott=20C=C3%B4t=C3=A9?=
Date: Tue, 25 Apr 2023 14:12:09 -0400
Subject: [PATCH 06/13] Delay specialized pod termination

---
 .../executortype/poolmgr/poolpodcontroller.go | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/pkg/executor/executortype/poolmgr/poolpodcontroller.go b/pkg/executor/executortype/poolmgr/poolpodcontroller.go
index 56f133780f..1cf78b12f6 100644
--- a/pkg/executor/executortype/poolmgr/poolpodcontroller.go
+++ b/pkg/executor/executortype/poolmgr/poolpodcontroller.go
@@ -73,7 +73,7 @@ type (
 func cleanupPodRateLimiter() workqueue.RateLimiter {
 	return workqueue.NewMaxOfRateLimiter(
 		workqueue.NewItemExponentialFailureRateLimiter(time.Second, 60*time.Second),
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 5)},
+		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 1)},
 	)
 }
 
@@ -162,7 +162,12 @@ func (p *PoolPodController) processRS(rs *apps.ReplicaSet) {
 		return
 	}
 	logger := p.logger.With(zap.String("rs", rs.Name), zap.String("namespace", rs.Namespace))
-	logger.Debug("replica set has zero replica count")
+
+	// wait for unspecialized pods to finish booting before beginning to clean up specialized ones
+	delay := 15 * time.Second
+	logger.Info("replica set has zero replica count, delaying before cleanup", zap.Stringer("delay", delay))
+	time.Sleep(delay)
+
 	// List all specialized pods and schedule for cleanup
 	rsLabelMap, err := metav1.LabelSelectorAsMap(rs.Spec.Selector)
 	if err != nil {
@@ -197,7 +202,7 @@ func (p *PoolPodController) handleRSAdd(obj interface{}) {
 		p.logger.Error("unexpected type when adding rs to pool pod controller", zap.Any("obj", obj))
 		return
 	}
-	p.processRS(rs)
+	go p.processRS(rs)
 }
 
 func (p *PoolPodController) handleRSUpdate(oldObj interface{}, newObj interface{}) {
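The sleep is also why the ReplicaSet handlers now invoke processRS in a goroutine:
shared-informer event handlers run one at a time, so a 15-second inline sleep would
stall delivery of every other ReplicaSet and pod event. A non-blocking alternative
(a sketch only, not what this patch does; the queue and rsKey wiring are assumed)
would be to requeue the key on a delaying queue instead:

	delayQueue := workqueue.NewDelayingQueue()
	delayQueue.AddAfter(rsKey, 15*time.Second) // revisit the ReplicaSet after the boot grace period

@@ -206,7 +211,7 @@ func (p *PoolPodController) 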
handleRSUpdate(oldObj interface{}, newObj interface{ p.logger.Error("unexpected type when updating rs to pool pod controller", zap.Any("obj", newObj)) return } - p.processRS(rs) + go p.processRS(rs) } func (p *PoolPodController) handleRSDelete(obj interface{}) { @@ -223,7 +228,7 @@ func (p *PoolPodController) handleRSDelete(obj interface{}) { return } } - p.processRS(rs) + go p.processRS(rs) } func (p *PoolPodController) enqueueEnvAdd(obj interface{}) { From 4cc0880f23d5bb0103855e3261ddb5a00ddf6e49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Scott=20C=C3=B4t=C3=A9?= Date: Tue, 25 Apr 2023 13:44:02 -0400 Subject: [PATCH 07/13] Only pre-stop if specialized --- .../executortype/container/deployment.go | 10 ++++++---- .../executortype/newdeploy/newdeploy.go | 14 +++++++------ .../executortype/poolmgr/gp_deployment.go | 12 ++++------- pkg/fetcher/config/config.go | 20 ------------------- 4 files changed, 18 insertions(+), 38 deletions(-) diff --git a/pkg/executor/executortype/container/deployment.go b/pkg/executor/executortype/container/deployment.go index 6afaa6ba5c..292bf1996f 100644 --- a/pkg/executor/executortype/container/deployment.go +++ b/pkg/executor/executortype/container/deployment.go @@ -164,8 +164,8 @@ func (cn *Container) waitForDeploy(ctx context.Context, depl *appsv1.Deployment, } func (cn *Container) getDeploymentSpec(ctx context.Context, fn *fv1.Function, targetReplicas *int32, - deployName string, deployNamespace string, deployLabels map[string]string, deployAnnotations map[string]string) (*appsv1.Deployment, error) { - + deployName string, deployNamespace string, deployLabels map[string]string, deployAnnotations map[string]string, +) (*appsv1.Deployment, error) { replicas := int32(fn.Spec.InvokeStrategy.ExecutionStrategy.MinScale) if targetReplicas != nil { replicas = *targetReplicas @@ -226,12 +226,14 @@ func (cn *Container) getDeploymentSpec(ctx context.Context, fn *fv1.Function, ta Name: fn.ObjectMeta.Name, ImagePullPolicy: cn.runtimeImagePullPolicy, TerminationMessagePath: "/dev/termination-log", + // if the pod is specialized (i.e. 
has secrets), wait just over 60 seconds for the router's endpoint cache to expire before shutting down
 					Lifecycle: &apiv1.Lifecycle{
 						PreStop: &apiv1.LifecycleHandler{
 							Exec: &apiv1.ExecAction{
 								Command: []string{
-									"/bin/sleep",
-									fmt.Sprintf("%d", gracePeriodSeconds),
+									"bash",
+									"-c",
+									"test -n \"$(ls -A /secrets/)\" && sleep 63 || exit 0",
 								},
 							},
 						},
diff --git a/pkg/executor/executortype/newdeploy/newdeploy.go b/pkg/executor/executortype/newdeploy/newdeploy.go
index 18d8e340dd..86f19f2921 100644
--- a/pkg/executor/executortype/newdeploy/newdeploy.go
+++ b/pkg/executor/executortype/newdeploy/newdeploy.go
@@ -38,8 +38,8 @@ import (
 )
 
 func (deploy *NewDeploy) createOrGetDeployment(ctx context.Context, fn *fv1.Function, env *fv1.Environment,
-	deployName string, deployLabels map[string]string, deployAnnotations map[string]string, deployNamespace string) (*appsv1.Deployment, error) {
-
+	deployName string, deployLabels map[string]string, deployAnnotations map[string]string, deployNamespace string,
+) (*appsv1.Deployment, error) {
 	specializationTimeout := fn.Spec.InvokeStrategy.ExecutionStrategy.SpecializationTimeout
 	minScale := int32(fn.Spec.InvokeStrategy.ExecutionStrategy.MinScale)
 
@@ -128,8 +128,8 @@ func (deploy *NewDeploy) deleteDeployment(ctx context.Context, ns string, name s
 }
 
 func (deploy *NewDeploy) getDeploymentSpec(ctx context.Context, fn *fv1.Function, env *fv1.Environment, targetReplicas *int32,
-	deployName string, deployNamespace string, deployLabels map[string]string, deployAnnotations map[string]string) (*appsv1.Deployment, error) {
-
+	deployName string, deployNamespace string, deployLabels map[string]string, deployAnnotations map[string]string,
+) (*appsv1.Deployment, error) {
 	replicas := int32(fn.Spec.InvokeStrategy.ExecutionStrategy.MinScale)
 	if targetReplicas != nil {
 		replicas = *targetReplicas
@@ -190,12 +190,14 @@ func (deploy *NewDeploy) getDeploymentSpec(ctx context.Context, fn *fv1.Function
 					Image:                  env.Spec.Runtime.Image,
 					ImagePullPolicy:        deploy.runtimeImagePullPolicy,
 					TerminationMessagePath: "/dev/termination-log",
+					// if the pod is specialized (i.e. has secrets), wait just over 60 seconds for the router's endpoint cache to expire before shutting down
 					Lifecycle: &apiv1.Lifecycle{
 						PreStop: &apiv1.LifecycleHandler{
 							Exec: &apiv1.ExecAction{
 								Command: []string{
-									"/bin/sleep",
-									fmt.Sprintf("%d", gracePeriodSeconds),
+									"bash",
+									"-c",
+									"test -n \"$(ls -A /secrets/)\" && sleep 63 || exit 0",
 								},
 							},
 						},
diff --git a/pkg/executor/executortype/poolmgr/gp_deployment.go b/pkg/executor/executortype/poolmgr/gp_deployment.go
index 4d2f415576..cbfd66bbd1 100644
--- a/pkg/executor/executortype/poolmgr/gp_deployment.go
+++ b/pkg/executor/executortype/poolmgr/gp_deployment.go
@@ -103,18 +103,14 @@ func (gp *GenericPool) genDeploymentSpec(env *fv1.Environment) (*appsv1.Deployme
 					ImagePullPolicy:        gp.runtimeImagePullPolicy,
 					TerminationMessagePath: "/dev/termination-log",
 					Resources:              env.Spec.Resources,
-					// Pod is removed from endpoints list for service when it's
-					// state became "Termination". We used preStop hook as the
-					// workaround for connection draining since pod maybe shutdown
-					// before grace period expires.
-					// https://kubernetes.io/docs/concepts/workloads/pods/pod/#termination-of-pods
-					// https://github.com/kubernetes/kubernetes/issues/47576#issuecomment-308900172
+					// if the pod is specialized (i.e. 
has secrets), wait just over 60 seconds for the router's endpoint cache to expire before shutting down
 					Lifecycle: &apiv1.Lifecycle{
 						PreStop: &apiv1.LifecycleHandler{
 							Exec: &apiv1.ExecAction{
 								Command: []string{
-									"/bin/sleep",
-									fmt.Sprintf("%d", gracePeriodSeconds),
+									"bash",
+									"-c",
+									"test -n \"$(ls -A /secrets/)\" && sleep 63 || exit 0",
 								},
 							},
 						},
diff --git a/pkg/fetcher/config/config.go b/pkg/fetcher/config/config.go
index ec7f9f8ae2..8b7e6803de 100644
--- a/pkg/fetcher/config/config.go
+++ b/pkg/fetcher/config/config.go
@@ -2,7 +2,6 @@ package container
 
 import (
 	"encoding/json"
-	"fmt"
 	"os"
 	"path/filepath"
 
@@ -271,25 +270,6 @@ func (cfg *Config) addFetcherToPodSpecWithCommand(podSpec *apiv1.PodSpec, mainCo
 		Env: otel.OtelEnvForContainer(),
 	}
 
-	// Pod is removed from endpoints list for service when it's
-	// state became "Termination". We used preStop hook as the
-	// workaround for connection draining since pod maybe shutdown
-	// before grace period expires.
-	// https://kubernetes.io/docs/concepts/workloads/pods/pod/#termination-of-pods
-	// https://github.com/kubernetes/kubernetes/issues/47576#issuecomment-308900172
-	if podSpec.TerminationGracePeriodSeconds != nil {
-		c.Lifecycle = &apiv1.Lifecycle{
-			PreStop: &apiv1.LifecycleHandler{
-				Exec: &apiv1.ExecAction{
-					Command: []string{
-						"/bin/sleep",
-						fmt.Sprintf("%v", *podSpec.TerminationGracePeriodSeconds),
-					},
-				},
-			},
-		}
-	}
-
 	found := false
 	for ix, container := range podSpec.Containers {
 		if container.Name != mainContainerName {

From 4381a09522ac769133650c4fa474df1565d28329 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Scott=20C=C3%B4t=C3=A9?=
Date: Wed, 9 Aug 2023 15:32:35 -0400
Subject: [PATCH 08/13] Add /router-startup endpoint to router

---
 pkg/router/httpTriggers.go | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/pkg/router/httpTriggers.go b/pkg/router/httpTriggers.go
index 107a856356..794dbd7f20 100644
--- a/pkg/router/httpTriggers.go
+++ b/pkg/router/httpTriggers.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"net/http"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/bep/debounce"
@@ -57,6 +58,7 @@ type HTTPTriggerSet struct {
 	functions                  []fv1.Function
 	funcInformer               map[string]k8sCache.SharedIndexInformer
 	updateRouterRequestChannel chan struct{}
+	hasFunctionsAndTriggers    atomic.Bool
 	tsRoundTripperParams       *tsRoundTripperParams
 	isDebugEnv                 bool
 	svcAddrUpdateThrottler     *throttler.Throttler
@@ -65,8 +67,8 @@ type HTTPTriggerSet struct {
 }
 
 func makeHTTPTriggerSet(logger *zap.Logger, fmap *functionServiceMap, fissionClient versioned.Interface,
-	kubeClient kubernetes.Interface, executor eclient.ClientInterface, params *tsRoundTripperParams, isDebugEnv bool, unTapServiceTimeout time.Duration, actionThrottler *throttler.Throttler) (*HTTPTriggerSet, error) {
-
+	kubeClient kubernetes.Interface, executor eclient.ClientInterface, params *tsRoundTripperParams, isDebugEnv bool, unTapServiceTimeout time.Duration, actionThrottler *throttler.Throttler,
+) (*HTTPTriggerSet, error) {
 	httpTriggerSet := &HTTPTriggerSet{
 		logger:             logger.Named("http_trigger_set"),
 		functionServiceMap: fmap,
@@ -142,7 +144,6 @@ func versionHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (ts *HTTPTriggerSet) getRouter(fnTimeoutMap map[types.UID]int) (*mux.Router, error) {
-
 	featureConfig, err := config.GetFeatureConfig(ts.logger)
 	if err != nil {
 		return nil, err
 	}
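For the new endpoint to gate a rollout, the router deployment needs a startup probe
pointing at it. A sketch of what that wiring could look like (the port and thresholds
here are assumptions for illustration, not part of this patch):

	// hypothetical startup probe on the router container
	StartupProbe: &apiv1.Probe{
		ProbeHandler: apiv1.ProbeHandler{
			HTTPGet: &apiv1.HTTPGetAction{Path: "/router-startup", Port: intstr.FromInt(8888)},
		},
		PeriodSeconds:    1,  // poll every second while the router boots
		FailureThreshold: 60, // allow up to a minute to load triggers and functions
	},

@@ -297,6 +298,15 @@
 	// Healthz endpoint for the router.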
muxRouter.HandleFunc("/router-healthz", routerHealthHandler).Methods("GET") + // Startup endpoint for the router. + muxRouter.HandleFunc("/router-startup", func(w http.ResponseWriter, r *http.Request) { + if ts.hasFunctionsAndTriggers.Load() { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } + }).Methods("GET") + // version of application. muxRouter.HandleFunc("/_version", versionHandler).Methods("GET") @@ -424,5 +434,10 @@ func (ts *HTTPTriggerSet) updateRouter(ctx context.Context) { continue } ts.mutableRouter.updateRouter(router) + + if !ts.hasFunctionsAndTriggers.Load() && len(ts.triggers) > 0 && len(ts.functions) > 0 { + // let the startup probe know that the router is ready + ts.hasFunctionsAndTriggers.Store(true) + } } } From 63b928f670ceec655215fb04726a72016b6bf71c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Scott=20C=C3=B4t=C3=A9?= Date: Mon, 4 Dec 2023 11:32:15 -0500 Subject: [PATCH 09/13] Ensure funcSvcGroup is not nil --- pkg/executor/fscache/poolcache.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/executor/fscache/poolcache.go b/pkg/executor/fscache/poolcache.go index 984591bd5e..19bca4acba 100644 --- a/pkg/executor/fscache/poolcache.go +++ b/pkg/executor/fscache/poolcache.go @@ -287,11 +287,16 @@ func (c *PoolCache) service() { } } case markSpecializationFailure: - if c.cache[req.function].svcWaiting > c.cache[req.function].queue.Len() { - c.cache[req.function].svcWaiting-- - if c.cache[req.function].svcWaiting == c.cache[req.function].queue.Len() { - expiredRequests := c.cache[req.function].queue.Expired() - c.cache[req.function].svcWaiting = c.cache[req.function].svcWaiting - expiredRequests + funcSvcGroup, ok := c.cache[req.function] + if !ok || funcSvcGroup == nil { + break + } + + if funcSvcGroup.svcWaiting > funcSvcGroup.queue.Len() { + funcSvcGroup.svcWaiting-- + if funcSvcGroup.svcWaiting == funcSvcGroup.queue.Len() { + expiredRequests := funcSvcGroup.queue.Expired() + funcSvcGroup.svcWaiting = funcSvcGroup.svcWaiting - expiredRequests } } case deleteValue: From e25994ad53decfc4413262be0bd5f351466baca0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Scott=20C=C3=B4t=C3=A9?= Date: Mon, 4 Dec 2023 12:27:38 -0500 Subject: [PATCH 10/13] Ensure c.cache[req.function] is not nil --- pkg/executor/fscache/poolcache.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/pkg/executor/fscache/poolcache.go b/pkg/executor/fscache/poolcache.go index 19bca4acba..3b3234ac66 100644 --- a/pkg/executor/fscache/poolcache.go +++ b/pkg/executor/fscache/poolcache.go @@ -287,16 +287,15 @@ func (c *PoolCache) service() { } } case markSpecializationFailure: - funcSvcGroup, ok := c.cache[req.function] - if !ok || funcSvcGroup == nil { - break + if _, ok := c.cache[req.function]; !ok { + c.cache[req.function] = NewFuncSvcGroup() } - if funcSvcGroup.svcWaiting > funcSvcGroup.queue.Len() { - funcSvcGroup.svcWaiting-- - if funcSvcGroup.svcWaiting == funcSvcGroup.queue.Len() { - expiredRequests := funcSvcGroup.queue.Expired() - funcSvcGroup.svcWaiting = funcSvcGroup.svcWaiting - expiredRequests + if c.cache[req.function].svcWaiting > c.cache[req.function].queue.Len() { + c.cache[req.function].svcWaiting-- + if c.cache[req.function].svcWaiting == c.cache[req.function].queue.Len() { + expiredRequests := c.cache[req.function].queue.Expired() + c.cache[req.function].svcWaiting = c.cache[req.function].svcWaiting - expiredRequests } } case deleteValue: From f40323c2d00e5266e3f8728a7beb7ea43f9fac75 Mon 
Sep 17 00:00:00 2001
From: =?UTF-8?q?Scott=20C=C3%B4t=C3=A9?=
Date: Tue, 20 Feb 2024 14:43:49 -0500
Subject: [PATCH 11/13] Revert "Use concurrency in poolmanager as per old
 behaviour (#2876)"

This reverts commit b85ba9e419e67a92639d3b3474a3305f473d565f.
---
 pkg/executor/fscache/poolcache.go      | 21 +++++----------------
 pkg/executor/fscache/poolcache_test.go | 27 ++++++++++-----------------
 2 files changed, 15 insertions(+), 33 deletions(-)

diff --git a/pkg/executor/fscache/poolcache.go b/pkg/executor/fscache/poolcache.go
index 3b3234ac66..d2021f7d00 100644
--- a/pkg/executor/fscache/poolcache.go
+++ b/pkg/executor/fscache/poolcache.go
@@ -96,6 +96,7 @@ type (
 )
 
 // NewPoolCache create a Cache object
+
 func NewPoolCache(logger *zap.Logger) *PoolCache {
 	c := &PoolCache{
 		cache: make(map[crd.CacheKeyURG]*funcSvcGroup),
@@ -121,7 +122,6 @@ func (c *PoolCache) service() {
 		case getValue:
 			funcSvcGroup, ok := c.cache[req.function]
 			if !ok {
-				// first request for this function, create a new group
 				c.cache[req.function] = NewFuncSvcGroup()
 				c.cache[req.function].svcWaiting++
 				resp.error = ferror.MakeError(ferror.ErrorNotFound,
@@ -131,7 +131,6 @@ func (c *PoolCache) service() {
 			}
 			found := false
 			totalActiveRequests := 0
-			// check if any specialized pod is available
 			for addr := range funcSvcGroup.svcs {
 				totalActiveRequests += funcSvcGroup.svcs[addr].activeRequests
 				if funcSvcGroup.svcs[addr].activeRequests < req.requestsPerPod &&
@@ -146,22 +145,12 @@ func (c *PoolCache) service() {
 					break
 				}
 			}
-			// if specialized pod is available then return svc
 			if found {
 				req.responseChannel <- resp
 				continue
 			}
-			concurrencyUsed := len(funcSvcGroup.svcs) + (funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len())
-			// if concurrency is available then be aggressive and use it as we are not sure if specialization will complete for other requests
-			if req.concurrency > 0 && concurrencyUsed < req.concurrency {
-				funcSvcGroup.svcWaiting++
-				resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%s' not found", req.function))
-				req.responseChannel <- resp
-				continue
-			}
-			// if no concurrency is available then check if there is any virtual capacity in the existing pods to serve the request in future
-			// if specialization doesnt complete within request then request will be timeout
-			capacity := (concurrencyUsed * req.requestsPerPod) - (totalActiveRequests + funcSvcGroup.svcWaiting)
+			specializationInProgress := funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len()
+			capacity := ((specializationInProgress + len(funcSvcGroup.svcs)) * req.requestsPerPod) - (totalActiveRequests + funcSvcGroup.svcWaiting)
 			if capacity > 0 {
 				funcSvcGroup.svcWaiting++
 				svcWait := &svcWait{
@@ -175,8 +164,8 @@ func (c *PoolCache) service() {
 			}
 
 			// concurrency should not be set to zero and
-			// sum of specialization in progress and specialized pods should be less then req.concurrency
-			if req.concurrency > 0 && concurrencyUsed >= req.concurrency {
+			// sum of specialization in progress and specialized pods should be less than req.concurrency
+			if req.concurrency > 0 && (specializationInProgress+len(funcSvcGroup.svcs)) >= req.concurrency {
 				resp.error = ferror.MakeError(ferror.ErrorTooManyRequests, fmt.Sprintf("function '%s' concurrency '%d' limit reached.", req.function, req.concurrency))
 			} else {
 				funcSvcGroup.svcWaiting++
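To see what the restored capacity check does, run it with illustrative numbers
(assumed values, not taken from the patch): two specialized pods, a requestsPerPod
of 5, three waiters of which two are queued, and nine active requests.

	svcWaiting, queuedWaiters := 3, 2
	specializationInProgress := svcWaiting - queuedWaiters // 1 pod still specializing
	pods, requestsPerPod := 2, 5
	totalActiveRequests := 9
	capacity := ((specializationInProgress + pods) * requestsPerPod) - (totalActiveRequests + svcWaiting)
	// capacity = (1+2)*5 - (9+3) = 3 > 0, so the request queues for an existing
	// or in-flight pod instead of triggering another specialization

diff --git a/pkg/executor/fscache/poolcache_test.go b/pkg/executor/fscache/poolcache_test.go
index 892bbc3398..db11bbbcb3 100644
--- a/pkg/executor/fscache/poolcache_test.go
+++ b/pkg/executor/fscache/poolcache_test.go
@@ -224,7 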
+224,6 @@ func TestPoolCacheRequests(t *testing.T) { simultaneous = 1 } for i := 1; i <= tt.requests; i++ { - reqno := i wg.Add(1) go func(reqno int) { defer wg.Done() @@ -232,11 +231,10 @@ func TestPoolCacheRequests(t *testing.T) { if err != nil { code, _ := ferror.GetHTTPError(err) if code == http.StatusNotFound { - atomic.AddUint64(&svcCounter, 1) - address := fmt.Sprintf("svc-%d", atomic.LoadUint64(&svcCounter)) - p.SetSvcValue(context.Background(), key, address, &FuncSvc{ - Name: address, + p.SetSvcValue(context.Background(), key, fmt.Sprintf("svc-%d", svcCounter), &FuncSvc{ + Name: "value", }, resource.MustParse("45m"), tt.rpp, tt.retainPods) + atomic.AddUint64(&svcCounter, 1) } else { t.Log(reqno, "=>", err) atomic.AddUint64(&failedRequests, 1) @@ -246,12 +244,9 @@ func TestPoolCacheRequests(t *testing.T) { t.Log(reqno, "=>", "svc is nil") atomic.AddUint64(&failedRequests, 1) } - // } else { - // t.Log(reqno, "=>", svc.Name) - // } } - }(reqno) - if reqno%simultaneous == 0 { + }(i) + if i%simultaneous == 0 { wg.Wait() } } @@ -263,11 +258,10 @@ func TestPoolCacheRequests(t *testing.T) { for i := 0; i < tt.concurrency; i++ { for j := 0; j < tt.rpp; j++ { wg.Add(1) - svcno := i - go func(svcno int) { + go func(i int) { defer wg.Done() - p.MarkAvailable(key, fmt.Sprintf("svc-%d", svcno+1)) - }(svcno) + p.MarkAvailable(key, fmt.Sprintf("svc-%d", i)) + }(i) } } wg.Wait() @@ -276,9 +270,8 @@ func TestPoolCacheRequests(t *testing.T) { UID: "func", Generation: 2, } - address := fmt.Sprintf("svc-%d", svcCounter) - p.SetSvcValue(context.Background(), newKey, address, &FuncSvc{ - Name: address, + p.SetSvcValue(context.Background(), newKey, fmt.Sprintf("svc-%d", svcCounter), &FuncSvc{ + Name: "value", }, resource.MustParse("45m"), tt.rpp, tt.retainPods) funcSvc := p.ListAvailableValue() require.Equal(t, tt.concurrency, len(funcSvc)) From b374704f73f85ee2f17f662b9ca78c117fdde4a1 Mon Sep 17 00:00:00 2001 From: Mike Presman Date: Thu, 23 May 2024 13:50:59 -0400 Subject: [PATCH 12/13] add better logging for debugging 404s --- pkg/executor/api.go | 2 +- pkg/router/functionHandler.go | 4 ++-- pkg/router/httpTriggers.go | 10 ++++++++++ pkg/router/router.go | 1 + 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/executor/api.go b/pkg/executor/api.go index c864a6bd75..faf87e1767 100644 --- a/pkg/executor/api.go +++ b/pkg/executor/api.go @@ -60,7 +60,7 @@ func (executor *Executor) getServiceForFunctionAPI(w http.ResponseWriter, r *htt logger := otelUtils.LoggerWithTraceID(ctx, executor.logger) // Check function -> svc cache - logger.Debug("checking for cached function service", + logger.Info("checking for cached function service", zap.String("function_name", fn.ObjectMeta.Name), zap.String("function_namespace", fn.ObjectMeta.Namespace)) if t == fv1.ExecutorTypePoolmgr && !fn.Spec.OnceOnly { diff --git a/pkg/router/functionHandler.go b/pkg/router/functionHandler.go index e732c9252c..7aa3896210 100644 --- a/pkg/router/functionHandler.go +++ b/pkg/router/functionHandler.go @@ -650,7 +650,7 @@ func (fh functionHandler) getServiceEntryFromExecutor(ctx context.Context) (serv service, err := fh.executor.GetServiceForFunction(fContext, fh.function) if err != nil { statusCode, errMsg := ferror.GetHTTPError(err) - logger.Error("error from GetServiceForFunction", + logger.Info("error from GetServiceForFunction", zap.Error(err), zap.String("error_message", errMsg), zap.Any("function", fh.function), @@ -660,7 +660,7 @@ func (fh functionHandler) getServiceEntryFromExecutor(ctx context.Context) (serv // 
parse the address into url svcURL, err := url.Parse(fmt.Sprintf("http://%v", service)) if err != nil { - logger.Error("error parsing service url", + logger.Info("error parsing service url", zap.Error(err), zap.String("service_url", svcURL.String())) return nil, err diff --git a/pkg/router/httpTriggers.go b/pkg/router/httpTriggers.go index 794dbd7f20..d646b17e08 100644 --- a/pkg/router/httpTriggers.go +++ b/pkg/router/httpTriggers.go @@ -18,6 +18,7 @@ package router import ( "context" + "log" "net/http" "strings" "sync/atomic" @@ -143,6 +144,11 @@ func versionHandler(w http.ResponseWriter, r *http.Request) { } } +func NotFoundHandler(w http.ResponseWriter, r *http.Request) { + log.Printf("404 Not Found: %s %s from %s at %s", r.Method, r.URL.Path, r.RemoteAddr, time.Now().Format(time.RFC3339)) + w.WriteHeader(http.StatusNotFound) +} + func (ts *HTTPTriggerSet) getRouter(fnTimeoutMap map[types.UID]int) (*mux.Router, error) { featureConfig, err := config.GetFeatureConfig(ts.logger) if err != nil { @@ -150,6 +156,9 @@ func (ts *HTTPTriggerSet) getRouter(fnTimeoutMap map[types.UID]int) (*mux.Router } muxRouter := mux.NewRouter() + + muxRouter.NotFoundHandler = http.HandlerFunc(NotFoundHandler) + muxRouter.Use(metrics.HTTPMetricMiddleware) if featureConfig.AuthConfig.IsEnabled { muxRouter.Use(authMiddleware(featureConfig)) @@ -434,6 +443,7 @@ func (ts *HTTPTriggerSet) updateRouter(ctx context.Context) { continue } ts.mutableRouter.updateRouter(router) + ts.logger.Info("updated router", zap.Int("triggers", len(ts.triggers)), zap.Int("functions", len(ts.functions))) if !ts.hasFunctionsAndTriggers.Load() && len(ts.triggers) > 0 && len(ts.functions) > 0 { // let the startup probe know that the router is ready diff --git a/pkg/router/router.go b/pkg/router/router.go index cc1bd8b033..ad8d8f5cc8 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -103,6 +103,7 @@ func serve(ctx context.Context, logger *zap.Logger, mgr manager.Interface, port // Start starts a router func Start(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, mgr manager.Interface, port int, executor eclient.ClientInterface) error { fmap := makeFunctionServiceMap(logger, time.Minute) + logger.Info("starting router with a new functionServiceMap") fissionClient, err := clientGen.GetFissionClient() if err != nil { From 9e444b66e97ab5a72b1003a574f3bbe689dea68f Mon Sep 17 00:00:00 2001 From: Mike Presman Date: Thu, 23 May 2024 14:20:02 -0400 Subject: [PATCH 13/13] changes after review + reset some stuff --- pkg/executor/api.go | 2 +- pkg/router/functionHandler.go | 4 ++-- pkg/router/httpTriggers.go | 11 ++++++----- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/executor/api.go b/pkg/executor/api.go index faf87e1767..c864a6bd75 100644 --- a/pkg/executor/api.go +++ b/pkg/executor/api.go @@ -60,7 +60,7 @@ func (executor *Executor) getServiceForFunctionAPI(w http.ResponseWriter, r *htt logger := otelUtils.LoggerWithTraceID(ctx, executor.logger) // Check function -> svc cache - logger.Info("checking for cached function service", + logger.Debug("checking for cached function service", zap.String("function_name", fn.ObjectMeta.Name), zap.String("function_namespace", fn.ObjectMeta.Namespace)) if t == fv1.ExecutorTypePoolmgr && !fn.Spec.OnceOnly { diff --git a/pkg/router/functionHandler.go b/pkg/router/functionHandler.go index 7aa3896210..e732c9252c 100644 --- a/pkg/router/functionHandler.go +++ b/pkg/router/functionHandler.go @@ -650,7 +650,7 @@ func (fh functionHandler) 
getServiceEntryFromExecutor(ctx context.Context) (serv service, err := fh.executor.GetServiceForFunction(fContext, fh.function) if err != nil { statusCode, errMsg := ferror.GetHTTPError(err) - logger.Info("error from GetServiceForFunction", + logger.Error("error from GetServiceForFunction", zap.Error(err), zap.String("error_message", errMsg), zap.Any("function", fh.function), @@ -660,7 +660,7 @@ func (fh functionHandler) getServiceEntryFromExecutor(ctx context.Context) (serv // parse the address into url svcURL, err := url.Parse(fmt.Sprintf("http://%v", service)) if err != nil { - logger.Info("error parsing service url", + logger.Error("error parsing service url", zap.Error(err), zap.String("service_url", svcURL.String())) return nil, err diff --git a/pkg/router/httpTriggers.go b/pkg/router/httpTriggers.go index d646b17e08..9afd337668 100644 --- a/pkg/router/httpTriggers.go +++ b/pkg/router/httpTriggers.go @@ -18,7 +18,6 @@ package router import ( "context" - "log" "net/http" "strings" "sync/atomic" @@ -144,9 +143,11 @@ func versionHandler(w http.ResponseWriter, r *http.Request) { } } -func NotFoundHandler(w http.ResponseWriter, r *http.Request) { - log.Printf("404 Not Found: %s %s from %s at %s", r.Method, r.URL.Path, r.RemoteAddr, time.Now().Format(time.RFC3339)) - w.WriteHeader(http.StatusNotFound) +func NotFoundHandler(ts *HTTPTriggerSet) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ts.logger.Info("404 Not Found", zap.String("req_method", r.Method), zap.String("url_path", r.URL.Path), zap.String("req_address", r.RemoteAddr), zap.Int("route_table_size", len(ts.functions))) + w.WriteHeader(http.StatusNotFound) + } } func (ts *HTTPTriggerSet) getRouter(fnTimeoutMap map[types.UID]int) (*mux.Router, error) { @@ -157,7 +158,7 @@ func (ts *HTTPTriggerSet) getRouter(fnTimeoutMap map[types.UID]int) (*mux.Router muxRouter := mux.NewRouter() - muxRouter.NotFoundHandler = http.HandlerFunc(NotFoundHandler) + muxRouter.NotFoundHandler = http.HandlerFunc(NotFoundHandler(ts)) muxRouter.Use(metrics.HTTPMetricMiddleware) if featureConfig.AuthConfig.IsEnabled {
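
The closure keeps the 404 handler testable, since the logger and route-table size
come from the injected *HTTPTriggerSet rather than package-level state. A minimal
sketch of exercising it (the test scaffolding is assumed, not part of the series):

	h := NotFoundHandler(ts) // ts would be an HTTPTriggerSet built with zap.NewNop() in a test
	req := httptest.NewRequest(http.MethodGet, "/no-such-route", nil)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	// rec.Code == http.StatusNotFound, and the miss is logged with method, path and route-table size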