Skip to content

Commit

Permalink
Merge pull request #79 from Gekko0114/postbind
Browse files Browse the repository at this point in the history
Support postBind plugin
  • Loading branch information
k8s-ci-robot authored Dec 9, 2023
2 parents b22b122 + f6aab43 commit 66b69b4
Show file tree
Hide file tree
Showing 20 changed files with 360 additions and 19 deletions.
7 changes: 7 additions & 0 deletions guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ type BindPlugin interface {
Bind(state CycleState, pod proto.Pod, nodeName string) *Status
}

// PostBindPlugin is a WebAssembly implementation of framework.PostBindPlugin.
type PostBindPlugin interface {
Plugin

PostBind(state CycleState, pod proto.Pod, nodeName string)
}

type NodeInfo interface {
// Metadata is a convenience that triggers Get.
proto.Metadata
Expand Down
2 changes: 1 addition & 1 deletion guest/bind/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func SetPlugin(bindPlugin api.BindPlugin) {
panic("nil bindPlugin")
}
bind = bindPlugin
plugin.MustSet(bindPlugin)
plugin.MustSet(bind)
}

// prevent unused lint errors (lint is run with normal go).
Expand Down
4 changes: 4 additions & 0 deletions guest/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postbind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
Expand Down Expand Up @@ -73,4 +74,7 @@ func Set(plugin api.Plugin) {
if plugin, ok := plugin.(api.BindPlugin); ok {
bind.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PostBindPlugin); ok {
postbind.SetPlugin(plugin)
}
}
72 changes: 72 additions & 0 deletions guest/postbind/postbind.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package postbind exports an api.PostBindPlugin to the host.
package postbind

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
)

// postbind is the current plugin assigned with SetPlugin.
var postbind api.PostBindPlugin

// SetPlugin should be called in `main` to assign an api.PostBindPlugin
// instance.
//
// For example:
//
// func main() {
// plugin := bindPlugin{}
// bind.SetPlugin(plugin)
// postbind.SetPlugin(plugin)
// }
//
// type bindPlugin struct{}
//
// func (bindPlugin) Bind(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status) {
// // Write state you need on Bind
// }
//
// func (bindPlugin) PostBind(state api.CycleState, pod proto.Pod, nodeName string) {
// // Write state you need on PostBind
// }
func SetPlugin(postbindPlugin api.PostBindPlugin) {
if postbindPlugin == nil {
panic("nil postbindPlugin")
}
postbind = postbindPlugin
plugin.MustSet(postbind)
}

// prevent unused lint errors (lint is run with normal go).
var _ func() = _postbind

// _postbind is only exported to the host.
//
//export postbind
func _postbind() { //nolint
if postbind == nil { // Then, the user didn't define one.
// This is likely caused by use of plugin.Set(p), where 'p' didn't
// implement PostBindPlugin: return success.
return
}

nodeName := imports.NodeName()
// The parameters passed are lazy with regard to host functions. This means
// a no-op plugin should not have any unmarshal penalty.
postbind.PostBind(cyclestate.Values, cyclestate.Pod, nodeName)
}
2 changes: 1 addition & 1 deletion guest/postfilter/postfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func SetPlugin(postfilterPlugin api.PostFilterPlugin) {
panic("nil postfilterPlugin")
}
postfilter = postfilterPlugin
plugin.MustSet(postfilterPlugin)
plugin.MustSet(postfilter)
}

// prevent unused lint errors (lint is run with normal go).
Expand Down
26 changes: 13 additions & 13 deletions guest/prebind/prebind.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,27 @@ var prebind api.PreBindPlugin
//
// For example:
//
// func main() {
// plugin := bindPlugin{}
// bind.SetPlugin(plugin)
// prebind.SetPlugin(plugin)
// }
// func main() {
// plugin := bindPlugin{}
// bind.SetPlugin(plugin)
// prebind.SetPlugin(plugin)
// }
//
// type bindPlugin struct{}
// type bindPlugin struct{}
//
// func (bindPlugin) Bind(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status) {
// // Write state you need on Bind
// }
// func (bindPlugin) Bind(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status) {
// // Write state you need on Bind
// }
//
// func (bindPlugin) PreBind(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status) {
// // Write state you need on Bind
// }
// func (bindPlugin) PreBind(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status) {
// // Write state you need on Bind
// }
func SetPlugin(prebindPlugin api.PreBindPlugin) {
if prebindPlugin == nil {
panic("nil prebindPlugin")
}
prebind = prebindPlugin
plugin.MustSet(prebindPlugin)
plugin.MustSet(prebind)
}

