Commit

Only enqueue tasks that fit on the node in AskForMoreWork

bduffany committed Feb 18, 2025
1 parent 05356bf commit 2911775

Showing 3 changed files with 156 additions and 47 deletions.
1 change: 1 addition & 0 deletions enterprise/server/scheduling/scheduler_server/BUILD
@@ -50,6 +50,7 @@ go_test(
deps = [
"//enterprise/server/remote_execution/execution_server",
"//enterprise/server/remote_execution/platform",
"//enterprise/server/tasksize",
"//enterprise/server/testutil/enterprise_testenv",
"//enterprise/server/testutil/testredis",
"//proto:remote_execution_go_proto",
28 changes: 20 additions & 8 deletions enterprise/server/scheduling/scheduler_server/scheduler_server.go
@@ -388,6 +388,7 @@ func (h *executorHandle) Serve(ctx context.Context) error {
log.CtxWarningf(ctx, "Could not assign more work to executor %q: %s", executorID, err)
continue
}
log.CtxDebugf(ctx, "Assigned %d tasks to executor %s in response to AskForMoreWorkRequest", numEnqueued, executorID)
if numEnqueued == 0 {
newDelay := clampDuration(timeSinceLastWork*2, 5*time.Second, time.Minute)
h.setMoreWorkDelay(newDelay) // exponential backoff.
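
For context on the hunk above: each AskForMoreWork round that enqueues zero tasks doubles the delay and clamps it to [5s, 1m]. A minimal sketch of that behavior, with clampDuration's signature inferred from the call site rather than taken from the actual implementation:

package main

import (
	"fmt"
	"time"
)

// clampDuration bounds d to the closed interval [min, max]. This is an
// assumed implementation matching the clampDuration(timeSinceLastWork*2, ...)
// call above.
func clampDuration(d, min, max time.Duration) time.Duration {
	if d < min {
		return min
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Successive empty responses back off as 10s, 20s, 40s, 1m, 1m, ...
	timeSinceLastWork := 5 * time.Second
	for i := 0; i < 6; i++ {
		delay := clampDuration(timeSinceLastWork*2, 5*time.Second, time.Minute)
		fmt.Println(delay)
		timeSinceLastWork = delay
	}
}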
@@ -592,22 +593,22 @@ type rankedExecutionNode struct {
preferred bool
}

-func (en *executionNode) CanFit(size *scpb.TaskSize) bool {
+func nodeCanFitTask(en *scpb.ExecutionNode, size *scpb.TaskSize) bool {
if size.GetEstimatedMemoryBytes() > int64(float64(en.GetAssignableMemoryBytes())*tasksize.MaxResourceCapacityRatio) {
return false
}
if size.GetEstimatedMilliCpu() > int64(float64(en.GetAssignableMilliCpu())*tasksize.MaxResourceCapacityRatio) {
return false
}
for _, r := range size.GetCustomResources() {
-if r.GetValue() > en.GetAssignableCustomResource(r.GetName()) {
+if r.GetValue() > getAssignableCustomResource(en, r.GetName()) {
return false
}
}
return true
}

-func (en *executionNode) GetAssignableCustomResource(name string) float32 {
+func getAssignableCustomResource(en *scpb.ExecutionNode, name string) float32 {
for _, r := range en.GetAssignableCustomResources() {
if r.Name == name {
return r.Value
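
The fit check above deliberately reserves headroom: a task only fits if it needs no more than MaxResourceCapacityRatio of the node's assignable capacity. A worked example of the milli-CPU branch, where the ratio value 0.8 is an assumption for illustration (the real constant lives in the tasksize package and is not shown in this diff):

// cpuFits mirrors nodeCanFitTask's milli-CPU comparison in isolation.
const maxResourceCapacityRatio = 0.8 // assumed value, for illustration only

func cpuFits(taskMilliCPU, assignableMilliCPU int64) bool {
	return taskMilliCPU <= int64(float64(assignableMilliCPU)*maxResourceCapacityRatio)
}

// With the sizes used by the test added in this commit:
//   cpuFits(8000, 32_000) == true  (8000 <= 25_600)
//   cpuFits(8000, 1000)   == false (8000 > 800)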
@@ -626,7 +627,7 @@ func (en *executionNode) String() string {
func nodesThatFit(nodes []*executionNode, taskSize *scpb.TaskSize) []*executionNode {
var out []*executionNode
for _, node := range nodes {
-if node.CanFit(taskSize) {
+if nodeCanFitTask(node.ExecutionNode, taskSize) {
out = append(out, node)
}
}
@@ -772,7 +773,7 @@ func (np *nodePool) NodeCount(ctx context.Context, taskSize *scpb.TaskSize) (int

fitCount := 0
for _, node := range np.nodes {
-if node.CanFit(taskSize) {
+if nodeCanFitTask(node.ExecutionNode, taskSize) {
fitCount++
}
}
@@ -1248,7 +1249,7 @@ func (s *SchedulerServer) RegisterAndStreamWork(stream scpb.Scheduler_RegisterAn
}

func (s *SchedulerServer) assignWorkToNode(ctx context.Context, handle *executorHandle, nodePoolKey nodePoolKey) (int, error) {
-tasks, err := s.sampleUnclaimedTasks(ctx, tasksToEnqueueOnJoin, nodePoolKey)
+tasks, err := s.sampleUnclaimedTasks(ctx, tasksToEnqueueOnJoin, nodePoolKey, handle.getRegistration())
if err != nil {
return 0, err
}
@@ -1446,7 +1447,7 @@ func (s *SchedulerServer) getOrCreatePool(key nodePoolKey) *nodePool {
return nodePool
}

-func (s *SchedulerServer) sampleUnclaimedTasks(ctx context.Context, count int, nodePoolKey nodePoolKey) ([]*persistedTask, error) {
+func (s *SchedulerServer) sampleUnclaimedTasks(ctx context.Context, count int, nodePoolKey nodePoolKey, node *scpb.ExecutionNode) ([]*persistedTask, error) {
nodePool, ok := s.getPool(nodePoolKey)
if !ok {
return nil, nil
@@ -1459,7 +1460,18 @@ func (s *SchedulerServer) sampleUnclaimedTasks(ctx context.Context, count int, n
if err != nil {
return nil, err
}
-return tasks, nil
+// Filter to tasks which can fit on the node.
+//
+// TODO: sample only from tasks that fit. Currently we randomly sample then
+// do this filtering, which means that even if there are `count` tasks that
+// can fit, we might wind up returning an empty list here.
+tasksThatFit := make([]*persistedTask, 0, len(tasks))
+for _, task := range tasks {
+	if nodeCanFitTask(node, task.metadata.GetTaskSize()) {
+		tasksThatFit = append(tasksThatFit, task)
+	}
+}
+return tasksThatFit, nil
}

func (s *SchedulerServer) readTask(ctx context.Context, taskID string) (*persistedTask, error) {
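The TODO in sampleUnclaimedTasks above is worth a second look: sampling happens before the fit filter, so a node can come away empty-handed even when enough fitting tasks exist in the pool. A self-contained sketch of that sample-then-filter ordering; sampleN and the generic signatures are illustrative stand-ins, not the scheduler's real Redis-backed API:

package sketch

import "math/rand"

// sampleN returns up to n items drawn uniformly at random without
// replacement, standing in for the randomized unclaimed-task sampling.
func sampleN[T any](items []T, n int) []T {
	shuffled := make([]T, len(items))
	copy(shuffled, items)
	rand.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})
	if n > len(shuffled) {
		n = len(shuffled)
	}
	return shuffled[:n]
}

// sampleThenFilter reproduces the ordering above: if only 1 of 10 unclaimed
// tasks fits the node and n = 3, the fitting task lands in the sample with
// probability 3/10, so most calls return an empty slice even though eligible
// work exists.
func sampleThenFilter[T any](items []T, n int, fits func(T) bool) []T {
	var out []T
	for _, item := range sampleN(items, n) {
		if fits(item) {
			out = append(out, item)
		}
	}
	return out
}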
174 changes: 135 additions & 39 deletions enterprise/server/scheduling/scheduler_server/scheduler_server_test.go
@@ -11,6 +11,7 @@ import (

"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/execution_server"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/platform"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/tasksize"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/testutil/enterprise_testenv"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/testutil/testredis"
"github.com/buildbuddy-io/buildbuddy/server/environment"
@@ -268,18 +269,36 @@ type task struct {
delay time.Duration
}

type Result[T any] struct {
Value T
Err error
}

type schedulerRequest struct {
request *scpb.RegisterAndStreamWorkRequest
reply chan error
}

type fakeExecutor struct {
t *testing.T
schedulerClient scpb.SchedulerClient

-id string
+id   string
+node *scpb.ExecutionNode

ctx context.Context

unhealthy atomic.Bool

mu sync.Mutex
tasks map[string]task

send chan *scpb.RegisterAndStreamWorkRequest
// Channel for inspecting replies from the scheduler within test cases. This
// is a buffered channel - sends are non-blocking to avoid blocking the
// scheduler goroutine. Tests that aren't interested in asserting on the
// scheduler's replies don't need to receive from this channel.
schedulerMessages chan *scpb.RegisterAndStreamWorkResponse
}

func newFakeExecutor(ctx context.Context, t *testing.T, schedulerClient scpb.SchedulerClient) *fakeExecutor {
@@ -289,56 +308,89 @@
}

func newFakeExecutorWithId(ctx context.Context, t *testing.T, id string, schedulerClient scpb.SchedulerClient) *fakeExecutor {
node := &scpb.ExecutionNode{
ExecutorId: id,
Os: defaultOS,
Arch: defaultArch,
Host: "foo",
AssignableMemoryBytes: 64_000_000_000,
AssignableMilliCpu: 32_000,
}
ctx = log.EnrichContext(ctx, "executor_id", id)
return &fakeExecutor{
-t: t,
-schedulerClient: schedulerClient,
-id: id,
-ctx: ctx,
-tasks: make(map[string]task),
+t:                 t,
+schedulerClient:   schedulerClient,
+id:                id,
+ctx:               ctx,
+tasks:             make(map[string]task),
+node:              node,
+send:              make(chan *scpb.RegisterAndStreamWorkRequest),
+schedulerMessages: make(chan *scpb.RegisterAndStreamWorkResponse, 128),
}
}

func (e *fakeExecutor) markUnhealthy() {
e.unhealthy.Store(true)
}

// Send sends a request to the scheduler.
func (e *fakeExecutor) Send(req *scpb.RegisterAndStreamWorkRequest) {
// Send via channel to the goroutine managing the stream, since it's not
// safe to send on the stream from multiple goroutines.
e.send <- req
}

func (e *fakeExecutor) Register() {
ctx := e.ctx
stream, err := e.schedulerClient.RegisterAndStreamWork(e.ctx)
require.NoError(e.t, err)
err = stream.Send(&scpb.RegisterAndStreamWorkRequest{
RegisterExecutorRequest: &scpb.RegisterExecutorRequest{
-Node: &scpb.ExecutionNode{
-	ExecutorId: e.id,
-	Os: defaultOS,
-	Arch: defaultArch,
-	Host: "foo",
-	AssignableMemoryBytes: 1000000,
-	AssignableMilliCpu: 1000000,
-}},
+Node: e.node,
+},
})
require.NoError(e.t, err)

recvChan := make(chan Result[*scpb.RegisterAndStreamWorkResponse])
go func() {
for {
-req, err := stream.Recv()
-if status.IsUnavailableError(err) {
-	return
-}
-require.NoError(e.t, err)
-log.Infof("received req: %+v", req)
-if e.unhealthy.Load() {
-	log.Infof("executor %s got task %q but is unhealthy -- ignoring so it times out", e.id, req.GetEnqueueTaskReservationRequest().GetTaskId())
-} else {
-	err = stream.Send(&scpb.RegisterAndStreamWorkRequest{
-		EnqueueTaskReservationResponse: &scpb.EnqueueTaskReservationResponse{
-			TaskId: req.GetEnqueueTaskReservationRequest().GetTaskId(),
-		},
-	})
+msg, err := stream.Recv()
+recvChan <- Result[*scpb.RegisterAndStreamWorkResponse]{Value: msg, Err: err}
}
}()
go func() {
for {
select {
case msg := <-recvChan:
rsp, err := msg.Value, msg.Err
if status.IsUnavailableError(err) {
return
}
require.NoError(e.t, err)
log.CtxInfof(ctx, "Received scheduler message: %+v", rsp)
if e.unhealthy.Load() {
log.CtxInfof(ctx, "Executor %s got task %q but is unhealthy -- ignoring so it times out", e.id, rsp.GetEnqueueTaskReservationRequest().GetTaskId())
} else {
err = stream.Send(&scpb.RegisterAndStreamWorkRequest{
EnqueueTaskReservationResponse: &scpb.EnqueueTaskReservationResponse{
TaskId: rsp.GetEnqueueTaskReservationRequest().GetTaskId(),
},
})
require.NoError(e.t, err)
e.mu.Lock()
log.CtxInfof(ctx, "Executor %s got task %q with scheduling delay %s", e.id, rsp.GetEnqueueTaskReservationRequest().GetTaskId(), rsp.GetEnqueueTaskReservationRequest().GetDelay())
taskID := rsp.GetEnqueueTaskReservationRequest().GetTaskId()
e.tasks[taskID] = task{delay: rsp.GetEnqueueTaskReservationRequest().GetDelay().AsDuration()}
e.mu.Unlock()
// Best effort: notify the test of every scheduler reply.
select {
case e.schedulerMessages <- rsp:
default:
}
}
case req := <-e.send:
err := stream.Send(req)
require.NoError(e.t, err)
-e.mu.Lock()
-log.Infof("executor %s got task %q with scheduling delay %s", e.id, req.GetEnqueueTaskReservationRequest().GetTaskId(), req.GetEnqueueTaskReservationRequest().GetDelay())
-e.tasks[req.GetEnqueueTaskReservationRequest().GetTaskId()] = task{delay: req.GetEnqueueTaskReservationRequest().GetDelay().AsDuration()}
-e.mu.Unlock()
}
}
}()
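
The schedulerMessages plumbing added above uses Go's standard non-blocking send idiom: a buffered channel combined with select and a default case, so the stream-handling goroutine never stalls on a test that isn't reading. The idiom in isolation:

package main

import "fmt"

func main() {
	messages := make(chan int, 2) // buffered, like schedulerMessages

	for i := 0; i < 4; i++ {
		select {
		case messages <- i:
			fmt.Println("buffered", i) // space available, send succeeds
		default:
			fmt.Println("dropped", i) // buffer full, drop rather than block
		}
	}
}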
@@ -458,18 +510,15 @@ func scheduleTask(ctx context.Context, t *testing.T, env environment.Env, props
for k, v := range props {
task.Command.Platform.Properties = append(task.Command.Platform.Properties, &repb.Platform_Property{Name: k, Value: v})
}
size := tasksize.Override(tasksize.Default(task), tasksize.Requested(task))
taskBytes, err := proto.Marshal(task)
require.NoError(t, err)
_, err = env.GetSchedulerService().ScheduleTask(ctx, &scpb.ScheduleTaskRequest{
TaskId: taskID,
Metadata: &scpb.SchedulingMetadata{
-Os: defaultOS,
-Arch: defaultArch,
-TaskSize: &scpb.TaskSize{
-	EstimatedMemoryBytes: 100,
-	EstimatedMilliCpu: 100,
-	EstimatedFreeDiskBytes: 100,
-},
+Os: defaultOS,
+Arch: defaultArch,
+TaskSize: size,
},
SerializedTask: taskBytes,
})
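
The scheduleTask change above swaps the hardcoded 100-unit TaskSize for the tasksize package's default estimate, overridden by whatever the task explicitly requests (the new test relies on this via the EstimatedCPU platform property). A hedged sketch of the merge semantics assumed here; overrideSize is hypothetical and not the real tasksize.Override:

// overrideSize applies non-zero requested fields on top of the defaults.
// Assumed behavior: any field the task did not request keeps its default.
func overrideSize(defaults, requested *scpb.TaskSize) *scpb.TaskSize {
	out := proto.Clone(defaults).(*scpb.TaskSize)
	if requested.GetEstimatedMemoryBytes() > 0 {
		out.EstimatedMemoryBytes = requested.GetEstimatedMemoryBytes()
	}
	if requested.GetEstimatedMilliCpu() > 0 {
		out.EstimatedMilliCpu = requested.GetEstimatedMilliCpu()
	}
	return out
}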
@@ -741,3 +790,50 @@ func TestEnqueueTaskReservation_Exists(t *testing.T) {
require.Nil(t, err)
require.False(t, resp.GetExists())
}

func TestAskForMoreWorkOnlyEnqueuesTasksThatFitOnNode(t *testing.T) {
clock := clockwork.NewFakeClock()
env, ctx := getEnv(t, &schedulerOpts{clock: clock}, "user1")

// Register two nodes with different capacities.
largeExecutor := newFakeExecutorWithId(ctx, t, "large", env.GetSchedulerClient())
largeExecutor.node.AssignableMilliCpu = 32_000
largeExecutor.Register()

smallExecutor := newFakeExecutorWithId(ctx, t, "small", env.GetSchedulerClient())
smallExecutor.node.AssignableMilliCpu = 1000
smallExecutor.Register()

var rsp *scpb.RegisterAndStreamWorkResponse

// Schedule a task that only fits on largeExecutor.
taskID := scheduleTask(ctx, t, env, map[string]string{"EstimatedCPU": "8000m"})
// Ensure the task was enqueued on largeExecutor, but don't have
// largeExecutor claim the task, so that it's eligible to be enqueued
// as part of AskForMoreWork.
rsp = <-largeExecutor.schedulerMessages
require.Equal(t, taskID, rsp.GetEnqueueTaskReservationRequest().GetTaskId())

// Now have largeExecutor ask for more work. The scheduler should enqueue
// the unclaimed task.
largeExecutor.Send(&scpb.RegisterAndStreamWorkRequest{
AskForMoreWorkRequest: &scpb.AskForMoreWorkRequest{},
})
// Make sure we get an EnqueueTaskReservationRequest. The scheduler doesn't
// know what tasks are currently enqueued on largeExecutor, so it's fair
// to expect the task to be enqueued again.
// Note: we don't expect an AskForMoreWorkResponse here - the scheduler only
// sends a response to increase the client backoff after enqueuing 0 tasks.
rsp = <-largeExecutor.schedulerMessages
require.Equal(t, taskID, rsp.GetEnqueueTaskReservationRequest().GetTaskId())

// Have smallExecutor ask for more work now. The scheduler should not
// schedule any work on smallExecutor because the task doesn't fit. It
// should only reply with an AskForMoreWorkResponse to increase the client
// backoff.
smallExecutor.Send(&scpb.RegisterAndStreamWorkRequest{
AskForMoreWorkRequest: &scpb.AskForMoreWorkRequest{},
})
rsp = <-smallExecutor.schedulerMessages
require.Greater(t, rsp.GetAskForMoreWorkResponse().GetDelay().AsDuration(), time.Duration(0))
}
