Skip to content

Commit

Permalink
super hacky prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
pkazmierczak committed Dec 4, 2024
1 parent 3e1672e commit bd50ea9
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
6 changes: 3 additions & 3 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11114,9 +11114,9 @@ type Allocation struct {
// AllocatedResources is the total resources allocated for the task group.
AllocatedResources *AllocatedResources

// VolumeID is the ID of the host volume that this allocation requires.
// FIXME:could be multiple, could be CSI? can't just be a string
VolumeID *string
// VolumeIDs is a list of volume IDs (host or CSI) that this allocation
// requires.
VolumeIDs []string

// Metrics associated with this allocation
Metrics *AllocMetric
Expand Down
42 changes: 32 additions & 10 deletions scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ import (
"fmt"
"reflect"
"regexp"
"slices"
"strconv"
"strings"

"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/constraints/semver"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)

const (
FilterConstraintHostVolumes = "missing compatible host volumes"
FilterConstraintHostVolumesLookupFailed = "host volume lookup failed"
FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s"
FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s"
FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s"
Expand Down Expand Up @@ -161,8 +164,6 @@ func (h *HostVolumeChecker) SetVolumes(allocName string, volumes map[string]*str
continue
}

// FIXME: if there's a sticky vol set, adjust this to look for an ID

if req.PerAlloc {
// provide a unique volume source per allocation
copied := req.Copy()
Expand All @@ -176,32 +177,33 @@ func (h *HostVolumeChecker) SetVolumes(allocName string, volumes map[string]*str
}

func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
if h.hasVolumes(candidate) {
ok, failure := h.hasVolumes(candidate)
if ok {
return true
}

h.ctx.Metrics().FilterNode(candidate, FilterConstraintHostVolumes)
h.ctx.Metrics().FilterNode(candidate, failure)
return false
}

func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
func (h *HostVolumeChecker) hasVolumes(n *structs.Node) (bool, string) {
rLen := len(h.volumes)
hLen := len(n.HostVolumes)

// Fast path: Requested no volumes. No need to check further.
if rLen == 0 {
return true
return true, ""
}

// Fast path: Requesting more volumes than the node has, can't meet the criteria.
if rLen > hLen {
return false
return false, FilterConstraintHostVolumes
}

for source, requests := range h.volumes {
nodeVolume, ok := n.HostVolumes[source]
if !ok {
return false
return false, FilterConstraintHostVolumes
}

// If the volume supports being mounted as ReadWrite, we do not need to
Expand All @@ -210,16 +212,36 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
continue
}

ws := memdb.NewWatchSet()
// The Volume can only be mounted ReadOnly, validate that no requests for
// it are ReadWrite.
for _, req := range requests {
if !req.ReadOnly {
return false
return false, FilterConstraintHostVolumes
}

// Sticky volumes must always be paired with the right allocation;
// validate that this node has the right volume ID present.
if req.Sticky {
volumes, err := h.ctx.State().HostVolumesByNodeID(ws, n.ID, state.SortDefault)
if err != nil {
return false, FilterConstraintHostVolumesLookupFailed
}
for raw := volumes.Next(); raw != nil; raw = volumes.Next() {
vol := raw.(*structs.HostVolume)
if vol.Name != nodeVolume.Name {
continue
}
allocs, _ := h.ctx.ProposedAllocs(n.ID)
return slices.ContainsFunc(allocs, func(a *structs.Allocation) bool {
return !slices.Contains(a.VolumeIDs, vol.ID)
}), FilterConstraintHostVolumes
}
}
}
}

return true
return true, ""
}

type CSIVolumeChecker struct {
Expand Down
6 changes: 6 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ type State interface {
// CSIVolumeByID fetch CSI volumes, containing controller jobs
CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)

// HostVolumesByNodeID returns an iterator over all volumes of the node
HostVolumesByNodeID(memdb.WatchSet, string, state.SortOption) (memdb.ResultIterator, error)

// HostVolumesByName returns an iterator over all volumes by name
HostVolumesByName(memdb.WatchSet, string, string, state.SortOption) (memdb.ResultIterator, error)

// LatestIndex returns the greatest index value for all indexes.
LatestIndex() (uint64, error)
}
Expand Down

0 comments on commit bd50ea9

Please sign in to comment.