diff --git a/api/allocations.go b/api/allocations.go
index b35e338c559..bf8059d32c2 100644
--- a/api/allocations.go
+++ b/api/allocations.go
@@ -278,6 +278,8 @@ type Allocation struct {
 	Resources           *Resources
 	TaskResources       map[string]*Resources
 	AllocatedResources  *AllocatedResources
+	HostVolumeIDs       []string
+	CSIVolumeIDs        []string
 	Services            map[string]string
 	Metrics             *AllocationMetric
 	DesiredStatus       string
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 2d28c003150..622af2d1325 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -11114,6 +11114,13 @@ type Allocation struct {
 	// AllocatedResources is the total resources allocated for the task group.
 	AllocatedResources *AllocatedResources
 
+	// HostVolumeIDs is a list of host volume IDs that this allocation
+	// has claimed.
+	HostVolumeIDs []string
+
+	// CSIVolumeIDs is a list of CSI volume IDs that this allocation has claimed.
+	CSIVolumeIDs []string
+
 	// Metrics associated with this allocation
 	Metrics *AllocMetric
 
diff --git a/scheduler/feasible.go b/scheduler/feasible.go
index 60442f92e7f..69ab03de7c3 100644
--- a/scheduler/feasible.go
+++ b/scheduler/feasible.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"reflect"
 	"regexp"
+	"slices"
 	"strconv"
 	"strings"
 
@@ -137,23 +138,28 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
 // HostVolumeChecker is a FeasibilityChecker which returns whether a node has
 // the host volumes necessary to schedule a task group.
 type HostVolumeChecker struct {
-	ctx        Context
-	volumeReqs []*structs.VolumeRequest
-	namespace  string
+	ctx           Context
+	volumeReqs    []*structs.VolumeRequest
+	hostVolumeIDs []string
+	namespace     string
 }
 
 // NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
 func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
 	return &HostVolumeChecker{
-		ctx:        ctx,
-		volumeReqs: []*structs.VolumeRequest{},
+		ctx:           ctx,
+		volumeReqs:    []*structs.VolumeRequest{},
+		hostVolumeIDs: []string{},
 	}
 }
 
 // SetVolumes takes the volumes required by a task group and updates the checker.
-func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) {
+func (h *HostVolumeChecker) SetVolumes(
+	allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
+) {
 	h.namespace = ns
 	h.volumeReqs = []*structs.VolumeRequest{}
+	h.hostVolumeIDs = allocHostVolumeIDs
 	for _, req := range volumes {
 		if req.Type != structs.VolumeTypeHost {
 			continue // filter CSI volumes
@@ -181,7 +187,6 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
 }
 
 func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
-
 	// Fast path: Requested no volumes. No need to check further.
 	if len(h.volumeReqs) == 0 {
 		return true
 	}
@@ -216,6 +221,15 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 			if !capOk {
 				return false
 			}
+
+			if req.Sticky {
+				if slices.Contains(h.hostVolumeIDs, vol.ID) || len(h.hostVolumeIDs) == 0 {
+					return true
+				}
+
+				return false
+			}
+
 		} else if !req.ReadOnly {
 			// this is a static host volume and can only be mounted ReadOnly,
 			// validate that no requests for it are ReadWrite.
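The sticky-volume gate added to hasVolumes above boils down to one rule: a node's dynamic host volume satisfies a sticky request when the allocation has no previously claimed host volume IDs (first placement), or when that volume's ID is among the IDs the allocation already claimed. The standalone Go sketch below isolates that rule; the names nodeVolume and stickyFeasible are illustrative only and do not exist in the Nomad codebase.

package main

import (
	"fmt"
	"slices"
)

// nodeVolume is a simplified stand-in for the dynamic host volume a
// candidate node exposes.
type nodeVolume struct {
	ID string
}

// stickyFeasible mirrors the new branch in hasVolumes: the node is feasible
// if the allocation has never claimed a host volume (first placement) or if
// this volume is one it claimed before.
func stickyFeasible(vol nodeVolume, claimedIDs []string) bool {
	return len(claimedIDs) == 0 || slices.Contains(claimedIDs, vol.ID)
}

func main() {
	vol := nodeVolume{ID: "vol-123"}

	fmt.Println(stickyFeasible(vol, nil))                 // true: first placement
	fmt.Println(stickyFeasible(vol, []string{"vol-123"})) // true: claimed here before
	fmt.Println(stickyFeasible(vol, []string{"vol-999"})) // false: claimed a different volume
}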
diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go
index 9c5a9aaf1a7..3351210c2ee 100644
--- a/scheduler/feasible_test.go
+++ b/scheduler/feasible_test.go
@@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) {
 	alloc.NodeID = nodes[2].ID
 
 	for i, c := range cases {
-		checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes)
+		checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs)
 		if act := checker.Feasible(c.Node); act != c.Result {
 			t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
 		}
@@ -359,7 +359,116 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes)
+			checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs)
+			actual := checker.Feasible(tc.node)
+			must.Eq(t, tc.expect, actual)
+		})
+	}
+}
+
+func TestHostVolumeChecker_Sticky(t *testing.T) {
+	ci.Parallel(t)
+
+	store, ctx := testContext(t)
+
+	nodes := []*structs.Node{
+		mock.Node(),
+		mock.Node(),
+	}
+
+	hostVolCapsReadWrite := []*structs.HostVolumeCapability{
+		{
+			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
+			AccessMode:     structs.HostVolumeAccessModeSingleNodeReader,
+		},
+		{
+			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
+			AccessMode:     structs.HostVolumeAccessModeSingleNodeWriter,
+		},
+	}
+
+	dhv := &structs.HostVolume{
+		Namespace:             structs.DefaultNamespace,
+		ID:                    uuid.Generate(),
+		Name:                  "foo",
+		NodeID:                nodes[1].ID,
+		RequestedCapabilities: hostVolCapsReadWrite,
+		State:                 structs.HostVolumeStateReady,
+	}
+
+	nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{}
+	nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {ID: dhv.ID}}
+
+	for _, node := range nodes {
+		must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node))
+	}
+	must.NoError(t, store.UpsertHostVolume(1000, dhv))
+
+	stickyRequest := map[string]*structs.VolumeRequest{
+		"foo": {
+			Type:           "host",
+			Source:         "foo",
+			Sticky:         true,
+			AccessMode:     structs.CSIVolumeAccessModeSingleNodeWriter,
+			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+		},
+	}
+
+	checker := NewHostVolumeChecker(ctx)
+
+	// alloc0 wants a previously registered volume ID that's available on node1
+	alloc0 := mock.Alloc()
+	alloc0.NodeID = nodes[1].ID
+	alloc0.HostVolumeIDs = []string{dhv.ID}
+
+	// alloc1 wants a volume ID that's available on node1 but hasn't used it
+	// before
+	alloc1 := mock.Alloc()
+	alloc1.NodeID = nodes[1].ID
+
+	// alloc2 wants a volume ID that's unrelated
+	alloc2 := mock.Alloc()
+	alloc2.NodeID = nodes[1].ID
+	alloc2.HostVolumeIDs = []string{uuid.Generate()}
+
+	// insert all the allocs into the state
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc0, alloc1, alloc2}))
+
+	cases := []struct {
+		name   string
+		node   *structs.Node
+		alloc  *structs.Allocation
+		expect bool
+	}{
+		{
+			"alloc asking for a sticky volume on an infeasible node",
+			nodes[0],
+			alloc0,
+			false,
+		},
+		{
+			"alloc asking for a sticky volume on a feasible node",
+			nodes[1],
+			alloc0,
+			true,
+		},
+		{
+			"alloc asking for a sticky volume on a feasible node for the first time",
+			nodes[1],
+			alloc1,
+			true,
+		},
+		{
+			"alloc asking for an unrelated volume",
+			nodes[1],
+			alloc2,
+			false,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
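+			// Re-seed the checker with this case's alloc: its previously claimed
+			// host volume IDs drive the sticky feasibility decision below.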
+			checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs)
 			actual := checker.Feasible(tc.node)
 			must.Eq(t, tc.expect, actual)
 		})
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index f9fd669e592..60b4f7f1eed 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -6,6 +6,7 @@ package scheduler
 import (
 	"fmt"
 	"runtime/debug"
+	"slices"
 	"sort"
 	"time"
 
@@ -657,6 +658,18 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 				"old_alloc_name", oldAllocName, "new_alloc_name", newAllocName)
 		}
 
+		// Are there sticky volumes requested by the task group for the first time? If
+		// yes, make sure the allocation stores their IDs for future reschedules.
+		var newHostVolumeIDs []string
+		for _, v := range tg.Volumes {
+			if v.Sticky {
+				if missing.PreviousAllocation() != nil && len(missing.PreviousAllocation().HostVolumeIDs) > 0 {
+					continue
+				}
+				newHostVolumeIDs = append(newHostVolumeIDs, option.Node.HostVolumes[v.Source].ID)
+			}
+		}
+
 		// Create an allocation for this
 		alloc := &structs.Allocation{
 			ID: uuid.Generate(),
@@ -681,6 +694,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 			},
 		}
 
+		if len(newHostVolumeIDs) > 0 {
+			alloc.HostVolumeIDs = newHostVolumeIDs
+		}
+
 		// If the new allocation is replacing an older allocation then we
 		// set the record the older allocation id so that they are chained
 		if prevAllocation != nil {
@@ -689,6 +706,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 				updateRescheduleTracker(alloc, prevAllocation, now)
 			}
 
+			if len(prevAllocation.HostVolumeIDs) > 0 {
+				alloc.HostVolumeIDs = prevAllocation.HostVolumeIDs
+			}
+
 			// If the allocation has task handles,
 			// copy them to the new allocation
 			propagateTaskState(alloc, prevAllocation, missing.PreviousLost())
@@ -838,6 +859,10 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
 			}
 		}
 		selectOptions.PenaltyNodeIDs = penaltyNodes
+
+		if prevAllocation.HostVolumeIDs != nil {
+			selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs
+		}
 	}
 	if preferredNode != nil {
 		selectOptions.PreferredNodes = []*structs.Node{preferredNode}
 	}
@@ -910,6 +935,29 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
 			return preferredNode, nil
 		}
 	}
+
+	for _, vol := range place.TaskGroup().Volumes {
+		if !vol.Sticky {
+			continue
+		}
+
+		var preferredNode *structs.Node
+		preferredNode, err := s.state.NodeByID(nil, prev.NodeID)
+		if err != nil {
+			return nil, err
+		}
+
+		if preferredNode != nil && preferredNode.Ready() {
+			// if this node has at least one of the allocation volumes, it's a
+			// preferred one
+			for _, vol := range preferredNode.HostVolumes {
+				if slices.Contains(prev.HostVolumeIDs, vol.ID) {
+					return preferredNode, nil
+				}
+			}
+		}
+	}
+
 	return nil, nil
 }
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index adda5e2cb2a..5d471423136 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -218,6 +218,121 @@ func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
 	}
 }
 
+func TestServiceSched_JobRegister_StickyVolumes(t *testing.T) {
+	ci.Parallel(t)
+
+	h := NewHarness(t)
+
+	nodes := []*structs.Node{
+		mock.Node(),
+		mock.Node(),
+	}
+
+	hostVolCapsReadWrite := []*structs.HostVolumeCapability{
+		{
+			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
+			AccessMode:     structs.HostVolumeAccessModeSingleNodeReader,
+		},
+		{
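+			// second capability: filesystem attachment with single-node writer access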
+			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
+			AccessMode:     structs.HostVolumeAccessModeSingleNodeWriter,
+		},
+	}
+
+	dhv := &structs.HostVolume{
+		Namespace:             structs.DefaultNamespace,
+		ID:                    uuid.Generate(),
+		Name:                  "foo",
+		NodeID:                nodes[1].ID,
+		RequestedCapabilities: hostVolCapsReadWrite,
+		State:                 structs.HostVolumeStateReady,
+	}
+
+	nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{}
+	nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {ID: dhv.ID}}
+
+	for _, node := range nodes {
+		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, 1000, node))
+	}
+	must.NoError(t, h.State.UpsertHostVolume(1000, dhv))
+
+	stickyRequest := map[string]*structs.VolumeRequest{
+		"foo": {
+			Type:           "host",
+			Source:         "foo",
+			Sticky:         true,
+			AccessMode:     structs.CSIVolumeAccessModeSingleNodeWriter,
+			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+		},
+	}
+
+	// Create a job
+	job := mock.Job()
+	job.TaskGroups[0].Volumes = stickyRequest
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to register the job
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	must.NoError(t, h.Process(NewServiceScheduler, eval))
+
+	// Ensure the plan allocated
+	plan := h.Plans[0]
+	planned := make(map[string]*structs.Allocation)
+	for _, allocList := range plan.NodeAllocation {
+		for _, alloc := range allocList {
+			planned[alloc.ID] = alloc
+		}
+	}
+	must.MapLen(t, 10, planned)
+
+	// Ensure that the allocations got the host volume ID added
+	for _, p := range planned {
+		must.Eq(t, p.PreviousAllocation, "")
+		must.Eq(t, p.HostVolumeIDs[0], dhv.ID)
+	}
+
+	// Update the job to force a rolling upgrade
+	updated := job.Copy()
+	updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+
+	// Create a mock evaluation to handle the update
+	eval = &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerNodeUpdate,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+	must.NoError(t, h.Process(NewServiceScheduler, eval))
+
+	// Ensure we have created only one new allocation
+	must.SliceLen(t, 2, h.Plans)
+	plan = h.Plans[0]
+	var newPlanned []*structs.Allocation
+	for _, allocList := range plan.NodeAllocation {
+		newPlanned = append(newPlanned, allocList...)
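+		// newPlanned collects every allocation in the plan, across all nodes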
+	}
+	must.SliceLen(t, 10, newPlanned)
+
+	// Ensure that the new allocations retain the host volume ID
+	for _, new := range newPlanned {
+		must.Eq(t, new.HostVolumeIDs[0], dhv.ID)
+	}
+}
+
 func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) {
 	ci.Parallel(t)
 
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 9d46edf8801..27f87e79745 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -118,8 +118,11 @@ type State interface {
 	// CSIVolumeByID fetch CSI volumes, containing controller jobs
 	CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)
 
+	// HostVolumeByID fetches a host volume by its ID
 	HostVolumeByID(memdb.WatchSet, string, string, bool) (*structs.HostVolume, error)
 
+	// HostVolumesByNodeID returns an iterator over all the host volumes
+	// attached to a given node
 	HostVolumesByNodeID(memdb.WatchSet, string, state.SortOption) (memdb.ResultIterator, error)
 
 	// LatestIndex returns the greatest index value for all indexes.
diff --git a/scheduler/stack.go b/scheduler/stack.go
index 1f2b6586886..f978c753f68 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -35,10 +35,11 @@ type Stack interface {
 }
 
 type SelectOptions struct {
-	PenaltyNodeIDs map[string]struct{}
-	PreferredNodes []*structs.Node
-	Preempt        bool
-	AllocName      string
+	PenaltyNodeIDs          map[string]struct{}
+	PreferredNodes          []*structs.Node
+	Preempt                 bool
+	AllocName               string
+	AllocationHostVolumeIDs []string
 }
 
 // GenericStack is the Stack used for the Generic scheduler. It is
@@ -156,7 +157,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
 	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
-	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes)
+	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
 	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
@@ -349,7 +350,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
 	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
-	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes)
+	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
 	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
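A companion sketch of the node-preference rule added to findPreferredNode: when the task group asks for a sticky volume, the previous allocation's node is preferred only if it is ready and still hosts one of the volume IDs that allocation claimed; otherwise the scheduler falls back to its normal ranking. The types below and the lookup callback are simplified stand-ins for the state store and structs.Node, not the real API.

package main

import (
	"fmt"
	"slices"
)

// node is a simplified stand-in for structs.Node; the real type exposes
// Ready() as a method and a richer host volume config.
type node struct {
	ID          string
	Ready       bool
	HostVolumes map[string]string // volume name -> volume ID
}

// prevAlloc carries only the fields the preference rule needs.
type prevAlloc struct {
	NodeID        string
	HostVolumeIDs []string
}

// preferPreviousNode returns the previous node when it is ready and still
// carries one of the volumes the allocation claimed; nil means "no
// preference", letting normal node ranking take over.
func preferPreviousNode(prev prevAlloc, lookup func(id string) *node) *node {
	n := lookup(prev.NodeID)
	if n == nil || !n.Ready {
		return nil
	}
	for _, volID := range n.HostVolumes {
		if slices.Contains(prev.HostVolumeIDs, volID) {
			return n
		}
	}
	return nil
}

func main() {
	n1 := &node{ID: "node-1", Ready: true, HostVolumes: map[string]string{"foo": "vol-123"}}
	prev := prevAlloc{NodeID: "node-1", HostVolumeIDs: []string{"vol-123"}}

	lookup := func(id string) *node {
		if id == n1.ID {
			return n1
		}
		return nil
	}

	if preferred := preferPreviousNode(prev, lookup); preferred != nil {
		fmt.Println("prefer node:", preferred.ID) // prefer node: node-1
	}
}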