From 2294a8015b072050433838d6fc4831184562d40a Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Mon, 16 Dec 2024 09:36:35 -0500
Subject: [PATCH] dynamic host volumes: account for other claims in capability check

When we feasibility check a dynamic host volume against a volume
request, we check the attachment mode and access mode. This only
ensures that the capabilities match, but doesn't enforce the semantics
of the capabilities against other claims that may be made on the
volume.

Add support for checking the requested capability against other
allocations that have claimed the volume.

Ref: https://github.com/hashicorp/nomad/pull/24479
---
 nomad/state/state_store_host_volumes.go |   5 +
 nomad/structs/host_volumes.go           |   5 +
 scheduler/feasible.go                   | 114 +++++++++++++---
 scheduler/feasible_test.go              | 169 +++++++++++++++++++++++-
 scheduler/generic_sched.go              |   1 +
 scheduler/generic_sched_test.go         |   2 +-
 scheduler/stack.go                      |   5 +-
 7 files changed, 274 insertions(+), 27 deletions(-)

diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go
index 7e55e6ced4..5c7845a4d3 100644
--- a/nomad/state/state_store_host_volumes.go
+++ b/nomad/state/state_store_host_volumes.go
@@ -30,6 +30,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs
 
 	vol = vol.Copy()
 	vol.Allocations = []*structs.AllocListStub{}
+	vol.Writers = []string{}
 
 	// we can't use AllocsByNodeTerminal because we only want to filter out
 	// allocs that are client-terminal, not server-terminal
@@ -43,6 +44,9 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs
 		}
 		for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes {
 			if volClaim.Type == structs.VolumeTypeHost && volClaim.Source == vol.Name {
+				if !volClaim.ReadOnly {
+					vol.Writers = append(vol.Writers, alloc.ID)
+				}
 				vol.Allocations = append(vol.Allocations, alloc.Stub(nil))
 			}
 		}
@@ -101,6 +105,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err
 	// Allocations are denormalized on read, so we don't want these to be
 	// written to the state store.
 	vol.Allocations = nil
+	vol.Writers = nil
 	vol.ModifyIndex = index
 
 	err = txn.Insert(TableHostVolumes, vol)
diff --git a/nomad/structs/host_volumes.go b/nomad/structs/host_volumes.go
index 440ad95651..5a11d4da80 100644
--- a/nomad/structs/host_volumes.go
+++ b/nomad/structs/host_volumes.go
@@ -84,6 +84,11 @@ type HostVolume struct {
 	// this host volume. They are denormalized on read and this field will be
 	// never written to Raft
 	Allocations []*AllocListStub `json:",omitempty"`
+
+	// Writers is a list of alloc IDs from the Allocations field that can
+	// write to this volume. This list is denormalized on read, never written
+	// to Raft, and never returned by the API.
+	Writers []string `json:"-"`
 }
 
 type HostVolumeState string
diff --git a/scheduler/feasible.go b/scheduler/feasible.go
index 69ab03de7c..ddc4dc185d 100644
--- a/scheduler/feasible.go
+++ b/scheduler/feasible.go
@@ -138,10 +138,11 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
 // HostVolumeChecker is a FeasibilityChecker which returns whether a node has
 // the host volumes necessary to schedule a task group.
 type HostVolumeChecker struct {
-	ctx           Context
-	volumeReqs    []*structs.VolumeRequest
-	hostVolumeIDs []string
-	namespace     string
+	ctx                  Context
+	volumeReqs           []*structs.VolumeRequest
+	hostVolumeIDs        []string
+	namespace            string
+	previousAllocationID string
 }
 
 // NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
@@ -155,11 +156,11 @@ func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
 
 // SetVolumes takes the volumes required by a task group and updates the checker.
 func (h *HostVolumeChecker) SetVolumes(
-	allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
-) {
+	allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string, prevAllocID string) {
 	h.namespace = ns
 	h.volumeReqs = []*structs.VolumeRequest{}
 	h.hostVolumeIDs = allocHostVolumeIDs
+	h.previousAllocationID = prevAllocID
 	for _, req := range volumes {
 		if req.Type != structs.VolumeTypeHost {
 			continue // filter CSI volumes
@@ -192,6 +193,11 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 		return true
 	}
 
+	proposed, err := h.ctx.ProposedAllocs(n.ID)
+	if err != nil {
+		return false // only hit this on state store invariant failure
+	}
+
 	for _, req := range h.volumeReqs {
 		volCfg, ok := n.HostVolumes[req.Source]
 		if !ok {
@@ -199,7 +205,7 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 		}
 
 		if volCfg.ID != "" { // dynamic host volume
-			vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, false)
+			vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, true)
 			if err != nil || vol == nil {
 				// node fingerprint has a dynamic volume that's no longer in the
 				// state store; this is only possible if the batched fingerprint
@@ -207,18 +213,13 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 				// raft entry completes
 				return false
 			}
-			if vol.State != structs.HostVolumeStateReady {
-				return false
-			}
-			var capOk bool
-			for _, cap := range vol.RequestedCapabilities {
-				if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
-					req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
-					capOk = true
-					break
-				}
-			}
-			if !capOk {
+			if !hostVolumeIsAvailable(vol,
+				structs.HostVolumeAccessMode(req.AccessMode),
+				structs.HostVolumeAttachmentMode(req.AttachmentMode),
+				req.ReadOnly,
+				h.previousAllocationID,
+				proposed,
+			) {
 				return false
 			}
 
@@ -242,6 +243,81 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 	return true
 }
 
+// hostVolumeIsAvailable determines if a dynamic host volume is available for a request
+func hostVolumeIsAvailable(hv *structs.HostVolume, reqAccess structs.HostVolumeAccessMode, reqAttach structs.HostVolumeAttachmentMode, readOnly bool, previousAllocID string, proposed []*structs.Allocation) bool {
+
+	if hv.State != structs.HostVolumeStateReady {
+		return false
+	}
+
+	// pick a default capability based on the read-only flag. this happens here
+	// in the scheduler rather than job submit because we don't know whether a
+	// host volume is dynamic or not until we try to schedule it (ex. the same
+	// name could be static on one node and dynamic on another)
+	if reqAccess == structs.HostVolumeAccessModeUnknown {
+		if readOnly {
+			reqAccess = structs.HostVolumeAccessModeSingleNodeReader
+		} else {
+			reqAccess = structs.HostVolumeAccessModeSingleNodeWriter
+		}
+	}
+	if reqAttach == structs.HostVolumeAttachmentModeUnknown {
+		reqAttach = structs.HostVolumeAttachmentModeFilesystem
+	}
+
+	// check that the volume has the requested capability at all
+	var capOk bool
+	for _, cap := range hv.RequestedCapabilities {
+		if reqAccess == cap.AccessMode &&
+			reqAttach == cap.AttachmentMode {
+			capOk = true
+			break
+		}
+	}
+	if !capOk {
+		return false
+	}
+
+	// if no other allocations claim the volume, it's first-come-first-serve to
+	// determine how subsequent allocs can claim it
+	if len(hv.Allocations) == 0 && len(proposed) == 0 {
+		return true
+	}
+
+	switch reqAccess {
+	case structs.HostVolumeAccessModeSingleNodeReader:
+		return readOnly
+	case structs.HostVolumeAccessModeSingleNodeWriter:
+		return !readOnly
+	case structs.HostVolumeAccessModeSingleNodeSingleWriter:
+		// make a cheap check that no other persisted alloc has claimed writes
+		// if it's not being replaced by this alloc
+		if len(hv.Writers) > 0 && hv.Writers[0] != previousAllocID {
+			return false
+		}
+
+		// examine all allocs on the node, including those that might not have
+		// yet been persisted
+		for _, alloc := range proposed {
+			tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+			for _, req := range tg.Volumes {
+				if req.Type == structs.VolumeTypeHost && req.Source == hv.Name {
+					if !req.ReadOnly {
+						return false
+					}
+				}
+			}
+
+		}
+		// return len(hv.Writers) == 0 ||
+		// 	(len(hv.Writers) == 1 && hv.Writers[0] == previousAllocID)
+	case structs.HostVolumeAccessModeSingleNodeMultiWriter:
+		// no constraint
+	}
+
+	return true
+}
+
 type CSIVolumeChecker struct {
 	ctx       Context
 	namespace string
diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go
index 3351210c2e..199037d48e 100644
--- a/scheduler/feasible_test.go
+++ b/scheduler/feasible_test.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -91,7 +92,7 @@ func TestRandomIterator(t *testing.T) {
 	}
 }
 
-func TestHostVolumeChecker(t *testing.T) {
+func TestHostVolumeChecker_Static(t *testing.T) {
 	ci.Parallel(t)
 
 	_, ctx := testContext(t)
@@ -177,14 +178,14 @@
 	alloc.NodeID = nodes[2].ID
 
 	for i, c := range cases {
-		checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs)
+		checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs, "")
 		if act := checker.Feasible(c.Node); act != c.Result {
 			t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
 		}
 	}
 }
 
-func TestHostVolumeChecker_ReadOnly(t *testing.T) {
+func TestHostVolumeChecker_Dynamic(t *testing.T) {
 	ci.Parallel(t)
 
 	store, ctx := testContext(t)
@@ -359,7 +360,7 @@
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs)
+			checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs, "")
 			actual := checker.Feasible(tc.node)
 			must.Eq(t, tc.expect, actual)
 		})
 	}
 }
 
@@ -468,13 +469,171 @@ func TestHostVolumeChecker_Sticky(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs)
+			checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs, "")
 			actual := checker.Feasible(tc.node)
 			must.Eq(t, tc.expect, actual)
 		})
 	}
 }
 
+// TestDynamicHostVolumeIsAvailable provides fine-grained coverage of the
+// hostVolumeIsAvailable function
+func TestDynamicHostVolumeIsAvailable(t *testing.T) {
+
+	allCaps := []*structs.HostVolumeCapability{}
+
+	for _, accessMode := range []structs.HostVolumeAccessMode{
+		structs.HostVolumeAccessModeSingleNodeReader,
+		structs.HostVolumeAccessModeSingleNodeWriter,
+		structs.HostVolumeAccessModeSingleNodeSingleWriter,
+		structs.HostVolumeAccessModeSingleNodeMultiWriter,
+	} {
+		for _, attachMode := range []structs.HostVolumeAttachmentMode{
+			structs.HostVolumeAttachmentModeFilesystem,
+			structs.HostVolumeAttachmentModeBlockDevice,
+		} {
+			allCaps = append(allCaps, &structs.HostVolumeCapability{
+				AttachmentMode: attachMode,
+				AccessMode:     accessMode,
+			})
+		}
+	}
+
+	job0, job1 := mock.Job(), mock.Job()
+	job0.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
+		"example": {
+			Type:     structs.VolumeTypeHost,
+			Source:   "example",
+			ReadOnly: false,
+		},
+	}
+	job1.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
+		"example": {
+			Type:     structs.VolumeTypeHost,
+			Source:   "example",
+			ReadOnly: true,
+		},
+	}
+
+	testCases := []struct {
+		name        string
+		hasAllocs   []string
+		hasWriters  []string
+		hasProposed []*structs.Allocation
+		hasCaps     []*structs.HostVolumeCapability
+		wantAccess  structs.HostVolumeAccessMode
+		wantAttach  structs.HostVolumeAttachmentMode
+		readOnly    bool
+		prevAllocID string
+		expect      bool
+	}{
+		{
+			name: "enforce attachment mode",
+			hasCaps: []*structs.HostVolumeCapability{{
+				AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
+				AccessMode:     structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			}},
+			wantAttach: structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			expect:     false,
+		},
+		{
+			name:       "enforce read only",
+			hasAllocs:  []string{"a", "b", "c"},
+			wantAttach: structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
+			expect:     false,
+		},
+		{
+			name:       "enforce read only ok",
+			hasAllocs:  []string{"a", "b", "c"},
+			wantAttach: structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
+			readOnly:   true,
+			expect:     true,
+		},
+		{
+			name:       "enforce single writer",
+			hasAllocs:  []string{"a", "b", "c"},
+			hasWriters: []string{"b"},
+			wantAttach: structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			expect:     false,
+		},
+		{
+			name:        "enforce single writer ok across deployments",
+			hasAllocs:   []string{"a", "b", "c"},
+			hasWriters:  []string{"b"},
+			wantAttach:  structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess:  structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			prevAllocID: "b",
+			expect:      true,
+		},
+		{
+			name:       "multi writer is always ok",
+			hasAllocs:  []string{"a", "b", "c"},
+			hasWriters: []string{"b", "c"},
+			wantAttach: structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess: structs.HostVolumeAccessModeSingleNodeMultiWriter,
+			expect:     true,
+		},
+		{
+			name:   "default capabilities ok",
+			expect: true,
+		},
+		{
+			name:     "default capabilities fail",
+			readOnly: true,
+			hasCaps: []*structs.HostVolumeCapability{{
+				AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
+				AccessMode:     structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			}},
+			expect: false,
+		},
+		{
+			name:        "other alloc in same job has writer claim",
+			hasAllocs:   []string{"a", "b", "c"},
+			hasProposed: []*structs.Allocation{{ID: "d", Job: job0, TaskGroup: "web"}},
+			wantAttach:  structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess:  structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			expect:      false,
+		},
+		{
+			// note: we only see this happen in the case of a job with multiple
+			// task groups that claim the same volume with incompatible claims
+			name:        "other alloc in same job does not have writer claim",
+			hasAllocs:   []string{"a", "b", "c"},
+			hasProposed: []*structs.Allocation{{ID: "d", Job: job1, TaskGroup: "web"}},
+			wantAttach:  structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess:  structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			expect:      true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			allocs := helper.ConvertSlice(tc.hasAllocs, func(id string) *structs.AllocListStub {
+				return &structs.AllocListStub{ID: id}
+			})
+			vol := &structs.HostVolume{
+				Name:        "example",
+				Allocations: allocs,
+				Writers:     tc.hasWriters,
+				State:       structs.HostVolumeStateReady,
+			}
+			if len(tc.hasCaps) > 0 {
+				vol.RequestedCapabilities = tc.hasCaps
+			} else {
+				vol.RequestedCapabilities = allCaps
+			}
+
+			must.Eq(t, tc.expect, hostVolumeIsAvailable(
+				vol, tc.wantAccess, tc.wantAttach, tc.readOnly, tc.prevAllocID, tc.hasProposed))
+		})
+	}
+
+}
+
 func TestCSIVolumeChecker(t *testing.T) {
 	ci.Parallel(t)
 	state, ctx := testContext(t)
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 60b4f7f1ee..9b8c150568 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -863,6 +863,7 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
 		if prevAllocation.HostVolumeIDs != nil {
 			selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs
 		}
+		selectOptions.PreviousAllocID = prevAllocation.ID
 	}
 	if preferredNode != nil {
 		selectOptions.PreferredNodes = []*structs.Node{preferredNode}
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 5d47142313..3d236b5d28 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -218,7 +218,7 @@ func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
 	}
 }
 
-func TestServiceSched_JobRegister_StickyVolumes(t *testing.T) {
+func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {
 	ci.Parallel(t)
 
 	h := NewHarness(t)
diff --git a/scheduler/stack.go b/scheduler/stack.go
index f978c753f6..1eacc504b0 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -40,6 +40,7 @@ type SelectOptions struct {
 	Preempt                 bool
 	AllocName               string
 	AllocationHostVolumeIDs []string
+	PreviousAllocID         string
 }
 
 // GenericStack is the Stack used for the Generic scheduler. It is
@@ -157,7 +158,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
 	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
-	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
+	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs, options.PreviousAllocID)
 	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
@@ -350,7 +351,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
 	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
-	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
+	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs, options.PreviousAllocID)
 	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
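
The single-writer semantics introduced above (an existing write claim blocks additional writers unless the incoming alloc is replacing the claim holder) can also be exercised in isolation, outside the table-driven test. The following is only a sketch: it assumes the hostVolumeIsAvailable signature and the HostVolume.Writers field exactly as added in this patch, the volume name and alloc IDs are made up for illustration, and the file would sit alongside feasible_test.go in package scheduler.

package scheduler

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/shoenig/test/must"
)

func TestHostVolumeSingleWriterReplacement_sketch(t *testing.T) {
	// a ready volume that offers only filesystem + single-node-single-writer,
	// with alloc "a" currently holding the write claim
	vol := &structs.HostVolume{
		Name:  "example",
		State: structs.HostVolumeStateReady,
		RequestedCapabilities: []*structs.HostVolumeCapability{{
			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
			AccessMode:     structs.HostVolumeAccessModeSingleNodeSingleWriter,
		}},
		Allocations: []*structs.AllocListStub{{ID: "a"}},
		Writers:     []string{"a"},
	}

	// a brand-new alloc cannot take a second write claim
	must.False(t, hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeSingleNodeSingleWriter,
		structs.HostVolumeAttachmentModeFilesystem,
		false, "", nil))

	// but an alloc that replaces "a" (for example during a deployment)
	// inherits the write claim
	must.True(t, hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeSingleNodeSingleWriter,
		structs.HostVolumeAttachmentModeFilesystem,
		false, "a", nil))
}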
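
The capability-defaulting rule (a request with unset modes falls back to single-node-reader or single-node-writer plus filesystem, depending on its read-only flag) can be pinned down the same way. This sketch makes the same assumptions as the one above and would live in the same test file.

func TestHostVolumeDefaultCapability_sketch(t *testing.T) {
	// a ready volume that offers only filesystem + single-node-writer
	vol := &structs.HostVolume{
		Name:  "example",
		State: structs.HostVolumeStateReady,
		RequestedCapabilities: []*structs.HostVolumeCapability{{
			AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
			AccessMode:     structs.HostVolumeAccessModeSingleNodeWriter,
		}},
	}

	// with no modes set and ReadOnly=false the request defaults to
	// single-node-writer + filesystem, which this volume offers
	must.True(t, hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeUnknown,
		structs.HostVolumeAttachmentModeUnknown,
		false, "", nil))

	// the same request with ReadOnly=true defaults to single-node-reader,
	// which this volume does not offer
	must.False(t, hostVolumeIsAvailable(vol,
		structs.HostVolumeAccessModeUnknown,
		structs.HostVolumeAttachmentModeUnknown,
		true, "", nil))
}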