dynamic host volumes: account for other claims in capability check
When we feasibility check a dynamic host volume against a volume request, we
check the attachment mode and access mode. This only ensures that the
capabilities match; it doesn't enforce the semantics of those capabilities
against other claims that may already be made on the volume.

Add support for checking the requested capability against the other
allocations that have claimed the volume.

Ref: #24479
tgross committed Dec 18, 2024
1 parent 2545932 commit 447e41a
Showing 6 changed files with 186 additions and 27 deletions.
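
Not part of the commit itself: the following is a minimal sketch of the behavior the commit message describes, built from the hostVolumeIsAvailable helper and the structs fields added in the diff below. The function name exampleSingleWriterClaim and the literal alloc IDs "a" and "b" are hypothetical, and because hostVolumeIsAvailable is unexported, such a snippet would have to live in the scheduler package.

package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// exampleSingleWriterClaim (hypothetical) shows how the new check treats a
// single-node-single-writer volume that already has one writer: a second
// writer is rejected unless it is replacing the previous claimant.
func exampleSingleWriterClaim() (newWriterOK, replacementOK bool) {
	vol := &structs.HostVolume{
		State: structs.HostVolumeStateReady,
		RequestedCapabilities: []*structs.HostVolumeCapability{{
			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
			AccessMode:     structs.HostVolumeAccessModeSingleNodeSingleWriter,
		}},
		// Allocations and Writers are denormalized on read by
		// StateStore.HostVolumeByID; alloc "b" currently holds the write claim.
		Allocations: []*structs.AllocListStub{{ID: "a"}, {ID: "b"}},
		Writers:     []string{"b"},
	}

	// a brand-new writer is infeasible while "b" holds the claim
	newWriterOK = hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeSingleNodeSingleWriter,
		structs.HostVolumeAttachmentModeFilesystem,
		false, // not read-only
		"")    // no previous allocation being replaced

	// a replacement for alloc "b" keeps the volume feasible across deployments
	replacementOK = hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeSingleNodeSingleWriter,
		structs.HostVolumeAttachmentModeFilesystem,
		false,
		"b")

	return newWriterOK, replacementOK // false, true
}
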
5 changes: 5 additions & 0 deletions nomad/state/state_store_host_volumes.go
@@ -30,6 +30,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs

vol = vol.Copy()
vol.Allocations = []*structs.AllocListStub{}
vol.Writers = []string{}

// we can't use AllocsByNodeTerminal because we only want to filter out
// allocs that are client-terminal, not server-terminal
@@ -43,6 +44,9 @@
}
for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes {
if volClaim.Type == structs.VolumeTypeHost && volClaim.Source == vol.Name {
if !volClaim.ReadOnly {
vol.Writers = append(vol.Writers, alloc.ID)
}
vol.Allocations = append(vol.Allocations, alloc.Stub(nil))
}
}
@@ -101,6 +105,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err
// Allocations are denormalized on read, so we don't want these to be
// written to the state store.
vol.Allocations = nil
vol.Writers = nil
vol.ModifyIndex = index

err = txn.Insert(TableHostVolumes, vol)
5 changes: 5 additions & 0 deletions nomad/structs/host_volumes.go
@@ -84,6 +84,11 @@ type HostVolume struct {
// this host volume. They are denormalized on read and this field will
// never be written to Raft
Allocations []*AllocListStub `json:",omitempty"`

// Writers is a list of alloc IDs from the Allocations field that can
// write to this volume. This list is denormalized on read, never written
// to Raft, and never returned by the API.
Writers []string `json:"-"`
}

type HostVolumeState string
71 changes: 54 additions & 17 deletions scheduler/feasible.go
@@ -137,9 +137,10 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
ctx Context
volumeReqs []*structs.VolumeRequest
namespace string
ctx Context
volumeReqs []*structs.VolumeRequest
namespace string
previousAllocationID string
}

// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
@@ -151,9 +152,10 @@ func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
}

// SetVolumes takes the volumes required by a task group and updates the checker.
func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) {
func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest, prevAllocID string) {
h.namespace = ns
h.volumeReqs = []*structs.VolumeRequest{}
h.previousAllocationID = prevAllocID
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue // filter CSI volumes
@@ -194,26 +196,20 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
}

if volCfg.ID != "" { // dynamic host volume
vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, false)
vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, true)
if err != nil || vol == nil {
// node fingerprint has a dynamic volume that's no longer in the
// state store; this is only possible if the batched fingerprint
// update from a delete RPC is written before the delete RPC's
// raft entry completes
return false
}
if vol.State != structs.HostVolumeStateReady {
return false
}
var capOk bool
for _, cap := range vol.RequestedCapabilities {
if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
capOk = true
break
}
}
if !capOk {
if !hostVolumeIsAvailable(vol,
structs.HostVolumeAccessMode(req.AccessMode),
structs.HostVolumeAttachmentMode(req.AttachmentMode),
req.ReadOnly,
h.previousAllocationID,
) {
return false
}
} else if !req.ReadOnly {
@@ -228,6 +224,47 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

// hostVolumeIsAvailable determines if a dynamic host volume is available for a request
func hostVolumeIsAvailable(hv *structs.HostVolume, reqAccess structs.HostVolumeAccessMode, reqAttach structs.HostVolumeAttachmentMode, readOnly bool, previousAllocID string) bool {

if hv.State != structs.HostVolumeStateReady {
return false
}

// check that the volume has the requested capability at all
var capOk bool
for _, cap := range hv.RequestedCapabilities {
if reqAccess == cap.AccessMode &&
reqAttach == cap.AttachmentMode {
capOk = true
break
}
}
if !capOk {
return false
}

// if no other allocations claim the volume, it's first-come-first-serve to
// determine how subsequent allocs can claim it
if len(hv.Allocations) == 0 {
return true
}

switch reqAccess {
case structs.HostVolumeAccessModeUnknown:
return false // invalid mode
case structs.HostVolumeAccessModeSingleNodeReader:
return readOnly
case structs.HostVolumeAccessModeSingleNodeWriter, structs.HostVolumeAccessModeSingleNodeSingleWriter:
return len(hv.Writers) == 0 ||
(len(hv.Writers) == 1 && hv.Writers[0] == previousAllocID)
case structs.HostVolumeAccessModeSingleNodeMultiWriter:
// no constraint
}

return true
}

type CSIVolumeChecker struct {
ctx Context
namespace string
118 changes: 114 additions & 4 deletions scheduler/feasible_test.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -91,7 +92,7 @@ func TestRandomIterator(t *testing.T) {
}
}

func TestHostVolumeChecker(t *testing.T) {
func TestHostVolumeChecker_Static(t *testing.T) {
ci.Parallel(t)

_, ctx := testContext(t)
@@ -177,14 +178,14 @@
alloc.NodeID = nodes[2].ID

for i, c := range cases {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, "")
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
}
}

func TestHostVolumeChecker_ReadOnly(t *testing.T) {
func TestHostVolumeChecker_Dynamic(t *testing.T) {
ci.Parallel(t)

store, ctx := testContext(t)
@@ -359,13 +360,122 @@

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, "")
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
}
}

// TestDynamicHostVolumeIsAvailable provides fine-grained coverage of the
// hostVolumeIsAvailable function
func TestDynamicHostVolumeIsAvailable(t *testing.T) {

allCaps := []*structs.HostVolumeCapability{}

for _, accessMode := range []structs.HostVolumeAccessMode{
structs.HostVolumeAccessModeSingleNodeReader,
structs.HostVolumeAccessModeSingleNodeWriter,
structs.HostVolumeAccessModeSingleNodeSingleWriter,
structs.HostVolumeAccessModeSingleNodeMultiWriter,
} {
for _, attachMode := range []structs.HostVolumeAttachmentMode{
structs.HostVolumeAttachmentModeFilesystem,
structs.HostVolumeAttachmentModeBlockDevice,
} {
allCaps = append(allCaps, &structs.HostVolumeCapability{
AttachmentMode: attachMode,
AccessMode: accessMode,
})
}
}

testCases := []struct {
name string
hasAllocs []string
hasWriters []string
hasCaps []*structs.HostVolumeCapability
wantAccess structs.HostVolumeAccessMode
wantAttach structs.HostVolumeAttachmentMode
readOnly bool
prevAllocID string
expect bool
}{
{
name: "enforce attachment mode",
hasCaps: []*structs.HostVolumeCapability{{
AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter,
}},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: false,
},
{
name: "enforce read only",
hasAllocs: []string{"a", "b", "c"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
expect: false,
},
{
name: "enforce read only ok",
hasAllocs: []string{"a", "b", "c"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
readOnly: true,
expect: true,
},
{
name: "enforce single writer",
hasAllocs: []string{"a", "b", "c"},
hasWriters: []string{"b"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: false,
},
{
name: "enforce single writer ok across deployments",
hasAllocs: []string{"a", "b", "c"},
hasWriters: []string{"b"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
prevAllocID: "b",
expect: true,
},
{
name: "multi writer is always ok",
hasAllocs: []string{"a", "b", "c"},
hasWriters: []string{"b", "c"},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeMultiWriter,
expect: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
allocs := helper.ConvertSlice(tc.hasAllocs, func(id string) *structs.AllocListStub {
return &structs.AllocListStub{ID: id}
})
vol := &structs.HostVolume{
Allocations: allocs,
Writers: tc.hasWriters,
State: structs.HostVolumeStateReady,
}
if len(tc.hasCaps) > 0 {
vol.RequestedCapabilities = tc.hasCaps
} else {
vol.RequestedCapabilities = allCaps
}

must.Eq(t, tc.expect, hostVolumeIsAvailable(
vol, tc.wantAccess, tc.wantAttach, tc.readOnly, tc.prevAllocID))
})
}

}

func TestCSIVolumeChecker(t *testing.T) {
ci.Parallel(t)
state, ctx := testContext(t)
1 change: 1 addition & 0 deletions scheduler/generic_sched.go
@@ -838,6 +838,7 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
}
}
selectOptions.PenaltyNodeIDs = penaltyNodes
selectOptions.PreviousAllocID = prevAllocation.ID
}
if preferredNode != nil {
selectOptions.PreferredNodes = []*structs.Node{preferredNode}
13 changes: 7 additions & 6 deletions scheduler/stack.go
@@ -35,10 +35,11 @@
}

type SelectOptions struct {
PenaltyNodeIDs map[string]struct{}
PreferredNodes []*structs.Node
Preempt bool
AllocName string
PenaltyNodeIDs map[string]struct{}
PreferredNodes []*structs.Node
Preempt bool
AllocName string
PreviousAllocID string
}

// GenericStack is the Stack used for the Generic scheduler. It is
@@ -156,7 +157,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.PreviousAllocID)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
@@ -349,7 +350,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.PreviousAllocID)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
