dynamic host volumes: account for other claims in capability check
When we feasibility-check a dynamic host volume against a volume request, we
check the attachment mode and access mode. This only ensures that the
capabilities match; it doesn't enforce the semantics of those capabilities
against other claims that may already be made on the volume.

Add support for checking the requested capability against the other allocations
that have claimed the volume.

Ref: #24479
tgross committed Dec 18, 2024
1 parent eec24f1 commit 366ccbf
Showing 6 changed files with 210 additions and 26 deletions.
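
The heart of this change is the new hostVolumeIsAvailable helper added to scheduler/feasible.go (see the diff below). The sketch that follows is illustrative only and is not part of the commit: a minimal example of the single-writer semantics the helper enforces, assuming it lives in the scheduler package alongside the helper and using only types that appear in this diff (the function name exampleSingleWriterClaim and the alloc ID "existing-alloc" are hypothetical).

package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// exampleSingleWriterClaim shows how the new helper treats a ready volume that
// advertises single-node-single-writer and already has one writer: an
// unrelated new writer is rejected, but the claim may carry over when
// prevAllocID matches the existing writer (e.g. the alloc being replaced
// during a deployment).
func exampleSingleWriterClaim(prevAllocID string) bool {
	vol := &structs.HostVolume{
		State: structs.HostVolumeStateReady,
		RequestedCapabilities: []*structs.HostVolumeCapability{{
			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
			AccessMode:     structs.HostVolumeAccessModeSingleNodeSingleWriter,
		}},
		Allocations: []*structs.AllocListStub{{ID: "existing-alloc"}},
		Writers:     []string{"existing-alloc"},
	}

	// false for a brand-new writer; true only when prevAllocID == "existing-alloc"
	return hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeSingleNodeSingleWriter,
		structs.HostVolumeAttachmentModeFilesystem,
		false, // readOnly
		prevAllocID,
	)
}

The "enforce single writer" and "enforce single writer ok across deployments" cases in the scheduler/feasible_test.go diff below exercise this same behavior against the real helper.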
5 changes: 5 additions & 0 deletions nomad/state/state_store_host_volumes.go
@@ -30,6 +30,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs

vol = vol.Copy()
vol.Allocations = []*structs.AllocListStub{}
vol.Writers = []string{}

// we can't use AllocsByNodeTerminal because we only want to filter out
// allocs that are client-terminal, not server-terminal
@@ -43,6 +44,9 @@ }
}
for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes {
if volClaim.Type == structs.VolumeTypeHost && volClaim.Source == vol.Name {
if !volClaim.ReadOnly {
vol.Writers = append(vol.Writers, alloc.ID)
}
vol.Allocations = append(vol.Allocations, alloc.Stub(nil))
}
}
@@ -101,6 +105,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err
// Allocations are denormalized on read, so we don't want these to be
// written to the state store.
vol.Allocations = nil
vol.Writers = nil
vol.ModifyIndex = index

err = txn.Insert(TableHostVolumes, vol)
5 changes: 5 additions & 0 deletions nomad/structs/host_volumes.go
@@ -84,6 +84,11 @@ type HostVolume struct {
// this host volume. They are denormalized on read and this field will
// never be written to Raft
Allocations []*AllocListStub `json:",omitempty"`

// Writers is a list of alloc IDs from the Allocations field that can
// write to this volume. This field is denormalized on read, never written
// to Raft, and never returned by the API.
Writers []string `json:"-"`
}

type HostVolumeState string
87 changes: 68 additions & 19 deletions scheduler/feasible.go
@@ -138,10 +138,11 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
ctx Context
volumeReqs []*structs.VolumeRequest
hostVolumeIDs []string
namespace string
ctx Context
volumeReqs []*structs.VolumeRequest
hostVolumeIDs []string
namespace string
previousAllocationID string
}

// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
@@ -155,11 +156,11 @@ func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {

// SetVolumes takes the volumes required by a task group and updates the checker.
func (h *HostVolumeChecker) SetVolumes(
allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
) {
allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string, prevAllocID string) {
h.namespace = ns
h.volumeReqs = []*structs.VolumeRequest{}
h.hostVolumeIDs = allocHostVolumeIDs
h.previousAllocationID = prevAllocID
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue // filter CSI volumes
@@ -199,26 +200,20 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
}

if volCfg.ID != "" { // dynamic host volume
vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, false)
vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, true)
if err != nil || vol == nil {
// node fingerprint has a dynamic volume that's no longer in the
// state store; this is only possible if the batched fingerprint
// update from a delete RPC is written before the delete RPC's
// raft entry completes
return false
}
if vol.State != structs.HostVolumeStateReady {
return false
}
var capOk bool
for _, cap := range vol.RequestedCapabilities {
if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
capOk = true
break
}
}
if !capOk {
if !hostVolumeIsAvailable(vol,
structs.HostVolumeAccessMode(req.AccessMode),
structs.HostVolumeAttachmentMode(req.AttachmentMode),
req.ReadOnly,
h.previousAllocationID,
) {
return false
}

@@ -242,6 +237,60 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

// hostVolumeIsAvailable determines if a dynamic host volume is available for a request
func hostVolumeIsAvailable(hv *structs.HostVolume, reqAccess structs.HostVolumeAccessMode, reqAttach structs.HostVolumeAttachmentMode, readOnly bool, previousAllocID string) bool {

if hv.State != structs.HostVolumeStateReady {
return false
}

// pick a default capability based on the read-only flag. this happens here
// in the scheduler rather than job submit because we don't know whether a
// host volume is dynamic or not until we try to schedule it (ex. the same
// name could be static on one node and dynamic on another)
if reqAccess == structs.HostVolumeAccessModeUnknown {
if readOnly {
reqAccess = structs.HostVolumeAccessModeSingleNodeReader
} else {
reqAccess = structs.HostVolumeAccessModeSingleNodeWriter
}
}
if reqAttach == structs.HostVolumeAttachmentModeUnknown {
reqAttach = structs.HostVolumeAttachmentModeFilesystem
}

// check that the volume has the requested capability at all
var capOk bool
for _, cap := range hv.RequestedCapabilities {
if reqAccess == cap.AccessMode &&
reqAttach == cap.AttachmentMode {
capOk = true
break
}
}
if !capOk {
return false
}

// if no other allocations claim the volume, it's first-come-first-serve to
// determine how subsequent allocs can claim it
if len(hv.Allocations) == 0 {
return true
}

switch reqAccess {
case structs.HostVolumeAccessModeSingleNodeReader:
return readOnly
case structs.HostVolumeAccessModeSingleNodeWriter, structs.HostVolumeAccessModeSingleNodeSingleWriter:
return len(hv.Writers) == 0 ||
(len(hv.Writers) == 1 && hv.Writers[0] == previousAllocID)
case structs.HostVolumeAccessModeSingleNodeMultiWriter:
// no constraint
}

return true
}

type CSIVolumeChecker struct {
ctx Context
namespace string
133 changes: 128 additions & 5 deletions scheduler/feasible_test.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -91,7 +92,7 @@ func TestRandomIterator(t *testing.T) {
}
}

func TestHostVolumeChecker(t *testing.T) {
func TestHostVolumeChecker_Static(t *testing.T) {
ci.Parallel(t)

_, ctx := testContext(t)
@@ -177,14 +178,14 @@ func TestHostVolumeChecker(t *testing.T) {
alloc.NodeID = nodes[2].ID

for i, c := range cases {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs, "")
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
}
}

func TestHostVolumeChecker_ReadOnly(t *testing.T) {
func TestHostVolumeChecker_Dynamic(t *testing.T) {
ci.Parallel(t)

store, ctx := testContext(t)
@@ -359,7 +360,7 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs, "")
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
@@ -468,13 +469,135 @@ func TestHostVolumeChecker_Sticky(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs)
checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs, "")
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
}
}

// TestDynamicHostVolumeIsAvailable provides fine-grained coverage of the
// hostVolumeIsAvailable function
func TestDynamicHostVolumeIsAvailable(t *testing.T) {

allCaps := []*structs.HostVolumeCapability{}

for _, accessMode := range []structs.HostVolumeAccessMode{
structs.HostVolumeAccessModeSingleNodeReader,
structs.HostVolumeAccessModeSingleNodeWriter,
structs.HostVolumeAccessModeSingleNodeSingleWriter,
structs.HostVolumeAccessModeSingleNodeMultiWriter,
} {
for _, attachMode := range []structs.HostVolumeAttachmentMode{
structs.HostVolumeAttachmentModeFilesystem,
structs.HostVolumeAttachmentModeBlockDevice,
} {
allCaps = append(allCaps, &structs.HostVolumeCapability{
AttachmentMode: attachMode,
AccessMode: accessMode,
})
}
}

testCases := []struct {
name string
hasAllocs []string
hasWriters []string
hasCaps []*structs.HostVolumeCapability
wantAccess structs.HostVolumeAccessMode
wantAttach structs.HostVolumeAttachmentMode
readOnly bool
prevAllocID string
expect bool
}{
{
name: "enforce attachment mode",
hasCaps: []*structs.HostVolumeCapability{{
AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter,
}},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: false,
},
{
name: "enforce read only",
hasAllocs: []string{"a", "b", "c"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
expect: false,
},
{
name: "enforce read only ok",
hasAllocs: []string{"a", "b", "c"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
readOnly: true,
expect: true,
},
{
name: "enforce single writer",
hasAllocs: []string{"a", "b", "c"},
hasWriters: []string{"b"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: false,
},
{
name: "enforce single writer ok across deployments",
hasAllocs: []string{"a", "b", "c"},
hasWriters: []string{"b"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
prevAllocID: "b",
expect: true,
},
{
name: "multi writer is always ok",
hasAllocs: []string{"a", "b", "c"},
hasWriters: []string{"b", "c"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeMultiWriter,
expect: true,
},
{
name: "default capabilities ok",
expect: true,
},
{
name: "default capabilities fail",
readOnly: true,
hasCaps: []*structs.HostVolumeCapability{{
AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter,
}},
expect: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
allocs := helper.ConvertSlice(tc.hasAllocs, func(id string) *structs.AllocListStub {
return &structs.AllocListStub{ID: id}
})
vol := &structs.HostVolume{
Allocations: allocs,
Writers: tc.hasWriters,
State: structs.HostVolumeStateReady,
}
if len(tc.hasCaps) > 0 {
vol.RequestedCapabilities = tc.hasCaps
} else {
vol.RequestedCapabilities = allCaps
}

must.Eq(t, tc.expect, hostVolumeIsAvailable(
vol, tc.wantAccess, tc.wantAttach, tc.readOnly, tc.prevAllocID))
})
}

}

func TestCSIVolumeChecker(t *testing.T) {
ci.Parallel(t)
state, ctx := testContext(t)
1 change: 1 addition & 0 deletions scheduler/generic_sched.go
@@ -863,6 +863,7 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
if prevAllocation.HostVolumeIDs != nil {
selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs
}
selectOptions.PreviousAllocID = prevAllocation.ID
}
if preferredNode != nil {
selectOptions.PreferredNodes = []*structs.Node{preferredNode}
5 changes: 3 additions & 2 deletions scheduler/stack.go
@@ -40,6 +40,7 @@ type SelectOptions struct {
Preempt bool
AllocName string
AllocationHostVolumeIDs []string
PreviousAllocID string
}

// GenericStack is the Stack used for the Generic scheduler. It is
@@ -157,7 +158,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs, options.PreviousAllocID)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
@@ -350,7 +351,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs, options.PreviousAllocID)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
