Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support GetWaitingPod #121

Merged
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9981e4a
feat: add `GetWaitingPod` support
chansuke Aug 10, 2024
69131b5
docs: update the doc comment
chansuke Sep 14, 2024
9b2069a
refactor: remove redundant return
chansuke Sep 15, 2024
7d7902f
fix: add `GetName()` to show name correctly
chansuke Sep 15, 2024
351142d
implement GetWaitingPod
chansuke Sep 18, 2024
1547b90
refactor: remove redundant print deubg
chansuke Sep 25, 2024
ad55f28
feat: Support SharedLister for NodeInfo
sanposhiho Aug 20, 2024
a368266
refactor: remove unneeded function
chansuke Sep 25, 2024
2ea9a0c
trying to complete `GetWaitingPod` function
chansuke Oct 19, 2024
9705a8a
remove print debug
chansuke Oct 19, 2024
54bb5cb
fix the condtion
chansuke Oct 19, 2024
4b79c20
update
chansuke Oct 19, 2024
0091211
fix error message of postbind
chansuke Oct 19, 2024
c40d780
Update test data
chansuke Nov 3, 2024
7add63f
Remove unused methods
chansuke Nov 3, 2024
d85fb1b
fix: typo
chansuke Nov 16, 2024
7a7f07a
fix: restore the test
chansuke Nov 16, 2024
c4ab1f6
fix: remove allow_waiting_pod
chansuke Nov 17, 2024
04172a3
fix: remove redundant code
chansuke Nov 20, 2024
ad967c7
fix: address the feedback
chansuke Jan 5, 2025
c985c7a
fix: remove unused function
chansuke Jan 9, 2025
86ac5ba
fix: remove unused function
chansuke Jan 11, 2025
6d48cc7
fix: replace redundant function
chansuke Jan 11, 2025
c342746
fix: update GetWaitingPod
chansuke Jan 11, 2025
fddccd7
fix: update testcase
chansuke Jan 11, 2025
16fc837
fix: update testdata
chansuke Jan 11, 2025
d18f9e1
fix: remove redundant value
chansuke Jan 11, 2025
ff59f79
fix: remove unnecessary function
chansuke Jan 14, 2025
437376e
fix: move waitingPod
chansuke Jan 14, 2025
6d32551
reafactor: unify waitingPod
chansuke Jan 14, 2025
7e56b9a
refactor: remove redundant function
chansuke Jan 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified examples/advanced/main.wasm
Binary file not shown.
Binary file modified examples/imagelocality/main.wasm
Binary file not shown.
Binary file modified examples/nodenumber/main.wasm
Binary file not shown.
7 changes: 7 additions & 0 deletions guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,10 @@ type NodeScore interface {
// which is keyed by the node name and valued by the score.
Map() map[string]int
}

// WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface {
GetPod() proto.Pod
// Reject declares the waiting pod unschedulable.
Reject(pluginName, msg string)
}
8 changes: 4 additions & 4 deletions guest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ go 1.22.0

replace sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto => ../kubernetes/proto

require sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto v0.0.0-00010101000000-000000000000

require (
github.com/google/go-cmp v0.5.9 // indirect
google.golang.org/protobuf v1.30.0 // indirect
google.golang.org/protobuf v1.30.0
sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto v0.0.0-00010101000000-000000000000
)

require github.com/google/go-cmp v0.5.9 // indirect
30 changes: 28 additions & 2 deletions guest/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
limitations under the License.
*/

// Package prescore exports an api.PreScorePlugin to the host. Only import this
// package when setting Plugin, as doing otherwise will cause overhead.
// Package handle exports an api.RejectWaitingPod and GetWaitingPod to the host.
// Only import this package when setting Plugin, as doing otherwise will cause overhead.
package handle

import (
"runtime"

proto "google.golang.org/protobuf/proto"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
guestapi "sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"
protoapi "sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto/api"
)