// prevent unused lint errors (lint is run with normal go).
Expand Down
20 changes: 20 additions & 0 deletions guest/testdata/bind/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/bind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postbind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
)

type extensionPoints interface {
api.PreBindPlugin
api.BindPlugin
api.PostBindPlugin
}

func main() {
Expand All @@ -40,10 +42,13 @@ func main() {
plugin = preBindPlugin{}
case "bind":
plugin = bindPlugin{}
case "postBind":
plugin = postBindPlugin{}
}
}
prebind.SetPlugin(plugin)
bind.SetPlugin(plugin)
postbind.SetPlugin(plugin)
}

// noopPlugin doesn't do anything, except evaluate each parameter.
Expand All @@ -63,6 +68,12 @@ func (noopPlugin) Bind(state api.CycleState, pod proto.Pod, nodeName string) *ap
return nil
}

func (noopPlugin) PostBind(state api.CycleState, pod proto.Pod, nodeName string) {
_, _ = state.Read("ok")
_ = pod.Spec()
_ = nodeName
}

// preBindPlugin returns the length of nodeName
type preBindPlugin struct{ noopPlugin }

Expand All @@ -84,3 +95,12 @@ func (bindPlugin) Bind(_ api.CycleState, _ proto.Pod, nodeName string) *api.Stat
}
return &api.Status{Code: api.StatusCode(status), Reason: "name is " + nodeName}
}

// postBindPlugin returns nothing
type postBindPlugin struct{ noopPlugin }

func (postBindPlugin) PostBind(_ api.CycleState, _ proto.Pod, nodeName string) {
if nodeName == "bad" {
panic("name is bad")
}
}
Binary file modified guest/testdata/bind/main.wasm
Binary file not shown.
15 changes: 13 additions & 2 deletions guest/testdata/cyclestate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/bind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postbind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prefilter"
Expand Down Expand Up @@ -66,6 +67,7 @@ func main() {
scoreextensions.SetPlugin(plugin)
prebind.SetPlugin(plugin)
bind.SetPlugin(plugin)
postbind.SetPlugin(plugin)
}

