Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Reserve plugin #82

Merged
merged 17 commits into from
Dec 22, 2023
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ examples/advanced/main.wasm: examples/advanced/main.go
@(cd $(@D); tinygo build -o main.wasm -scheduler=none --no-debug -target=wasi .)

.PHONY: build-tinygo
build-tinygo: examples/nodenumber/main.wasm examples/advanced/main.wasm guest/testdata/cyclestate/main.wasm guest/testdata/filter/main.wasm guest/testdata/score/main.wasm guest/testdata/bind/main.wasm
build-tinygo: examples/nodenumber/main.wasm examples/advanced/main.wasm guest/testdata/cyclestate/main.wasm guest/testdata/filter/main.wasm guest/testdata/score/main.wasm guest/testdata/bind/main.wasm guest/testdata/reserve/main.wasm

%/main-debug.wasm: %/main.go
@(cd $(@D); tinygo build -o main-debug.wasm -gc=custom -tags=custommalloc -scheduler=none -target=wasi .)
Expand Down
8 changes: 8 additions & 0 deletions guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ type ScoreExtensions interface {
NormalizeScore(state CycleState, pod proto.Pod, scores NodeScore) (map[string]int, *Status)
}

// ReservePlugin is a WebAssembly implementation of framework.ReservePlugin.
type ReservePlugin interface {
Plugin

Reserve(state CycleState, p proto.Pod, nodeName string) *Status
Unreserve(state CycleState, p proto.Pod, nodeName string)
}

// PreBindPlugin is a WebAssembly implementation of framework.PreBindPlugin.
type PreBindPlugin interface {
Plugin
Expand Down
4 changes: 4 additions & 0 deletions guest/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/reserve"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/scoreextensions"
)
Expand Down Expand Up @@ -68,6 +69,9 @@ func Set(plugin api.Plugin) {
if plugin, ok := plugin.(api.ScoreExtensions); ok {
scoreextensions.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.ReservePlugin); ok {
reserve.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PreBindPlugin); ok {
prebind.SetPlugin(plugin)
}
Expand Down
90 changes: 90 additions & 0 deletions guest/reserve/reserve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package reserve exports an api.ReservePlugin to the host.
package reserve

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
)

// reserve is the current plugin assigned with SetPlugin.
var reserve api.ReservePlugin

// SetPlugin should be called in `main` to assign an api.ReservePlugin instance.
//
// For example:
//
// func main() {
// plugin := reservePlugin{}
// reserve.SetPlugin(plugin)
// }
//
// type reservePlugin struct{}
//
// func (reservePlugin) Reserve(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status) {
// // Write state you need on Reserve
// }
//
// func (reservePlugin) Unreserve(state api.CycleState, pod proto.Pod, nodeName string) {
// // Write state you need on Unreserve
// }
func SetPlugin(reservePlugin api.ReservePlugin) {
if reservePlugin == nil {
panic("nil reservePlugin")
}
reserve = reservePlugin
plugin.MustSet(reserve)
}

// prevent unused lint errors (lint is run with normal go).
var (
_ func() uint32 = _reserve
_ func() = _unreserve
)

// _reserve is only exported to the host.
//
//export reserve
func _reserve() uint32 { //nolint
if reserve == nil { // Then, the user didn't define one.
// Unlike most plugins we always export reserve so that we can reset
// the cycle state: return success to avoid no-op overhead.
return 0
}

nodeName := imports.NodeName()
status := reserve.Reserve(cyclestate.Values, cyclestate.Pod, nodeName)

return imports.StatusToCode(status)
}