func RejectWaitingPod(uid string) bool {
Expand All @@ -34,3 +39,24 @@ func RejectWaitingPod(uid string) bool {
runtime.KeepAlive(uid)
return wasmBool == 1
}

func GetWaitingPod(uid string) guestapi.WaitingPod {
ptr, size := mem.StringToPtr(uid)

var pod protoapi.Pod
err := mem.Update(
func(outPtr uint32, limit mem.BufLimit) (len uint32) {
getWaitingPod(ptr, size, outPtr, limit)
return limit
},
func(data []byte) error {
return proto.Unmarshal(data, &pod)
},
)
if err != nil {
return nil
}

waitingPod := make([]api.WaitingPod, size)
return waitingPod[0]
}
3 changes: 3 additions & 0 deletions guest/handle/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ package handle

import "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"

//go:wasmimport k8s.io/scheduler handle.get_waiting_pod
chansuke marked this conversation as resolved.
Show resolved Hide resolved
func getWaitingPod(input_ptr, input_size, ptr uint32, limit mem.BufLimit)

//go:wasmimport k8s.io/scheduler handle.reject_waiting_pod
func rejectWaitingPod(input_ptr, input_size, ptr uint32, limit mem.BufLimit)
3 changes: 3 additions & 0 deletions guest/handle/imports_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ package handle

import "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"

// getWaitingPod is stubbed for compilation outside TinyGo.
func getWaitingPod(uint32, uint32, uint32, mem.BufLimit) {}

// rejectWaitingPod is stubbed for compilation outside TinyGo.
func rejectWaitingPod(uint32, uint32, uint32, mem.BufLimit) {}
21 changes: 21 additions & 0 deletions guest/internal/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,24 @@ func SendAndGetUint64(input_ptr uint32, input_size uint32, fn func(input_ptr, in
fn(input_ptr, input_size, uint32(readBufPtr), readBufLimit)
return binary.LittleEndian.Uint64(readBuf)
}

func SendAndGetPodBytes(input_ptr uint32, input_size uint32, fn func(input_ptr, input_size, ptr uint32, limit BufLimit)) []byte {
chansuke marked this conversation as resolved.
Show resolved Hide resolved
readBuf := make([]byte, readBufLimit)
readBufPtr := uint32(uintptr(unsafe.Pointer(&readBuf[0])))

fn(input_ptr, input_size, readBufPtr, readBufLimit)

var length int
for i := 0; i < int(readBufLimit); i++ {
if readBuf[i] == 0 {
length = i
break
}
}

if length == 0 {
return nil
}

return readBuf[:length]
}
Binary file modified guest/testdata/bind/main.wasm
Binary file not shown.
Binary file modified guest/testdata/cyclestate/main.wasm
Binary file not shown.
Binary file modified guest/testdata/filter/main.wasm
Binary file not shown.
21 changes: 21 additions & 0 deletions guest/testdata/handle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func main() {
switch os.Args[1] {
case "rejectWaitingPod":
plugin = pluginForReject{}
case "getWaitingPod":
plugin = pluginForGet{}
}
}
prefilter.SetPlugin(plugin)
Expand Down Expand Up @@ -82,3 +84,22 @@ func (pluginForReject) Filter(_ api.CycleState, pod proto.Pod, nodeInfo api.Node
Code: api.StatusCodeSuccess,
}
}

// pluginForGet checks the function of GetWaitingPod
type pluginForGet struct{ noopPlugin }

func (pluginForGet) Filter(_ api.CycleState, pod proto.Pod, nodeInfo api.NodeInfo) *api.Status {
// Call GetWaitingPod first
waitingPod := handle.GetWaitingPod(pod.GetUid())

if waitingPod == nil {
return &api.Status{
Code: api.StatusCodeError,
Reason: "No waiting pod found for UID: " + pod.GetUid(),
}
}

return &api.Status{
Code: api.StatusCodeSuccess,
}
}
Binary file modified guest/testdata/handle/main.wasm
Binary file not shown.
Binary file modified guest/testdata/permit/main.wasm
Binary file not shown.
Binary file modified guest/testdata/reserve/main.wasm
Binary file not shown.
Binary file modified guest/testdata/score/main.wasm
Binary file not shown.
35 changes: 35 additions & 0 deletions scheduler/plugin/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
k8sSchedulerResultNormalizedScoreList = "result.normalized_score_list"
k8sSchedulerHandleEventRecorderEventf = "handle.eventrecorder.eventf"
k8sSchedulerHandleRejectWaitingPod = "handle.reject_waiting_pod"
k8sSchedulerHandleGetWaitingPod = "handle.get_waiting_pod"
)

