dynamic host volumes: account for other claims in capability check
When we feasibility-check a dynamic host volume against a volume request, we
check the attachment mode and access mode. This only ensures that the
capabilities match, but doesn't enforce the semantics of those capabilities
against other claims that may be made on the volume.

Add support for checking the requested capability against the other
allocations that have claimed the volume.

Ref: #24479
tgross committed Dec 18, 2024
1 parent eec24f1 commit 2294a80
Showing 7 changed files with 274 additions and 27 deletions.
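For illustration only (not part of this commit): a minimal sketch of the per-mode semantics described above, i.e. what the checker enforces once a volume already has claims. The helper, its signature, and the mode strings are hypothetical stand-ins, and the sketch ignores the previous-allocation and in-flight (proposed-alloc) cases the real checker handles.

```go
package main

import "fmt"

// claimAllowed is a hypothetical, simplified stand-in for the scheduler's new
// check: once a volume already has claims, a new claim is accepted or rejected
// based on the requested access mode and whether the new claim writes.
func claimAllowed(accessMode string, readOnly bool, existingWriters int) bool {
	switch accessMode {
	case "single-node-reader-only":
		return readOnly // writers are rejected
	case "single-node-writer":
		return !readOnly // readers are rejected
	case "single-node-single-writer":
		return existingWriters == 0 // at most one writer at a time
	case "single-node-multi-writer":
		return true // no constraint
	}
	return false
}

func main() {
	fmt.Println(claimAllowed("single-node-single-writer", false, 1)) // false: a writer already exists
	fmt.Println(claimAllowed("single-node-multi-writer", false, 3))  // true: any number of writers
}
```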
5 changes: 5 additions & 0 deletions nomad/state/state_store_host_volumes.go
@@ -30,6 +30,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs

vol = vol.Copy()
vol.Allocations = []*structs.AllocListStub{}
vol.Writers = []string{}

// we can't use AllocsByNodeTerminal because we only want to filter out
// allocs that are client-terminal, not server-terminal
@@ -43,6 +44,9 @@
}
for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes {
if volClaim.Type == structs.VolumeTypeHost && volClaim.Source == vol.Name {
if !volClaim.ReadOnly {
vol.Writers = append(vol.Writers, alloc.ID)
}
vol.Allocations = append(vol.Allocations, alloc.Stub(nil))
}
}
@@ -101,6 +105,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err
// Allocations are denormalized on read, so we don't want these to be
// written to the state store.
vol.Allocations = nil
vol.Writers = nil
vol.ModifyIndex = index

err = txn.Insert(TableHostVolumes, vol)
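As a sketch of how a server-side caller might consume this denormalized read (assuming the nomad/state package; the helper and its placement are hypothetical, not part of this commit):

```go
package state_test // illustrative placement only

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/state"
)

// volumeWriters is a hypothetical helper showing the read contract: with
// withAllocs=true, Allocations is populated with stubs for every
// non-client-terminal alloc that claims the volume, and Writers holds the
// subset of those alloc IDs whose claim is not read-only. Neither field is
// persisted to Raft.
func volumeWriters(store *state.StateStore, ns, volID string) ([]string, error) {
	vol, err := store.HostVolumeByID(nil, ns, volID, true)
	if err != nil {
		return nil, err
	}
	if vol == nil {
		return nil, fmt.Errorf("host volume %q not found in namespace %q", volID, ns)
	}
	return vol.Writers, nil
}
```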
5 changes: 5 additions & 0 deletions nomad/structs/host_volumes.go
@@ -84,6 +84,11 @@ type HostVolume struct {
// this host volume. They are denormalized on read and this field will
// never be written to Raft
Allocations []*AllocListStub `json:",omitempty"`

// Writers is a list of alloc IDs from the Allocations field that can
// write to this volume. This field is denormalized on read, never written
// to Raft, and never returned by the API.
Writers []string `json:"-"`
}

type HostVolumeState string
114 changes: 95 additions & 19 deletions scheduler/feasible.go
@@ -138,10 +138,11 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
- ctx Context
- volumeReqs []*structs.VolumeRequest
- hostVolumeIDs []string
- namespace string
ctx Context
volumeReqs []*structs.VolumeRequest
hostVolumeIDs []string
namespace string
previousAllocationID string
}

// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
@@ -155,11 +156,11 @@ func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {

// SetVolumes takes the volumes required by a task group and updates the checker.
func (h *HostVolumeChecker) SetVolumes(
- allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
- ) {
allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string, prevAllocID string) {
h.namespace = ns
h.volumeReqs = []*structs.VolumeRequest{}
h.hostVolumeIDs = allocHostVolumeIDs
h.previousAllocationID = prevAllocID
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue // filter CSI volumes
@@ -192,33 +193,33 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

proposed, err := h.ctx.ProposedAllocs(n.ID)
if err != nil {
return false // only hit this on state store invariant failure
}

for _, req := range h.volumeReqs {
volCfg, ok := n.HostVolumes[req.Source]
if !ok {
return false
}

if volCfg.ID != "" { // dynamic host volume
- vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, false)
vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, true)
if err != nil || vol == nil {
// node fingerprint has a dynamic volume that's no longer in the
// state store; this is only possible if the batched fingerprint
// update from a delete RPC is written before the delete RPC's
// raft entry completes
return false
}
- if vol.State != structs.HostVolumeStateReady {
- return false
- }
- var capOk bool
- for _, cap := range vol.RequestedCapabilities {
- if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
- req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
- capOk = true
- break
- }
- }
- if !capOk {
if !hostVolumeIsAvailable(vol,
structs.HostVolumeAccessMode(req.AccessMode),
structs.HostVolumeAttachmentMode(req.AttachmentMode),
req.ReadOnly,
h.previousAllocationID,
proposed,
) {
return false
}

@@ -242,6 +243,81 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

// hostVolumeIsAvailable determines if a dynamic host volume is available for a request
func hostVolumeIsAvailable(hv *structs.HostVolume, reqAccess structs.HostVolumeAccessMode, reqAttach structs.HostVolumeAttachmentMode, readOnly bool, previousAllocID string, proposed []*structs.Allocation) bool {

if hv.State != structs.HostVolumeStateReady {
return false
}

// pick a default capability based on the read-only flag. this happens here
// in the scheduler rather than job submit because we don't know whether a
// host volume is dynamic or not until we try to schedule it (ex. the same
// name could be static on one node and dynamic on another)
if reqAccess == structs.HostVolumeAccessModeUnknown {
if readOnly {
reqAccess = structs.HostVolumeAccessModeSingleNodeReader
} else {
reqAccess = structs.HostVolumeAccessModeSingleNodeWriter
}
}
if reqAttach == structs.HostVolumeAttachmentModeUnknown {
reqAttach = structs.HostVolumeAttachmentModeFilesystem
}

// check that the volume has the requested capability at all
var capOk bool
for _, cap := range hv.RequestedCapabilities {
if reqAccess == cap.AccessMode &&
reqAttach == cap.AttachmentMode {
capOk = true
break
}
}
if !capOk {
return false
}

// if no other allocations claim the volume, it's first-come-first-serve to
// determine how subsequent allocs can claim it
if len(hv.Allocations) == 0 && len(proposed) == 0 {
return true
}

switch reqAccess {
case structs.HostVolumeAccessModeSingleNodeReader:
return readOnly
case structs.HostVolumeAccessModeSingleNodeWriter:
return !readOnly
case structs.HostVolumeAccessModeSingleNodeSingleWriter:
// make a cheap check that no other persisted alloc has claimed writes,
// unless that writer is the previous alloc this one is replacing
if len(hv.Writers) > 0 && hv.Writers[0] != previousAllocID {
return false
}

// examine all allocs on the node, including those that might not have
// yet been persisted
for _, alloc := range proposed {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
for _, req := range tg.Volumes {
if req.Type == structs.VolumeTypeHost && req.Source == hv.Name {
if !req.ReadOnly {
return false
}
}
}

}
// return len(hv.Writers) == 0 ||
// (len(hv.Writers) == 1 && hv.Writers[0] == previousAllocID)
case structs.HostVolumeAccessModeSingleNodeMultiWriter:
// no constraint
}

return true
}

type CSIVolumeChecker struct {
ctx Context
namespace string
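To make the single-writer rule concrete, here is a test-style sketch (not part of this commit) of how hostVolumeIsAvailable is expected to behave for a volume whose only requested capability is single-node-single-writer with filesystem attachment; the volume's field values are illustrative.

```go
package scheduler

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// Example_hostVolumeIsAvailable sketches the single-writer rule: a second
// writer is rejected unless it replaces the alloc that currently holds the
// write claim.
func Example_hostVolumeIsAvailable() {
	vol := &structs.HostVolume{
		Name:  "shared-data", // illustrative name
		State: structs.HostVolumeStateReady,
		RequestedCapabilities: []*structs.HostVolumeCapability{{
			AccessMode:     structs.HostVolumeAccessModeSingleNodeSingleWriter,
			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
		}},
		Allocations: []*structs.AllocListStub{{ID: "existing-writer"}},
		Writers:     []string{"existing-writer"},
	}

	// a new writer that does not replace the existing claim is rejected
	fmt.Println(hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeSingleNodeSingleWriter,
		structs.HostVolumeAttachmentModeFilesystem,
		false, "some-other-alloc", nil))

	// the replacement for the current writer may take over the write claim
	fmt.Println(hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeSingleNodeSingleWriter,
		structs.HostVolumeAttachmentModeFilesystem,
		false, "existing-writer", nil))

	// Output:
	// false
	// true
}
```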