Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gadget 1.20 #25

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/zap v1.26.0
golang.org/x/net v0.20.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.61.0
k8s.io/api v0.29.1
k8s.io/apiextensions-apiserver v0.29.1
Expand Down Expand Up @@ -157,7 +158,6 @@ require (
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions pkg/executor/executortype/container/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func (cn *Container) waitForDeploy(ctx context.Context, depl *appsv1.Deployment,
}

func (cn *Container) getDeploymentSpec(ctx context.Context, fn *fv1.Function, targetReplicas *int32,
deployName string, deployNamespace string, deployLabels map[string]string, deployAnnotations map[string]string) (*appsv1.Deployment, error) {

deployName string, deployNamespace string, deployLabels map[string]string, deployAnnotations map[string]string,
) (*appsv1.Deployment, error) {
replicas := int32(fn.Spec.InvokeStrategy.ExecutionStrategy.MinScale)
if targetReplicas != nil {
replicas = *targetReplicas
Expand Down Expand Up @@ -226,12 +226,14 @@ func (cn *Container) getDeploymentSpec(ctx context.Context, fn *fv1.Function, ta
Name: fn.ObjectMeta.Name,
ImagePullPolicy: cn.runtimeImagePullPolicy,
TerminationMessagePath: "/dev/termination-log",
// if the pod is specialized (i.e. has secrets), sleep 63 seconds — slightly past the router's 60-second endpoint cache expiry — before shutting down
Lifecycle: &apiv1.Lifecycle{
PreStop: &apiv1.LifecycleHandler{
Exec: &apiv1.ExecAction{
Command: []string{
"/bin/sleep",
fmt.Sprintf("%d", gracePeriodSeconds),
"bash",
"-c",
"test $(ls /secrets/) && sleep 63 || exit 0",
},
},
},
Expand Down
14 changes: 8 additions & 6 deletions pkg/executor/executortype/newdeploy/newdeploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (
)

func (deploy *NewDeploy) createOrGetDeployment(ctx context.Context, fn *fv1.Function, env *fv1.Environment,
deployName string, deployLabels map[string]string, deployAnnotations map[string]string, deployNamespace string) (*appsv1.Deployment, error) {

deployName string, deployLabels map[string]string, deployAnnotations map[string]string, deployNamespace string,
) (*appsv1.Deployment, error) {
specializationTimeout := fn.Spec.InvokeStrategy.ExecutionStrategy.SpecializationTimeout
minScale := int32(fn.Spec.InvokeStrategy.ExecutionStrategy.MinScale)

Expand Down Expand Up @@ -128,8 +128,8 @@ func (deploy *NewDeploy) deleteDeployment(ctx context.Context, ns string, name s
}

func (deploy *NewDeploy) getDeploymentSpec(ctx context.Context, fn *fv1.Function, env *fv1.Environment, targetReplicas *int32,
deployName string, deployNamespace string, deployLabels map[string]string, deployAnnotations map[string]string) (*appsv1.Deployment, error) {

deployName string, deployNamespace string, deployLabels map[string]string, deployAnnotations map[string]string,
) (*appsv1.Deployment, error) {
replicas := int32(fn.Spec.InvokeStrategy.ExecutionStrategy.MinScale)
if targetReplicas != nil {
replicas = *targetReplicas
Expand Down Expand Up @@ -190,12 +190,14 @@ func (deploy *NewDeploy) getDeploymentSpec(ctx context.Context, fn *fv1.Function
Image: env.Spec.Runtime.Image,
ImagePullPolicy: deploy.runtimeImagePullPolicy,
TerminationMessagePath: "/dev/termination-log",
// if the pod is specialized (i.e. has secrets), sleep 63 seconds — slightly past the router's 60-second endpoint cache expiry — before shutting down
Lifecycle: &apiv1.Lifecycle{
PreStop: &apiv1.LifecycleHandler{
Exec: &apiv1.ExecAction{
Command: []string{
"/bin/sleep",
fmt.Sprintf("%d", gracePeriodSeconds),
"bash",
"-c",
"test $(ls /secrets/) && sleep 63 || exit 0",
},
},
},
Expand Down
22 changes: 19 additions & 3 deletions pkg/executor/executortype/poolmgr/gp.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func MakeGenericPool(
fetcherConfig *fetcherConfig.Config,
instanceID string,
enableIstio bool,
podSpecPatch *apiv1.PodSpec) *GenericPool {

podSpecPatch *apiv1.PodSpec,
) *GenericPool {
gpLogger := logger.Named("generic_pool")

podReadyTimeoutStr := os.Getenv("POD_READY_TIMEOUT")
Expand Down Expand Up @@ -272,6 +272,23 @@ func (gp *GenericPool) choosePod(ctx context.Context, newLabels map[string]strin
return "", nil, errors.New("readypod controller is not running")
}
key = item.(string)

select {
case <-ctx.Done():
gp.readyPodQueue.Done(key)
gp.readyPodQueue.Add(key)
logger.Error("context cancelled before getting key", zap.String("key", key))
return "", nil, errors.New("context cancelled before getting key")
default:
deadline, ok := ctx.Deadline()
if ok && time.Until(deadline) < 3*time.Second {
gp.readyPodQueue.Done(key)
gp.readyPodQueue.Add(key)
logger.Error("context is too close to deadline after getting key", zap.String("key", key), zap.Stringer("deadline", time.Until(deadline)))
return "", nil, fmt.Errorf("context is %s from deadline after getting key", time.Until(deadline))
}
}

logger.Debug("got key from the queue", zap.String("key", key))
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down Expand Up @@ -443,7 +460,6 @@ func (gp *GenericPool) specializePod(ctx context.Context, pod *apiv1.Pod, fn *fv
} else {
return err
}

}
}
// specialize pod with service
Expand Down
12 changes: 4 additions & 8 deletions pkg/executor/executortype/poolmgr/gp_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,14 @@ func (gp *GenericPool) genDeploymentSpec(env *fv1.Environment) (*appsv1.Deployme
ImagePullPolicy: gp.runtimeImagePullPolicy,
TerminationMessagePath: "/dev/termination-log",
Resources: env.Spec.Resources,
// Pod is removed from endpoints list for service when it's
// state became "Termination". We used preStop hook as the
// workaround for connection draining since pod maybe shutdown
// before grace period expires.
// https://kubernetes.io/docs/concepts/workloads/pods/pod/#termination-of-pods
// https://github.com/kubernetes/kubernetes/issues/47576#issuecomment-308900172
// if the pod is specialized (i.e. has secrets), sleep 63 seconds — slightly past the router's 60-second endpoint cache expiry — before shutting down
Lifecycle: &apiv1.Lifecycle{
PreStop: &apiv1.LifecycleHandler{
Exec: &apiv1.ExecAction{
Command: []string{
"/bin/sleep",
fmt.Sprintf("%d", gracePeriodSeconds),
"bash",
"-c",
"test $(ls /secrets/) && sleep 63 || exit 0",
},
},
},
Expand Down
19 changes: 19 additions & 0 deletions pkg/executor/executortype/poolmgr/gpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,11 @@ func (gpm *GenericPoolManager) doIdleObjectReaper(ctx context.Context) {
return
}

countByFunction := make(map[k8sTypes.UID]int)
for _, fsvc := range funcSvcs {
countByFunction[fsvc.Function.UID]++
}

for i := range funcSvcs {
fsvc := funcSvcs[i]

Expand Down Expand Up @@ -688,6 +693,20 @@ func (gpm *GenericPoolManager) doIdleObjectReaper(ctx context.Context) {
}
}

concurrency := 1
if fn, ok := fnList[fsvc.Function.UID]; ok {
concurrency = fn.Spec.Concurrency
}

// If we have more function services running than concurrency should allow
// set the idlePodReapTime to 0 to force this service to be deleted and decrement the concurrent counter
if count, ok := countByFunction[fsvc.Function.UID]; ok {
if count > concurrency {
idlePodReapTime = 0
countByFunction[fsvc.Function.UID]--
}
}

if time.Since(fsvc.Atime) < idlePodReapTime {
continue
}
Expand Down
28 changes: 21 additions & 7 deletions pkg/executor/executortype/poolmgr/poolpodcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"go.uber.org/zap"
"golang.org/x/time/rate"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -69,11 +70,19 @@ type (
}
)

// cleanupPodRateLimiter builds the rate limiter used by the specialized-pod
// cleanup queue. It combines (taking the max of) two limiters:
//   - per-item exponential backoff starting at 1s and capped at 60s, and
//   - an overall token bucket of 1 event/sec with burst 1,
// so retries of a single key back off exponentially while the queue as a
// whole never processes more than ~1 cleanup per second.
func cleanupPodRateLimiter() workqueue.RateLimiter {
	return workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(time.Second, 60*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 1)},
	)
}

func NewPoolPodController(ctx context.Context, logger *zap.Logger,
kubernetesClient kubernetes.Interface,
enableIstio bool,
finformerFactory map[string]genInformer.SharedInformerFactory,
gpmInformerFactory map[string]k8sInformers.SharedInformerFactory) (*PoolPodController, error) {
gpmInformerFactory map[string]k8sInformers.SharedInformerFactory,
) (*PoolPodController, error) {
logger = logger.Named("pool_pod_controller")
p := &PoolPodController{
logger: logger,
Expand All @@ -86,7 +95,7 @@ func NewPoolPodController(ctx context.Context, logger *zap.Logger,
podListerSynced: make(map[string]k8sCache.InformerSynced),
envCreateUpdateQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "EnvAddUpdateQueue"),
envDeleteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "EnvDeleteQueue"),
spCleanupPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SpecializedPodCleanupQueue"),
spCleanupPodQueue: workqueue.NewNamedRateLimitingQueue(cleanupPodRateLimiter(), "SpecializedPodCleanupQueue"),
}
if p.enableIstio {
for _, factory := range finformerFactory {
Expand Down Expand Up @@ -153,7 +162,12 @@ func (p *PoolPodController) processRS(rs *apps.ReplicaSet) {
return
}
logger := p.logger.With(zap.String("rs", rs.Name), zap.String("namespace", rs.Namespace))
logger.Debug("replica set has zero replica count")

// wait for unspecialized pods to finish booting before beginning to clean up specialized ones
delay := 15 * time.Second
logger.Info("replica set has zero replica count, delaying before cleanup", zap.Stringer("delay", delay))
time.Sleep(delay)

// List all specialized pods and schedule for cleanup
rsLabelMap, err := metav1.LabelSelectorAsMap(rs.Spec.Selector)
if err != nil {
Expand All @@ -178,7 +192,7 @@ func (p *PoolPodController) processRS(rs *apps.ReplicaSet) {
logger.Error("Failed to get key for pod", zap.Error(err))
continue
}
p.spCleanupPodQueue.Add(key)
p.spCleanupPodQueue.AddRateLimited(key)
}
}

Expand All @@ -188,7 +202,7 @@ func (p *PoolPodController) handleRSAdd(obj interface{}) {
p.logger.Error("unexpected type when adding rs to pool pod controller", zap.Any("obj", obj))
return
}
p.processRS(rs)
go p.processRS(rs)
}

func (p *PoolPodController) handleRSUpdate(oldObj interface{}, newObj interface{}) {
Expand All @@ -197,7 +211,7 @@ func (p *PoolPodController) handleRSUpdate(oldObj interface{}, newObj interface{
p.logger.Error("unexpected type when updating rs to pool pod controller", zap.Any("obj", newObj))
return
}
p.processRS(rs)
go p.processRS(rs)
}

func (p *PoolPodController) handleRSDelete(obj interface{}) {
Expand All @@ -214,7 +228,7 @@ func (p *PoolPodController) handleRSDelete(obj interface{}) {
return
}
}
p.processRS(rs)
go p.processRS(rs)
}

func (p *PoolPodController) enqueueEnvAdd(obj interface{}) {
Expand Down
25 changes: 9 additions & 16 deletions pkg/executor/fscache/poolcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type (
)

// NewPoolCache creates a Cache object

func NewPoolCache(logger *zap.Logger) *PoolCache {
c := &PoolCache{
cache: make(map[crd.CacheKeyURG]*funcSvcGroup),
Expand All @@ -121,7 +122,6 @@ func (c *PoolCache) service() {
case getValue:
funcSvcGroup, ok := c.cache[req.function]
if !ok {
// first request for this function, create a new group
c.cache[req.function] = NewFuncSvcGroup()
c.cache[req.function].svcWaiting++
resp.error = ferror.MakeError(ferror.ErrorNotFound,
Expand All @@ -131,7 +131,6 @@ func (c *PoolCache) service() {
}
found := false
totalActiveRequests := 0
// check if any specialized pod is available
for addr := range funcSvcGroup.svcs {
totalActiveRequests += funcSvcGroup.svcs[addr].activeRequests
if funcSvcGroup.svcs[addr].activeRequests < req.requestsPerPod &&
Expand All @@ -146,22 +145,12 @@ func (c *PoolCache) service() {
break
}
}
// if specialized pod is available then return svc
if found {
req.responseChannel <- resp
continue
}
concurrencyUsed := len(funcSvcGroup.svcs) + (funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len())
// if concurrency is available then be aggressive and use it as we are not sure if specialization will complete for other requests
if req.concurrency > 0 && concurrencyUsed < req.concurrency {
funcSvcGroup.svcWaiting++
resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%s' not found", req.function))
req.responseChannel <- resp
continue
}
// if no concurrency is available then check if there is any virtual capacity in the existing pods to serve the request in future
// if specialization doesnt complete within request then request will be timeout
capacity := (concurrencyUsed * req.requestsPerPod) - (totalActiveRequests + funcSvcGroup.svcWaiting)
specializationInProgress := funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len()
capacity := ((specializationInProgress + len(funcSvcGroup.svcs)) * req.requestsPerPod) - (totalActiveRequests + funcSvcGroup.svcWaiting)
if capacity > 0 {
funcSvcGroup.svcWaiting++
svcWait := &svcWait{
Expand All @@ -175,8 +164,8 @@ func (c *PoolCache) service() {
}

// concurrency should not be set to zero and
// sum of specialization in progress and specialized pods should be less then req.concurrency
if req.concurrency > 0 && concurrencyUsed >= req.concurrency {
// sum of specialization in progress and specialized pods should be less than req.concurrency
if req.concurrency > 0 && (specializationInProgress+len(funcSvcGroup.svcs)) >= req.concurrency {
resp.error = ferror.MakeError(ferror.ErrorTooManyRequests, fmt.Sprintf("function '%s' concurrency '%d' limit reached.", req.function, req.concurrency))
} else {
funcSvcGroup.svcWaiting++
Expand Down Expand Up @@ -287,6 +276,10 @@ func (c *PoolCache) service() {
}
}
case markSpecializationFailure:
if _, ok := c.cache[req.function]; !ok {
c.cache[req.function] = NewFuncSvcGroup()
}

if c.cache[req.function].svcWaiting > c.cache[req.function].queue.Len() {
c.cache[req.function].svcWaiting--
if c.cache[req.function].svcWaiting == c.cache[req.function].queue.Len() {
Expand Down
Loading