// _unreserve is only exported to the host.
//
//export unreserve
func _unreserve() { //nolint
if reserve == nil { // Then, the user didn't define one.
// Unlike most plugins we always export unreserve so that we can reset
// the cycle state: return success to avoid no-op overhead.
return
}

nodeName := imports.NodeName()
reserve.Unreserve(cyclestate.Values, cyclestate.Pod, nodeName)
}
11 changes: 11 additions & 0 deletions guest/testdata/cyclestate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/reserve"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/scoreextensions"
protoapi "sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto/api"
Expand Down Expand Up @@ -65,6 +66,7 @@ func main() {
prescore.SetPlugin(plugin)
score.SetPlugin(plugin)
scoreextensions.SetPlugin(plugin)
reserve.SetPlugin(plugin)
prebind.SetPlugin(plugin)
bind.SetPlugin(plugin)
postbind.SetPlugin(plugin)
Expand Down Expand Up @@ -176,6 +178,15 @@ func (statePlugin) NormalizeScore(state api.CycleState, pod proto.Pod, _ api.Nod
return
}

func (statePlugin) Reserve(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status) {
// Actually, it is not called from the cycle test, but it needs for Reserve plugin.
return
}

func (statePlugin) Unreserve(state api.CycleState, pod proto.Pod, nodeName string) {
mustFilterState(state)
}

func (statePlugin) PreBind(state api.CycleState, pod proto.Pod, _ string) (status *api.Status) {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from pre-filter")
Expand Down
Binary file modified guest/testdata/cyclestate/main.wasm
Binary file not shown.
53 changes: 53 additions & 0 deletions guest/testdata/reserve/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/reserve"
)

type extensionPoints interface {
api.ReservePlugin
}

func main() {
var plugin extensionPoints = reservePlugin{}
reserve.SetPlugin(plugin)
}

type reservePlugin struct{}

func (reservePlugin) Reserve(state api.CycleState, pod proto.Pod, nodeName string) *api.Status {
state.Write(nodeName+pod.Spec().GetNodeName(), "ok")
status := 0
if nodeName == "bad" {
status = 1
}
return &api.Status{Code: api.StatusCode(status), Reason: "name is " + nodeName}
}

func (reservePlugin) Unreserve(state api.CycleState, pod proto.Pod, nodeName string) {
val, ok := state.Read(nodeName + pod.Spec().GetNodeName())
if ok && val == "ok" {
state.Delete(nodeName + pod.Spec().GetNodeName())
return
}

panic("unreserve failed")
}
Binary file added guest/testdata/reserve/main.wasm
Binary file not shown.
6 changes: 6 additions & 0 deletions internal/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func RunAll(ctx context.Context, t Testing, plugin framework.Plugin, pod *v1.Pod
RequireSuccess(t, s)
}

if reserveP, ok := plugin.(framework.ReservePlugin); ok {
utam0k marked this conversation as resolved.
Show resolved Hide resolved
s = reserveP.Reserve(ctx, nil, pod, ni.Node().Name)
RequireSuccess(t, s)
reserveP.Unreserve(ctx, nil, pod, ni.Node().Name)
}

if prebindP, ok := plugin.(framework.PreBindPlugin); ok {
s = prebindP.PreBind(ctx, nil, pod, "")
RequireSuccess(t, s)
Expand Down
2 changes: 1 addition & 1 deletion internal/e2e/scheduler_perf/scheduler_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
Metrics: map[string]*labelValues{
"scheduler_framework_extension_point_duration_seconds": {
label: extensionPointsLabelName,
values: []string{"PreFilter", "Filter", "PostFilter", "PreScore", "Score", "PreBind", "Bind", "PostBind"},
values: []string{"PreFilter", "Filter", "PostFilter", "PreScore", "Score", "Reserve", "PreBind", "Bind", "PostBind"},
},
"scheduler_scheduling_attempt_duration_seconds": nil,
"scheduler_pod_scheduling_duration_seconds": nil,
Expand Down
2 changes: 1 addition & 1 deletion scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ replace (
)

require (
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/tetratelabs/wazero v1.3.1
k8s.io/api v0.27.3
Expand Down Expand Up @@ -73,7 +74,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/cel-go v0.12.6 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
Expand Down
41 changes: 41 additions & 0 deletions scheduler/plugin/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
guestExportPreScore = "prescore"
guestExportScore = "score"
guestExportNormalizeScore = "normalizescore"
guestExportReserve = "reserve"
guestExportUnreserve = "unreserve"
guestExportPreBind = "prebind"
guestExportBind = "bind"
guestExportPostBind = "postbind"
Expand All @@ -52,6 +54,8 @@ type guest struct {
prescoreFn wazeroapi.Function
scoreFn wazeroapi.Function
normalizescoreFn wazeroapi.Function
reserveFn wazeroapi.Function
unreserveFn wazeroapi.Function
prebindFn wazeroapi.Function
bindFn wazeroapi.Function
postbindFn wazeroapi.Function
Expand Down Expand Up @@ -102,6 +106,8 @@ func (pl *wasmPlugin) newGuest(ctx context.Context) (*guest, error) {
prescoreFn: g.ExportedFunction(guestExportPreScore),
scoreFn: g.ExportedFunction(guestExportScore),
normalizescoreFn: g.ExportedFunction(guestExportNormalizeScore),
reserveFn: g.ExportedFunction(guestExportReserve),
unreserveFn: g.ExportedFunction(guestExportUnreserve),
prebindFn: g.ExportedFunction(guestExportPreBind),
bindFn: g.ExportedFunction(guestExportBind),
postbindFn: g.ExportedFunction(guestExportPostBind),
Expand Down Expand Up @@ -207,6 +213,31 @@ func (g *guest) normalizeScore(ctx context.Context) (framework.NodeScoreList, *f
return normalizedScoreList, framework.NewStatus(framework.Code(statusCode), statusReason)
}

// reserve calls guestExportReserve.
func (g *guest) reserve(ctx context.Context) *framework.Status {
defer g.out.Reset()
callStack := g.callStack

if err := g.reserveFn.CallWithStack(ctx, callStack); err != nil {
return framework.AsStatus(decorateError(g.out, guestExportReserve, err))
}

statusCode := int32(callStack[0])
statusReason := paramsFromContext(ctx).resultStatusReason
return framework.NewStatus(framework.Code(statusCode), statusReason)
}

// unreserve calls guestExportUnreserve.
func (g *guest) unreserve(ctx context.Context) {
defer g.out.Reset()
callStack := g.callStack

logger := klog.FromContext(ctx)
if err := g.unreserveFn.CallWithStack(ctx, callStack); err != nil {
logger.Error(decorateError(g.out, guestExportUnreserve, err), "failed unreserve")
}
}

// preBind calls guestExportPreBind.
func (g *guest) preBind(ctx context.Context) *framework.Status {
defer g.out.Reset()
Expand Down Expand Up @@ -294,6 +325,16 @@ func detectInterfaces(exportedFns map[string]wazeroapi.FunctionDefinition) (inte
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
}
e |= iScoreExtensions
case guestExportReserve:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i32}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
}
e |= iReservePlugin
case guestExportUnreserve:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> ()", name)
}
e |= iReservePlugin
case guestExportPreBind:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i32}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
Expand Down
18 changes: 12 additions & 6 deletions scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)
Expand Down Expand Up @@ -356,8 +357,10 @@ var _ framework.ReservePlugin = (*wasmPlugin)(nil)

// Reserve implements the same method as documented on framework.ReservePlugin.
func (pl *wasmPlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
params := &stack{pod: pod, nodeName: nodeName}
ctx = context.WithValue(ctx, stackKey{}, params)
if err := pl.pool.doWithSchedulingGuest(ctx, pod.UID, func(g *guest) {
// TODO: partially implemented for testing
status = g.reserve(ctx)
}); err != nil {
status = framework.AsStatus(err)
}
Expand All @@ -366,13 +369,16 @@ func (pl *wasmPlugin) Reserve(ctx context.Context, state *framework.CycleState,

// Unreserve implements the same method as documented on framework.ReservePlugin.
func (pl *wasmPlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
// Note: Unlike the below diagram, this is not a part of the scheduling
// cycle, rather the binding on error.
// https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/#extension-points

defer pl.pool.freeFromBinding(pod.UID) // the cycle is over, put it back into the pool.

// TODO: partially implemented for testing
params := &stack{pod: pod, nodeName: nodeName}
ctx = context.WithValue(ctx, stackKey{}, params)
logger := klog.FromContext(ctx)
if err := pl.pool.doWithSchedulingGuest(ctx, pod.UID, func(g *guest) {
g.unreserve(ctx)
}); err != nil {
logger.Error(err, "doWithSchedulingGuest Failed")
}
}

var _ framework.PreBindPlugin = (*wasmPlugin)(nil)
Expand Down
Loading
Loading