Skip to content

Commit

Permalink
pluginfactory
Browse files Browse the repository at this point in the history
  • Loading branch information
Gekko0114 committed Jan 3, 2024
1 parent a86390f commit b4fa058
Show file tree
Hide file tree
Showing 29 changed files with 176 additions and 159 deletions.
31 changes: 3 additions & 28 deletions examples/advanced/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,15 @@ import (
_ "github.com/wasilibs/nottinygc"

"sigs.k8s.io/kube-scheduler-wasm-extension/examples/advanced/plugin"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
handleapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/handle/api"
klog "sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
)

// main is compiled to an exported Wasm function named "_start", called by the
// Wasm scheduler plugin during initialization.
func main() {
enqueue.SetPlugin(func(klog klog.Klog, jsonConfig []byte, h handleapi.Handle) api.EnqueueExtensions {
p := pluginInitializer(klog, jsonConfig, h)
return p.(api.EnqueueExtensions)
})
prescore.SetPlugin(func(klog klog.Klog, jsonConfig []byte, h handleapi.Handle) api.PreScorePlugin {
p := pluginInitializer(klog, jsonConfig, h)
return p.(api.PreScorePlugin)
})
score.SetPlugin(func(klog klog.Klog, jsonConfig []byte, h handleapi.Handle) api.ScorePlugin {
p := pluginInitializer(klog, jsonConfig, h)
return p.(api.ScorePlugin)
})
}

