stateful deployments: find feasible node for sticky host volumes (#24558)

This changeset implements node feasibility checks for sticky host volumes.
pkazmierczak authored Dec 18, 2024
1 parent 2545932 commit eec24f1
Showing 8 changed files with 314 additions and 15 deletions.
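At a high level, the scheduler change below makes a node feasible for a sticky host volume request only if the allocation has never claimed a host volume before, or if the node carries one of the volume IDs the allocation already claimed. The following is a minimal, self-contained sketch of that rule; the types and names are illustrative stand-ins, not the Nomad scheduler's own.

package main

import (
	"fmt"
	"slices"
)

// volumeRequest and node are simplified stand-ins for the Nomad structs the
// checker below operates on; only the fields the sticky rule needs are kept.
type volumeRequest struct {
	Source string
	Sticky bool
}

type node struct {
	// hostVolumeIDs maps a host volume name to the ID of the dynamic host
	// volume backing it on this node.
	hostVolumeIDs map[string]string
}

// stickyFeasible reports whether a node can satisfy a sticky volume request
// for an allocation that may have previously claimed host volume IDs. An empty
// claim list means this is the allocation's first placement, so any node that
// exposes the requested volume is acceptable.
func stickyFeasible(req volumeRequest, n node, claimedIDs []string) bool {
	volID, ok := n.hostVolumeIDs[req.Source]
	if !ok {
		return false // node does not expose the requested volume at all
	}
	if !req.Sticky {
		return true
	}
	return len(claimedIDs) == 0 || slices.Contains(claimedIDs, volID)
}

func main() {
	n := node{hostVolumeIDs: map[string]string{"foo": "vol-123"}}
	req := volumeRequest{Source: "foo", Sticky: true}

	fmt.Println(stickyFeasible(req, n, nil))                 // true: first placement
	fmt.Println(stickyFeasible(req, n, []string{"vol-123"})) // true: node holds the claimed volume
	fmt.Println(stickyFeasible(req, n, []string{"vol-999"})) // false: the claim points at another node's volume
}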
2 changes: 2 additions & 0 deletions api/allocations.go
@@ -278,6 +278,8 @@ type Allocation struct {
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
HostVolumeIDs []string
CSIVolumeIDs []string
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
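These two api fields are plain string slices, so once populated they are readable straight off the HTTP API. Here is a hedged sketch of inspecting them with the github.com/hashicorp/nomad/api client; the allocation ID below is a placeholder and error handling is minimal.

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder allocation ID; substitute a real one from your cluster.
	alloc, _, err := client.Allocations().Info("3b9cbb48-0000-0000-0000-000000000000", nil)
	if err != nil {
		log.Fatal(err)
	}

	// With this changeset, allocations that claimed sticky host volumes carry
	// the claimed IDs alongside any CSI volume IDs.
	fmt.Println("host volume IDs:", alloc.HostVolumeIDs)
	fmt.Println("CSI volume IDs:", alloc.CSIVolumeIDs)
}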
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
@@ -11114,6 +11114,13 @@ type Allocation struct {
// AllocatedResources is the total resources allocated for the task group.
AllocatedResources *AllocatedResources

// HostVolumeIDs is a list of host volume IDs that this allocation
// has claimed.
HostVolumeIDs []string

// CSIVolumeIDs is a list of CSI volume IDs that this allocation has claimed.
CSIVolumeIDs []string

// Metrics associated with this allocation
Metrics *AllocMetric

28 changes: 21 additions & 7 deletions scheduler/feasible.go
@@ -8,6 +8,7 @@ import (
"fmt"
"reflect"
"regexp"
"slices"
"strconv"
"strings"

@@ -137,23 +138,28 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
-	ctx        Context
-	volumeReqs []*structs.VolumeRequest
-	namespace  string
+	ctx           Context
+	volumeReqs    []*structs.VolumeRequest
+	hostVolumeIDs []string
+	namespace     string
}

// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
return &HostVolumeChecker{
-		ctx:        ctx,
-		volumeReqs: []*structs.VolumeRequest{},
+		ctx:           ctx,
+		volumeReqs:    []*structs.VolumeRequest{},
+		hostVolumeIDs: []string{},
}
}

// SetVolumes takes the volumes required by a task group and updates the checker.
-func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) {
+func (h *HostVolumeChecker) SetVolumes(
+	allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
+) {
h.namespace = ns
h.volumeReqs = []*structs.VolumeRequest{}
h.hostVolumeIDs = allocHostVolumeIDs
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue // filter CSI volumes
@@ -181,7 +187,6 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
}

func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {

// Fast path: Requested no volumes. No need to check further.
if len(h.volumeReqs) == 0 {
return true
@@ -216,6 +221,15 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
if !capOk {
return false
}

if req.Sticky {
if slices.Contains(h.hostVolumeIDs, vol.ID) || len(h.hostVolumeIDs) == 0 {
return true
}

return false
}

} else if !req.ReadOnly {
// this is a static host volume and can only be mounted ReadOnly,
// validate that no requests for it are ReadWrite.
113 changes: 111 additions & 2 deletions scheduler/feasible_test.go
@@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) {
alloc.NodeID = nodes[2].ID

for i, c := range cases {
-		checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes)
+		checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
@@ -359,7 +359,116 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
-			checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes)
+			checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
}
}

func TestHostVolumeChecker_Sticky(t *testing.T) {
ci.Parallel(t)

store, ctx := testContext(t)

nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}

hostVolCapsReadWrite := []*structs.HostVolumeCapability{
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
},
}

dhv := &structs.HostVolume{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Name: "foo",
NodeID: nodes[1].ID,
RequestedCapabilities: hostVolCapsReadWrite,
State: structs.HostVolumeStateReady,
}

nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{}
nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {ID: dhv.ID}}

for _, node := range nodes {
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node))
}
must.NoError(t, store.UpsertHostVolume(1000, dhv))

stickyRequest := map[string]*structs.VolumeRequest{
"foo": {
Type: "host",
Source: "foo",
Sticky: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
}

checker := NewHostVolumeChecker(ctx)

// alloc0 wants a previously registered volume ID that's available on node1
alloc0 := mock.Alloc()
alloc0.NodeID = nodes[1].ID
alloc0.HostVolumeIDs = []string{dhv.ID}

// alloc1 wants a volume ID that's available on node1 but hasn't used it
// before
alloc1 := mock.Alloc()
alloc1.NodeID = nodes[1].ID

// alloc2 wants a volume ID that's unrelated
alloc2 := mock.Alloc()
alloc2.NodeID = nodes[1].ID
alloc2.HostVolumeIDs = []string{uuid.Generate()}

// insert all the allocs into the state
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc0, alloc1, alloc2}))

cases := []struct {
name string
node *structs.Node
alloc *structs.Allocation
expect bool
}{
{
"alloc asking for a sticky volume on an infeasible node",
nodes[0],
alloc0,
false,
},
{
"alloc asking for a sticky volume on a feasible node",
nodes[1],
alloc0,
true,
},
{
"alloc asking for a sticky volume on a feasible node for the first time",
nodes[1],
alloc1,
true,
},
{
"alloc asking for an unrelated volume",
nodes[1],
alloc2,
false,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
48 changes: 48 additions & 0 deletions scheduler/generic_sched.go
@@ -6,6 +6,7 @@ package scheduler
import (
"fmt"
"runtime/debug"
"slices"
"sort"
"time"

@@ -657,6 +658,18 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
"old_alloc_name", oldAllocName, "new_alloc_name", newAllocName)
}

// Are there sticky volumes requested by the task group for the first time? If
// yes, make sure the allocation stores their IDs for future reschedules.
var newHostVolumeIDs []string
for _, v := range tg.Volumes {
if v.Sticky {
if missing.PreviousAllocation() != nil && len(missing.PreviousAllocation().HostVolumeIDs) > 0 {
continue
}
newHostVolumeIDs = append(newHostVolumeIDs, option.Node.HostVolumes[v.Source].ID)
}
}

// Create an allocation for this
alloc := &structs.Allocation{
ID: uuid.Generate(),
@@ -681,6 +694,10 @@
},
}

if len(newHostVolumeIDs) > 0 {
alloc.HostVolumeIDs = newHostVolumeIDs
}

// If the new allocation is replacing an older allocation then we
// record the older allocation ID so that they are chained
if prevAllocation != nil {
Expand All @@ -689,6 +706,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
updateRescheduleTracker(alloc, prevAllocation, now)
}

if len(prevAllocation.HostVolumeIDs) > 0 {
alloc.HostVolumeIDs = prevAllocation.HostVolumeIDs
}

// If the allocation has task handles,
// copy them to the new allocation
propagateTaskState(alloc, prevAllocation, missing.PreviousLost())
@@ -838,6 +859,10 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
}
}
selectOptions.PenaltyNodeIDs = penaltyNodes

if prevAllocation.HostVolumeIDs != nil {
selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs
}
}
if preferredNode != nil {
selectOptions.PreferredNodes = []*structs.Node{preferredNode}
@@ -910,6 +935,29 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
return preferredNode, nil
}
}

for _, vol := range place.TaskGroup().Volumes {
if !vol.Sticky {
continue
}

var preferredNode *structs.Node
preferredNode, err := s.state.NodeByID(nil, prev.NodeID)
if err != nil {
return nil, err
}

if preferredNode != nil && preferredNode.Ready() {
// if this node has at least one of the allocation volumes, it's a
// preferred one
for _, vol := range preferredNode.HostVolumes {
if slices.Contains(prev.HostVolumeIDs, vol.ID) {
return preferredNode, nil
}
}
}
}

return nil, nil
}

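The generic scheduler half of the change threads claimed volume IDs through placements: a first placement records the chosen node's volume IDs for its sticky requests, a replacement inherits the previous allocation's claims, and findPreferredNode steers a rescheduled allocation back to a ready node that still holds one of those volumes. Below is a compressed, hedged sketch of that flow; the types and function names are illustrative only, not the ones generic_sched.go uses.

package main

import (
	"fmt"
	"slices"
)

// Simplified stand-ins for scheduler types.
type allocation struct {
	HostVolumeIDs []string
}

type nodeInfo struct {
	Ready         bool
	HostVolumeIDs []string // IDs of the host volumes present on the node
}

// claimedIDsForPlacement mirrors the propagation rule above: a replacement
// inherits the previous allocation's claims, while a first placement claims
// the chosen node's volume IDs for its sticky requests.
func claimedIDsForPlacement(prev *allocation, stickyVolIDsOnNode []string) []string {
	if prev != nil && len(prev.HostVolumeIDs) > 0 {
		return prev.HostVolumeIDs
	}
	return stickyVolIDsOnNode
}

// preferPreviousNode mirrors the new findPreferredNode branch: keep the
// rescheduled allocation on its old node when that node is ready and still
// holds one of the claimed volumes.
func preferPreviousNode(prevNode nodeInfo, claimed []string) bool {
	if !prevNode.Ready {
		return false
	}
	for _, id := range prevNode.HostVolumeIDs {
		if slices.Contains(claimed, id) {
			return true
		}
	}
	return false
}

func main() {
	first := claimedIDsForPlacement(nil, []string{"vol-123"})
	fmt.Println(first) // [vol-123]: first placement records the node's volume ID

	rescheduled := claimedIDsForPlacement(&allocation{HostVolumeIDs: first}, nil)
	fmt.Println(rescheduled) // [vol-123]: the reschedule inherits the claim

	prevNode := nodeInfo{Ready: true, HostVolumeIDs: []string{"vol-123"}}
	fmt.Println(preferPreviousNode(prevNode, rescheduled)) // true: prefer the old node
}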