func instantiateHostApi(ctx context.Context, runtime wazero.Runtime, handle framework.Handle) (wazeroapi.Module, error) {
Expand Down Expand Up @@ -131,6 +132,9 @@ func instantiateHostScheduler(ctx context.Context, runtime wazero.Runtime, guest
NewFunctionBuilder().
WithGoModuleFunction(wazeroapi.GoModuleFunc(host.k8sHandleRejectWaitingPodFn), []wazeroapi.ValueType{i32, i32, i32, i32}, []wazeroapi.ValueType{}).
WithParameterNames("buf", "buf_len").Export(k8sSchedulerHandleRejectWaitingPod).
NewFunctionBuilder().
WithGoModuleFunction(wazeroapi.GoModuleFunc(host.k8sHandleGetWaitingPodFn), []wazeroapi.ValueType{i32, i32, i32, i32}, []wazeroapi.ValueType{}).
WithParameterNames("buf", "buf_len").Export(k8sSchedulerHandleGetWaitingPod).
Instantiate(ctx)
}

Expand Down Expand Up @@ -604,3 +608,34 @@ func (h host) k8sHandleRejectWaitingPodFn(ctx context.Context, mod wazeroapi.Mod
}
writeUint64(mod.Memory(), wasmBool, oBuf, oBufLimit)
}

func (h host) k8sHandleGetWaitingPodFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) {
iBuf := uint32(stack[0])
iBufLen := uint32(stack[1])
oBuf := uint32(stack[2])
oBufLimit := uint32(stack[3])

// Safely read UID from memory
b, ok := mod.Memory().Read(iBuf, iBufLen)
if !ok {
panic("out of memory reading getWaitingPod")
}
uid := types.UID(b)

// Fetch the waiting pod using the handle
waitingPod := h.handle.GetWaitingPod(uid)
if waitingPod == nil {
// Write an error message or handle this scenario in a non-crashing way
writeUint64(mod.Memory(), uint64(0), oBuf, oBufLimit)
stack[0] = uint64(0) // Indicate failure (return 0 to WebAssembly)
return
}

// Marshal the found pod and ensure no out-of-bounds memory writes
vLen := marshalIfUnderLimit(mod.Memory(), waitingPod.GetPod(), oBuf, oBufLimit)
if vLen == 0 {
panic("out of memory writing getWaitingPod result")
}
// Indicate success (return 1=success to WebAssembly)
stack[0] = uint64(1)
}
28 changes: 28 additions & 0 deletions scheduler/plugin/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,34 @@ func Test_k8sHandleEventRecorderEventFn(t *testing.T) {
}
}

func Test_k8sHandleGetWaitingPodFn(t *testing.T) {
recorder := &test.FakeRecorder{EventMsg: ""}
handle := &test.FakeHandle{Recorder: recorder}
h := host{handle: handle}

// Create a fake wasm module, which has data the guest should write.
mem := wazerotest.NewMemory(wazerotest.PageSize)
mod := wazerotest.NewModule(mem)
uid := types.UID("c6feae3a-7082-42a5-a5ec-6ae2e1603727")
copy(mem.Bytes, uid)

// Invoke the host function in the same way the guest would have.
h.k8sHandleGetWaitingPodFn(context.Background(), mod, []uint64{
0,
uint64(len(uid)),
0, // Ideally we should define some value, but we don't define it for now.
0, // Ideally we should define some value, but we don't define it for now.
})

// Checking the value stored on handle
have := handle.GetWaitingPodValue
want := uid

if want != have.GetPod().UID {
t.Fatalf("unexpected uid: %v != %v", want, have)
}
}

func Test_k8sHandleRejectWaitingPodFn(t *testing.T) {
recorder := &test.FakeRecorder{EventMsg: ""}
handle := &test.FakeHandle{Recorder: recorder}
Expand Down
1 change: 0 additions & 1 deletion scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func newWasmPlugin(ctx context.Context, pluginName string, runtime wazero.Runtim
guestModuleConfig: wazero.NewModuleConfig(),
instanceCounter: atomic.Uint64{},
}

if pl.pool, err = newGuestPool(ctx, pl.newGuest); err != nil {
return nil, fmt.Errorf("failed to create a guest pool: %w", err)
}
Expand Down
Loading
Loading