stateful deployments: find feasible node for sticky host volumes #24558

Merged 31 commits on Dec 18, 2024

Commits
534552f host volume struct update (pkazmierczak, Nov 27, 2024)
855a267 Allocation update (pkazmierczak, Nov 27, 2024)
ca1640f notes (pkazmierczak, Dec 3, 2024)
ecdacca VolumeRequest and VolumeMount update (pkazmierczak, Dec 3, 2024)
910afb1 super hacky prototype (pkazmierczak, Dec 4, 2024)
1712ace wip findPreferredNode (pkazmierczak, Dec 5, 2024)
1e27b73 CSI vols can be sticky too (pkazmierczak, Dec 5, 2024)
6d60520 refactor hasVolumes (pkazmierczak, Dec 5, 2024)
9de1963 findPreferredNode (pkazmierczak, Dec 5, 2024)
0352539 separate CSI and host volumes (pkazmierczak, Dec 9, 2024)
f7fcccf accidental git snafu (pkazmierczak, Dec 10, 2024)
fa54f4d correct findPreferredNode (pkazmierczak, Dec 10, 2024)
d2ab7ea Tim's comment (pkazmierczak, Dec 11, 2024)
fa83314 hasVolumes (pkazmierczak, Dec 11, 2024)
33c3788 simplify (pkazmierczak, Dec 11, 2024)
2fddeb4 hasVolumes and tests (pkazmierczak, Dec 11, 2024)
bac67b2 Update nomad/structs/structs.go (pkazmierczak, Dec 11, 2024)
0a41c43 don't return too early (pkazmierczak, Dec 11, 2024)
820a7fc make alloc ID available to the host volume checker (pkazmierczak, Dec 12, 2024)
865651c fix returns (pkazmierczak, Dec 12, 2024)
e8df1bb adjust computePlacements (pkazmierczak, Dec 13, 2024)
5addffd Tim's comments (pkazmierczak, Dec 16, 2024)
43b8885 cleanup feasible.go (pkazmierczak, Dec 16, 2024)
a442d86 clean up reconciler (pkazmierczak, Dec 16, 2024)
699cd64 test (pkazmierczak, Dec 16, 2024)
62c5ba5 don't need CNI here (pkazmierczak, Dec 16, 2024)
fffe256 extra check (pkazmierczak, Dec 16, 2024)
5a33b7c Tim's comments (pkazmierczak, Dec 16, 2024)
5e06fc4 refactor HostVolumeChecker (pkazmierczak, Dec 17, 2024)
067b8e7 debugging info for testing and demos (pkazmierczak, Dec 18, 2024)
9782b60 revert debug/demos stuff (pkazmierczak, Dec 18, 2024)
2 changes: 2 additions & 0 deletions api/allocations.go
@@ -278,6 +278,8 @@ type Allocation struct {
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
HostVolumeIDs []string
CSIVolumeIDs []string
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
@@ -11114,6 +11114,13 @@ type Allocation struct {
// AllocatedResources is the total resources allocated for the task group.
AllocatedResources *AllocatedResources

// HostVolumeIDs is a list of host volume IDs that this allocation
// has claimed.
HostVolumeIDs []string

// CSIVolumeIDs is a list of CSI volume IDs that this allocation has claimed.
CSIVolumeIDs []string

// Metrics associated with this allocation
Metrics *AllocMetric

28 changes: 21 additions & 7 deletions scheduler/feasible.go
@@ -8,6 +8,7 @@ import (
"fmt"
"reflect"
"regexp"
"slices"
"strconv"
"strings"

@@ -137,23 +138,28 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
ctx Context
volumeReqs []*structs.VolumeRequest
namespace string
ctx Context
volumeReqs []*structs.VolumeRequest
hostVolumeIDs []string
namespace string
}

// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
return &HostVolumeChecker{
ctx: ctx,
volumeReqs: []*structs.VolumeRequest{},
ctx: ctx,
volumeReqs: []*structs.VolumeRequest{},
hostVolumeIDs: []string{},
}
}

// SetVolumes takes the volumes required by a task group and updates the checker.
func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) {
func (h *HostVolumeChecker) SetVolumes(
allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
) {
h.namespace = ns
h.volumeReqs = []*structs.VolumeRequest{}
h.hostVolumeIDs = allocHostVolumeIDs
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue // filter CSI volumes
@@ -181,7 +187,6 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
}

func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {

// Fast path: Requested no volumes. No need to check further.
if len(h.volumeReqs) == 0 {
return true
@@ -216,6 +221,15 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
if !capOk {
return false
}

if req.Sticky {
if slices.Contains(h.hostVolumeIDs, vol.ID) || len(h.hostVolumeIDs) == 0 {
return true
}

return false
}

} else if !req.ReadOnly {
// this is a static host volume and can only be mounted ReadOnly,
// validate that no requests for it are ReadWrite.
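
To make the new branch in hasVolumes easier to follow, here is a minimal standalone sketch of the sticky rule, using simplified names rather than the actual Nomad types: a node is feasible for a sticky request either when the allocation has not claimed any host volume yet (first placement), or when one of the volume IDs it previously claimed is present on the node.

// Sketch only: stickyFeasible is a hypothetical helper, not part of Nomad.
package main

import (
	"fmt"
	"slices"
)

// stickyFeasible reports whether a node-local volume satisfies a sticky
// request, given the volume IDs the allocation has already claimed.
func stickyFeasible(nodeVolID string, claimedIDs []string) bool {
	// First placement: nothing has been claimed yet, so any node that passes
	// the other capability checks is acceptable.
	if len(claimedIDs) == 0 {
		return true
	}
	// Reschedule: only nodes holding one of the previously claimed volumes.
	return slices.Contains(claimedIDs, nodeVolID)
}

func main() {
	fmt.Println(stickyFeasible("vol-1", nil))               // true: first placement
	fmt.Println(stickyFeasible("vol-1", []string{"vol-1"})) // true: same volume as before
	fmt.Println(stickyFeasible("vol-2", []string{"vol-1"})) // false: a different node's volume
}

Run as a plain Go program, the three calls print true, true, false, mirroring the three outcomes the checker distinguishes.
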
113 changes: 111 additions & 2 deletions scheduler/feasible_test.go
@@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) {
alloc.NodeID = nodes[2].ID

for i, c := range cases {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
@@ -359,7 +359,116 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
}
}

func TestHostVolumeChecker_Sticky(t *testing.T) {
ci.Parallel(t)

store, ctx := testContext(t)

nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}

hostVolCapsReadWrite := []*structs.HostVolumeCapability{
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
},
}

dhv := &structs.HostVolume{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Name: "foo",
NodeID: nodes[1].ID,
RequestedCapabilities: hostVolCapsReadWrite,
State: structs.HostVolumeStateReady,
}

nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{}
nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {ID: dhv.ID}}

for _, node := range nodes {
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node))
}
must.NoError(t, store.UpsertHostVolume(1000, dhv))

stickyRequest := map[string]*structs.VolumeRequest{
"foo": {
Type: "host",
Source: "foo",
Sticky: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
}

checker := NewHostVolumeChecker(ctx)

// alloc0 wants a previously registered volume ID that's available on node1
alloc0 := mock.Alloc()
alloc0.NodeID = nodes[1].ID
alloc0.HostVolumeIDs = []string{dhv.ID}

// alloc1 wants a volume ID that's available on node1 but hasn't used it
// before
alloc1 := mock.Alloc()
alloc1.NodeID = nodes[1].ID

// alloc2 wants a volume ID that's unrelated
alloc2 := mock.Alloc()
alloc2.NodeID = nodes[1].ID
alloc2.HostVolumeIDs = []string{uuid.Generate()}

// insert all the allocs into the state
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc0, alloc1, alloc2}))

cases := []struct {
name string
node *structs.Node
alloc *structs.Allocation
expect bool
}{
Reviewer comment: We should include a case for a sticky volume request that hasn't been previously claimed.
{
"alloc asking for a sticky volume on an infeasible node",
nodes[0],
alloc0,
false,
},
{
"alloc asking for a sticky volume on a feasible node",
nodes[1],
alloc0,
true,
},
{
"alloc asking for a sticky volume on a feasible node for the first time",
nodes[1],
alloc1,
true,
},
{
"alloc asking for an unrelated volume",
nodes[1],
alloc2,
false,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
48 changes: 48 additions & 0 deletions scheduler/generic_sched.go
@@ -6,6 +6,7 @@ package scheduler
import (
"fmt"
"runtime/debug"
"slices"
"sort"
"time"

@@ -657,6 +658,18 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
"old_alloc_name", oldAllocName, "new_alloc_name", newAllocName)
}

// Are there sticky volumes requested by the task group for the first time? If
// yes, make sure the allocation stores their IDs for future reschedules.
var newHostVolumeIDs []string
for _, v := range tg.Volumes {
if v.Sticky {
if missing.PreviousAllocation() != nil && len(missing.PreviousAllocation().HostVolumeIDs) > 0 {
continue
}
newHostVolumeIDs = append(newHostVolumeIDs, option.Node.HostVolumes[v.Source].ID)
}
}

// Create an allocation for this
alloc := &structs.Allocation{
ID: uuid.Generate(),
@@ -681,6 +694,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
},
}

if len(newHostVolumeIDs) > 0 {
alloc.HostVolumeIDs = newHostVolumeIDs
}

// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if prevAllocation != nil {
Expand All @@ -689,6 +706,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
updateRescheduleTracker(alloc, prevAllocation, now)
}

if len(prevAllocation.HostVolumeIDs) > 0 {
alloc.HostVolumeIDs = prevAllocation.HostVolumeIDs
}

// If the allocation has task handles,
// copy them to the new allocation
propagateTaskState(alloc, prevAllocation, missing.PreviousLost())
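
The computePlacements hunks above encode a simple hand-off rule: on a first placement the new allocation claims the IDs of the sticky volumes found on the chosen node, while a replacement inherits the IDs from the allocation it replaces. A condensed sketch of that rule follows, using hypothetical helper types rather than the real structs.Allocation and structs.Node.

// Sketch only: simplified types, not the Nomad structs.
package main

import "fmt"

type volumeRequest struct {
	Source string
	Sticky bool
}

type allocation struct {
	HostVolumeIDs []string
}

type node struct {
	HostVolumes map[string]string // volume name -> volume ID on this node
}

// stickyVolumeIDs decides which host volume IDs a new allocation should carry.
func stickyVolumeIDs(reqs []volumeRequest, prev *allocation, n *node) []string {
	// A previous allocation that already claimed sticky volumes wins: the new
	// allocation keeps the same IDs so it stays pinned to the same data.
	if prev != nil && len(prev.HostVolumeIDs) > 0 {
		return prev.HostVolumeIDs
	}
	// First placement: claim the IDs of the sticky volumes on the chosen node.
	var ids []string
	for _, req := range reqs {
		if req.Sticky {
			ids = append(ids, n.HostVolumes[req.Source])
		}
	}
	return ids
}

func main() {
	reqs := []volumeRequest{{Source: "data", Sticky: true}}
	n := &node{HostVolumes: map[string]string{"data": "vol-123"}}

	fmt.Println(stickyVolumeIDs(reqs, nil, n))                                           // [vol-123]: first placement
	fmt.Println(stickyVolumeIDs(reqs, &allocation{HostVolumeIDs: []string{"vol-9"}}, n)) // [vol-9]: inherited
}
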
@@ -838,6 +859,10 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
}
}
selectOptions.PenaltyNodeIDs = penaltyNodes

if prevAllocation.HostVolumeIDs != nil {
selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs
}
}
if preferredNode != nil {
selectOptions.PreferredNodes = []*structs.Node{preferredNode}
@@ -910,6 +935,29 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
return preferredNode, nil
}
}

for _, vol := range place.TaskGroup().Volumes {
if !vol.Sticky {
continue
}

var preferredNode *structs.Node
preferredNode, err := s.state.NodeByID(nil, prev.NodeID)
if err != nil {
return nil, err
}

if preferredNode != nil && preferredNode.Ready() {
// if this node has at least one of the allocation volumes, it's a
// preferred one
for _, vol := range preferredNode.HostVolumes {
if slices.Contains(prev.HostVolumeIDs, vol.ID) {
return preferredNode, nil
}
}
}
}

return nil, nil
}
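
Below is a rough sketch of the preferred-node rule added to findPreferredNode, again with simplified types and a hypothetical helper name: the previous allocation's node is preferred only when it is ready and still hosts one of the volume IDs the allocation claimed; otherwise the scheduler falls back to ordinary node selection.

// Sketch only: preferredStickyNode is a hypothetical helper, not Nomad code.
package main

import (
	"fmt"
	"slices"
)

type node struct {
	ID        string
	Ready     bool
	VolumeIDs []string // IDs of host volumes present on the node
}

// preferredStickyNode returns the previous node when it can still serve the
// allocation's claimed sticky volumes, or nil when no preference applies.
func preferredStickyNode(prevNode *node, claimedIDs []string) *node {
	if prevNode == nil || !prevNode.Ready {
		return nil
	}
	for _, id := range prevNode.VolumeIDs {
		if slices.Contains(claimedIDs, id) {
			return prevNode
		}
	}
	return nil
}

func main() {
	prev := &node{ID: "node-1", Ready: true, VolumeIDs: []string{"vol-123"}}
	fmt.Println(preferredStickyNode(prev, []string{"vol-123"}) != nil) // true: stay on node-1
	fmt.Println(preferredStickyNode(prev, []string{"vol-999"}) != nil) // false: claimed volume not on node-1
}
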