const (
Expand Down Expand Up @@ -178,7 +180,6 @@ func (statePlugin) PreBind(state api.CycleState, pod proto.Pod, _ string) (statu
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from score")
}
mustFilterState(state)
if _, ok := state.Read(preBindStateKey); ok {
panic("didn't reset pre-bind state on pre-bind")
} else {
Expand All @@ -191,7 +192,6 @@ func (statePlugin) Bind(state api.CycleState, pod proto.Pod, _ string) (status *
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from pre-bind")
}
mustFilterState(state)
if val, ok := state.Read(preBindStateKey); !ok {
panic("didn't propagate pre-bind state from pre-bind")
} else {
Expand All @@ -200,6 +200,17 @@ func (statePlugin) Bind(state api.CycleState, pod proto.Pod, _ string) (status *
return
}

func (statePlugin) PostBind(state api.CycleState, pod proto.Pod, _ string) {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from pre-bind")
}
if val, ok := state.Read(preBindStateKey); !ok {
panic("didn't propagate pre-bind state from pre-bind")
} else if _, ok = val.(preBindStateVal)["bind"]; !ok {
panic("bind value lost propagating from bind")
}
}

// mustNotScoreState ensures that score state, written after filter, cannot
// be read by extension points before it.
//
Expand Down
Binary file modified guest/testdata/cyclestate/main.wasm
Binary file not shown.
4 changes: 4 additions & 0 deletions internal/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func RunAll(ctx context.Context, t Testing, plugin framework.Plugin, pod *v1.Pod
s = bindP.Bind(ctx, nil, pod, "")
RequireSuccess(t, s)
}

if postbindP, ok := plugin.(framework.PostBindPlugin); ok {
postbindP.PostBind(ctx, nil, pod, "")
}
return
}

Expand Down
2 changes: 1 addition & 1 deletion internal/e2e/scheduler_perf/scheduler_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
Metrics: map[string]*labelValues{
"scheduler_framework_extension_point_duration_seconds": {
label: extensionPointsLabelName,
values: []string{"PreFilter", "Filter", "PostFilter", "PreScore", "Score", "PreBind", "Bind"},
values: []string{"PreFilter", "Filter", "PostFilter", "PreScore", "Score", "PreBind", "Bind", "PostBind"},
},
"scheduler_scheduling_attempt_duration_seconds": nil,
"scheduler_pod_scheduling_duration_seconds": nil,
Expand Down
19 changes: 19 additions & 0 deletions scheduler/plugin/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/tetratelabs/wazero"
wazeroapi "github.com/tetratelabs/wazero/api"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

Expand All @@ -38,6 +39,7 @@ const (
guestExportNormalizeScore = "normalizescore"
guestExportPreBind = "prebind"
guestExportBind = "bind"
guestExportPostBind = "postbind"
)

type guest struct {
Expand All @@ -52,6 +54,7 @@ type guest struct {
normalizescoreFn wazeroapi.Function
prebindFn wazeroapi.Function
bindFn wazeroapi.Function
postbindFn wazeroapi.Function
callStack []uint64
}

Expand Down Expand Up @@ -101,6 +104,7 @@ func (pl *wasmPlugin) newGuest(ctx context.Context) (*guest, error) {
normalizescoreFn: g.ExportedFunction(guestExportNormalizeScore),
prebindFn: g.ExportedFunction(guestExportPreBind),
bindFn: g.ExportedFunction(guestExportBind),
postbindFn: g.ExportedFunction(guestExportPostBind),
callStack: callStack,
}, nil
}
Expand Down Expand Up @@ -231,6 +235,16 @@ func (g *guest) bind(ctx context.Context) *framework.Status {
return framework.NewStatus(framework.Code(statusCode), statusReason)
}

// postBind calls guestExportPostBind.
func (g *guest) postBind(ctx context.Context) {
defer g.out.Reset()
callStack := g.callStack
logger := klog.FromContext(ctx)
if err := g.postbindFn.CallWithStack(ctx, callStack); err != nil {
logger.Error(decorateError(g.out, guestExportPostBind, err), "failed postbind")
}
}

func decorateError(out fmt.Stringer, fn string, err error) error {
detail := out.String()
if detail != "" {
Expand Down Expand Up @@ -290,6 +304,11 @@ func detectInterfaces(exportedFns map[string]wazeroapi.FunctionDefinition) (inte
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
}
e |= iBindPlugin
case guestExportPostBind:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> ()", name)
}
e |= iPostBindPlugin
}
}
if e == 0 {
Expand Down
10 changes: 9 additions & 1 deletion scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)
Expand Down Expand Up @@ -406,7 +407,14 @@ func (pl *wasmPlugin) PostBind(ctx context.Context, state *framework.CycleState,
}

defer pl.pool.freeFromBinding(pod.UID) // the cycle is over, put it back into the pool.
// TODO: partially implemented for testing
params := &stack{pod: pod, nodeName: nodeName}
ctx = context.WithValue(ctx, stackKey{}, params)
logger := klog.FromContext(ctx)
if err := pl.pool.doWithSchedulingGuest(ctx, pod.UID, func(g *guest) {
g.postBind(ctx)
}); err != nil {
logger.Error(err, "doWithSchedulingGuest Failed")
}
}

var _ framework.PermitPlugin = (*wasmPlugin)(nil)
Expand Down
Loading

0 comments on commit 66b69b4

Please sign in to comment.