From 366ccbff0bf7c348964b9af1cfa513c285ffe159 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 16 Dec 2024 09:36:35 -0500 Subject: [PATCH] dynamic host volumes: account for other claims in capability check When we feasibility check a dynamic host volume against a volume request, we check the attachment mode and access mode. This only ensures that the capabilities match, but doesn't enforce the semantics of the capabilities against other claims that may be made on the allocation. Add support for checking the requested capability against other allocations that the volume claimed. Ref: https://github.com/hashicorp/nomad/pull/24479 --- nomad/state/state_store_host_volumes.go | 5 + nomad/structs/host_volumes.go | 5 + scheduler/feasible.go | 87 ++++++++++++---- scheduler/feasible_test.go | 133 +++++++++++++++++++++++- scheduler/generic_sched.go | 1 + scheduler/stack.go | 5 +- 6 files changed, 210 insertions(+), 26 deletions(-) diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go index 7e55e6ced4..5c7845a4d3 100644 --- a/nomad/state/state_store_host_volumes.go +++ b/nomad/state/state_store_host_volumes.go @@ -30,6 +30,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs vol = vol.Copy() vol.Allocations = []*structs.AllocListStub{} + vol.Writers = []string{} // we can't use AllocsByNodeTerminal because we only want to filter out // allocs that are client-terminal, not server-terminal @@ -43,6 +44,9 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs } for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes { if volClaim.Type == structs.VolumeTypeHost && volClaim.Source == vol.Name { + if !volClaim.ReadOnly { + vol.Writers = append(vol.Writers, alloc.ID) + } vol.Allocations = append(vol.Allocations, alloc.Stub(nil)) } } @@ -101,6 +105,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err // Allocations are denormalized on read, so we don't want these to be // written to the state store. vol.Allocations = nil + vol.Writers = nil vol.ModifyIndex = index err = txn.Insert(TableHostVolumes, vol) diff --git a/nomad/structs/host_volumes.go b/nomad/structs/host_volumes.go index 440ad95651..5a11d4da80 100644 --- a/nomad/structs/host_volumes.go +++ b/nomad/structs/host_volumes.go @@ -84,6 +84,11 @@ type HostVolume struct { // this host volume. They are denormalized on read and this field will be // never written to Raft Allocations []*AllocListStub `json:",omitempty"` + + // Writers and is a list of alloc IDs from the Allocations field that can + // write to this volume. This count is denormalized on read, never written + // to Raft, and never returned by the API. + Writers []string `json:"-"` } type HostVolumeState string diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 69ab03de7c..bba185acb9 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -138,10 +138,11 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator { // HostVolumeChecker is a FeasibilityChecker which returns whether a node has // the host volumes necessary to schedule a task group. type HostVolumeChecker struct { - ctx Context - volumeReqs []*structs.VolumeRequest - hostVolumeIDs []string - namespace string + ctx Context + volumeReqs []*structs.VolumeRequest + hostVolumeIDs []string + namespace string + previousAllocationID string } // NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes @@ -155,11 +156,11 @@ func NewHostVolumeChecker(ctx Context) *HostVolumeChecker { // SetVolumes takes the volumes required by a task group and updates the checker. func (h *HostVolumeChecker) SetVolumes( - allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string, -) { + allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string, prevAllocID string) { h.namespace = ns h.volumeReqs = []*structs.VolumeRequest{} h.hostVolumeIDs = allocHostVolumeIDs + h.previousAllocationID = prevAllocID for _, req := range volumes { if req.Type != structs.VolumeTypeHost { continue // filter CSI volumes @@ -199,7 +200,7 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { } if volCfg.ID != "" { // dynamic host volume - vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, false) + vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, true) if err != nil || vol == nil { // node fingerprint has a dynamic volume that's no longer in the // state store; this is only possible if the batched fingerprint @@ -207,18 +208,12 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { // raft entry completes return false } - if vol.State != structs.HostVolumeStateReady { - return false - } - var capOk bool - for _, cap := range vol.RequestedCapabilities { - if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) && - req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) { - capOk = true - break - } - } - if !capOk { + if !hostVolumeIsAvailable(vol, + structs.HostVolumeAccessMode(req.AccessMode), + structs.HostVolumeAttachmentMode(req.AttachmentMode), + req.ReadOnly, + h.previousAllocationID, + ) { return false } @@ -242,6 +237,60 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { return true } +// hostVolumeIsAvailable determines if a dynamic host volume is available for a request +func hostVolumeIsAvailable(hv *structs.HostVolume, reqAccess structs.HostVolumeAccessMode, reqAttach structs.HostVolumeAttachmentMode, readOnly bool, previousAllocID string) bool { + + if hv.State != structs.HostVolumeStateReady { + return false + } + + // pick a default capability based on the read-only flag. this happens here + // in the scheduler rather than job submit because we don't know whether a + // host volume is dynamic or not until we try to schedule it (ex. the same + // name could be static on one node and dynamic on another) + if reqAccess == structs.HostVolumeAccessModeUnknown { + if readOnly { + reqAccess = structs.HostVolumeAccessModeSingleNodeReader + } else { + reqAccess = structs.HostVolumeAccessModeSingleNodeWriter + } + } + if reqAttach == structs.HostVolumeAttachmentModeUnknown { + reqAttach = structs.HostVolumeAttachmentModeFilesystem + } + + // check that the volume has the requested capability at all + var capOk bool + for _, cap := range hv.RequestedCapabilities { + if reqAccess == cap.AccessMode && + reqAttach == cap.AttachmentMode { + capOk = true + break + } + } + if !capOk { + return false + } + + // if no other allocations claim the volume, it's first-come-first-serve to + // determine how subsequent allocs can claim it + if len(hv.Allocations) == 0 { + return true + } + + switch reqAccess { + case structs.HostVolumeAccessModeSingleNodeReader: + return readOnly + case structs.HostVolumeAccessModeSingleNodeWriter, structs.HostVolumeAccessModeSingleNodeSingleWriter: + return len(hv.Writers) == 0 || + (len(hv.Writers) == 1 && hv.Writers[0] == previousAllocID) + case structs.HostVolumeAccessModeSingleNodeMultiWriter: + // no contraint + } + + return true +} + type CSIVolumeChecker struct { ctx Context namespace string diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 3351210c2e..3f40b63fed 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -91,7 +92,7 @@ func TestRandomIterator(t *testing.T) { } } -func TestHostVolumeChecker(t *testing.T) { +func TestHostVolumeChecker_Static(t *testing.T) { ci.Parallel(t) _, ctx := testContext(t) @@ -177,14 +178,14 @@ func TestHostVolumeChecker(t *testing.T) { alloc.NodeID = nodes[2].ID for i, c := range cases { - checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs) + checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs, "") if act := checker.Feasible(c.Node); act != c.Result { t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) } } } -func TestHostVolumeChecker_ReadOnly(t *testing.T) { +func TestHostVolumeChecker_Dynamic(t *testing.T) { ci.Parallel(t) store, ctx := testContext(t) @@ -359,7 +360,7 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs) + checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs, "") actual := checker.Feasible(tc.node) must.Eq(t, tc.expect, actual) }) @@ -468,13 +469,135 @@ func TestHostVolumeChecker_Sticky(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs) + checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs, "") actual := checker.Feasible(tc.node) must.Eq(t, tc.expect, actual) }) } } +// TestDynamicHostVolumeIsAvailable provides fine-grained coverage of the +// dynamicHostVolumeIsAvailable function +func TestDynamicHostVolumeIsAvailable(t *testing.T) { + + allCaps := []*structs.HostVolumeCapability{} + + for _, accessMode := range []structs.HostVolumeAccessMode{ + structs.HostVolumeAccessModeSingleNodeReader, + structs.HostVolumeAccessModeSingleNodeWriter, + structs.HostVolumeAccessModeSingleNodeSingleWriter, + structs.HostVolumeAccessModeSingleNodeMultiWriter, + } { + for _, attachMode := range []structs.HostVolumeAttachmentMode{ + structs.HostVolumeAttachmentModeFilesystem, + structs.HostVolumeAttachmentModeBlockDevice, + } { + allCaps = append(allCaps, &structs.HostVolumeCapability{ + AttachmentMode: attachMode, + AccessMode: accessMode, + }) + } + } + + testCases := []struct { + name string + hasAllocs []string + hasWriters []string + hasCaps []*structs.HostVolumeCapability + wantAccess structs.HostVolumeAccessMode + wantAttach structs.HostVolumeAttachmentMode + readOnly bool + prevAllocID string + expect bool + }{ + { + name: "enforce attachment mode", + hasCaps: []*structs.HostVolumeCapability{{ + AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice, + AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter, + }}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter, + expect: false, + }, + { + name: "enforce read only", + hasAllocs: []string{"a", "b", "c"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeReader, + expect: false, + }, + { + name: "enforce read only ok", + hasAllocs: []string{"a", "b", "c"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeReader, + readOnly: true, + expect: true, + }, + { + name: "enforce single writer", + hasAllocs: []string{"a", "b", "c"}, + hasWriters: []string{"b"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter, + expect: false, + }, + { + name: "enforce single writer ok across deployments", + hasAllocs: []string{"a", "b", "c"}, + hasWriters: []string{"b"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter, + prevAllocID: "b", + expect: true, + }, + { + name: "multi writer is always ok", + hasAllocs: []string{"a", "b", "c"}, + hasWriters: []string{"b", "c"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeMultiWriter, + expect: true, + }, + { + name: "default capabilities ok", + expect: true, + }, + { + name: "default capabilities fail", + readOnly: true, + hasCaps: []*structs.HostVolumeCapability{{ + AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice, + AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter, + }}, + expect: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + allocs := helper.ConvertSlice(tc.hasAllocs, func(id string) *structs.AllocListStub { + return &structs.AllocListStub{ID: id} + }) + vol := &structs.HostVolume{ + Allocations: allocs, + Writers: tc.hasWriters, + State: structs.HostVolumeStateReady, + } + if len(tc.hasCaps) > 0 { + vol.RequestedCapabilities = tc.hasCaps + } else { + vol.RequestedCapabilities = allCaps + } + + must.Eq(t, tc.expect, hostVolumeIsAvailable( + vol, tc.wantAccess, tc.wantAttach, tc.readOnly, tc.prevAllocID)) + }) + } + +} + func TestCSIVolumeChecker(t *testing.T) { ci.Parallel(t) state, ctx := testContext(t) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 60b4f7f1ee..9b8c150568 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -863,6 +863,7 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs if prevAllocation.HostVolumeIDs != nil { selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs } + selectOptions.PreviousAllocID = prevAllocation.ID } if preferredNode != nil { selectOptions.PreferredNodes = []*structs.Node{preferredNode} diff --git a/scheduler/stack.go b/scheduler/stack.go index f978c753f6..1eacc504b0 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -40,6 +40,7 @@ type SelectOptions struct { Preempt bool AllocName string AllocationHostVolumeIDs []string + PreviousAllocID string } // GenericStack is the Stack used for the Generic scheduler. It is @@ -157,7 +158,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) - s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs) + s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs, options.PreviousAllocID) s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0]) @@ -350,7 +351,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) - s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs) + s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs, options.PreviousAllocID) s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0])