func pluginInitializer(klog klog.Klog, jsonConfig []byte, h handleapi.Handle) api.Plugin {
// The plugin package uses only normal Go code, which allows it to be
// unit testable via `tinygo test -target=wasi` as well normal `go test`.
//
// The real implementations use Wasm host functions
// (go:wasmimport), which cannot be tested with `tinygo test -target=wasi`.
plugin, err := plugin.New(klog, jsonConfig, h)
if err != nil {
panic(err)
}
return plugin
enqueue.SetPlugin(plugin.New)
prescore.SetPlugin(plugin.New)
score.SetPlugin(plugin.New)
}
Binary file modified examples/advanced/main.wasm
Binary file not shown.
6 changes: 2 additions & 4 deletions examples/advanced/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func (pl *NodeNumber) PreScore(state api.CycleState, pod proto.Pod, _ proto.Node

podnum, ok := lastNumber(pod.Spec().GetNodeName())
if !ok {
if recorder != nil {
recorder.Eventf(pod, nil, "PreScore", "not match lastNumber", "Skip", "")
}
recorder.Eventf(pod, nil, "PreScore", "not match lastNumber", "Skip", "")
return nil // return success even if its suffix is non-number.
}

Expand Down Expand Up @@ -135,7 +133,7 @@ func lastNumber(str string) (uint8, bool) {
//
// Note: This accepts config instead of implicitly calling config.Get for
// testing.
func New(klog klog.Klog, jsonConfig []byte, h handleapi.Handle) (*NodeNumber, error) {
func New(klog klog.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) {
var args nodeNumberArgs
if jsonConfig != nil {
if err := json.Unmarshal(jsonConfig, &args); err != nil {
Expand Down
8 changes: 3 additions & 5 deletions examples/nodenumber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func main() {
}
klog.Info("NodeNumberArgs is successfully applied")
}
plugin.Set(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.Plugin {
return &NodeNumber{reverse: args.Reverse, handle: h}
plugin.Set(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) {
return &NodeNumber{reverse: args.Reverse, handle: h}, nil
})
}

Expand Down Expand Up @@ -95,9 +95,7 @@ func (pl *NodeNumber) PreScore(state api.CycleState, pod proto.Pod, _ proto.Node

podnum, ok := lastNumber(pod.Spec().GetNodeName())
if !ok {
if recorder != nil {
recorder.Eventf(pod, nil, "PreScore", "not match lastNumber", "Skip", "")
}
recorder.Eventf(pod, nil, "PreScore", "not match lastNumber", "Skip", "")
return nil // return success even if its suffix is non-number.
}

Expand Down
Binary file modified examples/nodenumber/main.wasm
Binary file not shown.
15 changes: 9 additions & 6 deletions guest/bind/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog"
klogapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog/api"
)

// bind is the current plugin assigned with SetPlugin.
Expand All @@ -36,19 +35,23 @@ var bind api.BindPlugin
//
// func main() {
// plugin := bindPlugin{}
// bind.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.BindPlugin { return plugin })
// bind.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) { return plugin, nil })
// }
//
// type bindPlugin struct{}
//
// func (bindPlugin) Bind(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status) {
// // Write state you need on Bind
// }
func SetPlugin(pluginInitializer func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.BindPlugin) {
func SetPlugin(pluginFactory handleapi.PluginFactory) {
handle := handle.NewFrameworkHandle()
bind = pluginInitializer(klog.Get(), config.Get(), handle)
if bind == nil {
panic("nil bindPlugin")
p, err := pluginFactory(klog.Get(), config.Get(), handle)
if err != nil {
panic(err)
}
bind, ok := p.(api.BindPlugin)
if !ok || bind == nil {
panic("nil BindPlugin or a plugin is not compatible with BindPlugin type")
}
plugin.MustSet(bind)
}
Expand Down
13 changes: 8 additions & 5 deletions guest/enqueue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,21 @@ import (
handleapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/handle/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog"
klogapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog/api"
)

// enqueue is the current plugin assigned with SetPlugin.
var enqueue api.EnqueueExtensions

// SetPlugin is exposed to prevent package cycles.
func SetPlugin(pluginInitializer func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.EnqueueExtensions) {
func SetPlugin(pluginFactory handleapi.PluginFactory) {
handle := handle.NewFrameworkHandle()
enqueue = pluginInitializer(klog.Get(), config.Get(), handle)
if enqueue == nil {
panic("nil enqueueExtensions")
p, err := pluginFactory(klog.Get(), config.Get(), handle)
if err != nil {
panic(err)
}
enqueue, ok := p.(api.EnqueueExtensions)
if !ok || enqueue == nil {
panic("nil EnqueueExtensions or a plugin is not compatible with EnqueueExtensions type")
}
plugin.MustSet(enqueue)
}
Expand Down
15 changes: 9 additions & 6 deletions guest/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
internalproto "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog"
klogapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog/api"
protoapi "sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto/api"
)

Expand All @@ -41,7 +40,7 @@ var filter api.FilterPlugin
// For example:
//
// func main() {
// filter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.FilterPlugin { return plugin })
// filter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) { return plugin, nil })
// }
//
// type filterPlugin struct{}
Expand All @@ -51,11 +50,15 @@ var filter api.FilterPlugin
// }
//
// Note: If you need state, you can assign it with prefilter.SetPlugin.
func SetPlugin(pluginInitializer func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.FilterPlugin) {
func SetPlugin(pluginFactory handleapi.PluginFactory) {
handle := handle.NewFrameworkHandle()
filter = pluginInitializer(klog.Get(), config.Get(), handle)
if filter == nil {
panic("nil filterPlugin")
p, err := pluginFactory(klog.Get(), config.Get(), handle)
if err != nil {
panic(err)
}
filter, ok := p.(api.FilterPlugin)
if !ok || filter == nil {
panic("nil FilterPlugin or a plugin is not compatible with FilterPlugin type")
}
plugin.MustSet(filter)
}
Expand Down
5 changes: 5 additions & 0 deletions guest/handle/api/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package api

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
internalproto "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/proto"
klogapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog/api"
)

// Handle provides data and some tools that plugins can use.
Expand Down Expand Up @@ -45,3 +47,6 @@ type UnimplementedEventRecorder struct{}
// Eventf implements events.EventRecorder.Eventf()
func (UnimplementedEventRecorder) Eventf(regarding internalproto.KObject, related internalproto.KObject, eventtype, reason, action, note string) {
}

// PluginFactory is a type of function like runtime.PluginFactory
type PluginFactory = func(klog klogapi.Klog, jsonConfig []byte, h Handle) (api.Plugin, error)
12 changes: 8 additions & 4 deletions guest/internal/prefilter/prefilter_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ import (
var prefilter api.PreFilterPlugin

// SetPlugin is exposed to prevent package cycles.
func SetPlugin(pluginInitializer func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PreFilterPlugin, klog klogapi.Klog, jsonConfig []byte) {
func SetPlugin(pluginFactory handleapi.PluginFactory, klog klogapi.Klog, jsonConfig []byte) {
handle := handle.NewFrameworkHandle()
prefilter = pluginInitializer(klog, jsonConfig, handle)
if prefilter == nil {
panic("nil prefilterPlugin")
p, err := pluginFactory(klog, jsonConfig, handle)
if err != nil {
panic(err)
}
prefilter, ok := p.(api.PreFilterPlugin)
if !ok || prefilter == nil {
panic("nil PreFilterPlugin or a plugin is not compatible with PreFilterPlugin type")
}
plugin.MustSet(prefilter)
}
Expand Down
59 changes: 32 additions & 27 deletions guest/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,44 +49,49 @@ import (
//
// func main() {
// plugin := myPlugin{}
// prefilter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PreFilterPlugin { return plugin })
// filter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.FilterPlugin { return plugin })
// prefilter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) { return plugin, nil })
// filter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) { return plugin, nil })
// }
func Set(pluginInitializer func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.Plugin) {
func Set(pluginFactory handleapi.PluginFactory) {
handle := handle.NewFrameworkHandle()
plugin := pluginInitializer(klog.Get(), config.Get(), handle)

if plugin, ok := plugin.(api.EnqueueExtensions); ok {
enqueue.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.EnqueueExtensions { return plugin })
plugin, err := pluginFactory(klog.Get(), config.Get(), handle)
if err != nil {
panic(err)
}
pf := func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) {
return plugin, nil
}
if _, ok := plugin.(api.EnqueueExtensions); ok {
enqueue.SetPlugin(pf)
}
if plugin, ok := plugin.(api.PreFilterPlugin); ok {
prefilter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PreFilterPlugin { return plugin })
if _, ok := plugin.(api.PreFilterPlugin); ok {
prefilter.SetPlugin(pf)
}
if plugin, ok := plugin.(api.FilterPlugin); ok {
filter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.FilterPlugin { return plugin })
if _, ok := plugin.(api.FilterPlugin); ok {
filter.SetPlugin(pf)
}
if plugin, ok := plugin.(api.PostFilterPlugin); ok {
postfilter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PostFilterPlugin { return plugin })
if _, ok := plugin.(api.PostFilterPlugin); ok {
postfilter.SetPlugin(pf)
}
if plugin, ok := plugin.(api.PreScorePlugin); ok {
prescore.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PreScorePlugin { return plugin })
if _, ok := plugin.(api.PreScorePlugin); ok {
prescore.SetPlugin(pf)
}
if plugin, ok := plugin.(api.ScorePlugin); ok {
score.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.ScorePlugin { return plugin })
if _, ok := plugin.(api.ScorePlugin); ok {
score.SetPlugin(pf)
}
if plugin, ok := plugin.(api.ScoreExtensions); ok {
scoreextensions.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.ScoreExtensions { return plugin })
if _, ok := plugin.(api.ScoreExtensions); ok {
scoreextensions.SetPlugin(pf)
}
if plugin, ok := plugin.(api.ReservePlugin); ok {
reserve.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.ReservePlugin { return plugin })
if _, ok := plugin.(api.ReservePlugin); ok {
reserve.SetPlugin(pf)
}
if plugin, ok := plugin.(api.PreBindPlugin); ok {
prebind.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PreBindPlugin { return plugin })
if _, ok := plugin.(api.PreBindPlugin); ok {
prebind.SetPlugin(pf)
}
if plugin, ok := plugin.(api.BindPlugin); ok {
bind.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.BindPlugin { return plugin })
if _, ok := plugin.(api.BindPlugin); ok {
bind.SetPlugin(pf)
}
if plugin, ok := plugin.(api.PostBindPlugin); ok {
postbind.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PostBindPlugin { return plugin })
if _, ok := plugin.(api.PostBindPlugin); ok {
postbind.SetPlugin(pf)
}
}
17 changes: 10 additions & 7 deletions guest/postbind/postbind.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog"
klogapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog/api"
)

// postbind is the current plugin assigned with SetPlugin.
Expand All @@ -36,8 +35,8 @@ var postbind api.PostBindPlugin
//
// func main() {
// plugin := bindPlugin{}
// bind.SetPlugin(func(h handleapi.Handle) api.BindPlugin { return plugin })
// postbind.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PostBindPlugin { return plugin })
// bind.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) { return plugin, nil })
// postbind.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) { return plugin, nil })
// }
//
// type bindPlugin struct{}
Expand All @@ -49,11 +48,15 @@ var postbind api.PostBindPlugin
// func (bindPlugin) PostBind(state api.CycleState, pod proto.Pod, nodeName string) {
// // Write state you need on PostBind
// }
func SetPlugin(pluginInitializer func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PostBindPlugin) {
func SetPlugin(pluginFactory handleapi.PluginFactory) {
handle := handle.NewFrameworkHandle()
postbind = pluginInitializer(klog.Get(), config.Get(), handle)
if postbind == nil {
panic("nil postbindPlugin")
p, err := pluginFactory(klog.Get(), config.Get(), handle)
if err != nil {
panic(err)
}
postbind, ok := p.(api.PostBindPlugin)
if !ok || postbind == nil {
panic("nil PostBindPlugin or a plugin is not compatible with PostBindPlugin type")
}
plugin.MustSet(postbind)
}
Expand Down
17 changes: 10 additions & 7 deletions guest/postfilter/postfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog"
klogapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/klog/api"
)

// postfilter is the current plugin assigned with SetPlugin.
Expand All @@ -42,8 +41,8 @@ var postfilter api.PostFilterPlugin
//
// func main() {
// plugin := filterPlugin{}
// postfilter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PostFilterPlugin { return plugin })
// filter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.FilterPlugin { return plugin })
// postfilter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) { return plugin, nil })
// filter.SetPlugin(func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) (api.Plugin, error) { return plugin, nil })
// }
//
// type filterPlugin struct{}
Expand All @@ -57,11 +56,15 @@ var postfilter api.PostFilterPlugin
// // Derive Filter for the node name using state set on PreFilter!
// return Filter, nil
// }
func SetPlugin(pluginInitializer func(klog klogapi.Klog, jsonConfig []byte, h handleapi.Handle) api.PostFilterPlugin) {
func SetPlugin(pluginFactory handleapi.PluginFactory) {
handle := handle.NewFrameworkHandle()
postfilter = pluginInitializer(klog.Get(), config.Get(), handle)
if postfilter == nil {
panic("nil postfilterPlugin")
p, err := pluginFactory(klog.Get(), config.Get(), handle)
if err != nil {
panic(err)
}
postfilter, ok := p.(api.PostFilterPlugin)
if !ok || postfilter == nil {
panic("nil PostFilterPlugin or a plugin is not compatible with PostFilterPlugin type")
}
plugin.MustSet(postfilter)
}
Expand Down
Loading

0 comments on commit b4fa058

Please sign in to comment.