diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go index 7e55e6ced4..5c7845a4d3 100644 --- a/nomad/state/state_store_host_volumes.go +++ b/nomad/state/state_store_host_volumes.go @@ -30,6 +30,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs vol = vol.Copy() vol.Allocations = []*structs.AllocListStub{} + vol.Writers = []string{} // we can't use AllocsByNodeTerminal because we only want to filter out // allocs that are client-terminal, not server-terminal @@ -43,6 +44,9 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs } for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes { if volClaim.Type == structs.VolumeTypeHost && volClaim.Source == vol.Name { + if !volClaim.ReadOnly { + vol.Writers = append(vol.Writers, alloc.ID) + } vol.Allocations = append(vol.Allocations, alloc.Stub(nil)) } } @@ -101,6 +105,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err // Allocations are denormalized on read, so we don't want these to be // written to the state store. vol.Allocations = nil + vol.Writers = nil vol.ModifyIndex = index err = txn.Insert(TableHostVolumes, vol) diff --git a/nomad/structs/host_volumes.go b/nomad/structs/host_volumes.go index 440ad95651..5a11d4da80 100644 --- a/nomad/structs/host_volumes.go +++ b/nomad/structs/host_volumes.go @@ -84,6 +84,11 @@ type HostVolume struct { // this host volume. They are denormalized on read and this field will be // never written to Raft Allocations []*AllocListStub `json:",omitempty"` + + // Writers and is a list of alloc IDs from the Allocations field that can + // write to this volume. This count is denormalized on read, never written + // to Raft, and never returned by the API. + Writers []string `json:"-"` } type HostVolumeState string diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 60442f92e7..0477a82e5a 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -137,9 +137,10 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator { // HostVolumeChecker is a FeasibilityChecker which returns whether a node has // the host volumes necessary to schedule a task group. type HostVolumeChecker struct { - ctx Context - volumeReqs []*structs.VolumeRequest - namespace string + ctx Context + volumeReqs []*structs.VolumeRequest + namespace string + previousAllocationID string } // NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes @@ -151,9 +152,10 @@ func NewHostVolumeChecker(ctx Context) *HostVolumeChecker { } // SetVolumes takes the volumes required by a task group and updates the checker. -func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) { +func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest, prevAllocID string) { h.namespace = ns h.volumeReqs = []*structs.VolumeRequest{} + h.previousAllocationID = prevAllocID for _, req := range volumes { if req.Type != structs.VolumeTypeHost { continue // filter CSI volumes @@ -194,7 +196,7 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { } if volCfg.ID != "" { // dynamic host volume - vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, false) + vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, true) if err != nil || vol == nil { // node fingerprint has a dynamic volume that's no longer in the // state store; this is only possible if the batched fingerprint @@ -202,18 +204,12 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { // raft entry completes return false } - if vol.State != structs.HostVolumeStateReady { - return false - } - var capOk bool - for _, cap := range vol.RequestedCapabilities { - if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) && - req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) { - capOk = true - break - } - } - if !capOk { + if !hostVolumeIsAvailable(vol, + structs.HostVolumeAccessMode(req.AccessMode), + structs.HostVolumeAttachmentMode(req.AttachmentMode), + req.ReadOnly, + h.previousAllocationID, + ) { return false } } else if !req.ReadOnly { @@ -228,6 +224,47 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { return true } +// hostVolumeIsAvailable determines if a dynamic host volume is available for a request +func hostVolumeIsAvailable(hv *structs.HostVolume, reqAccess structs.HostVolumeAccessMode, reqAttach structs.HostVolumeAttachmentMode, readOnly bool, previousAllocID string) bool { + + if hv.State != structs.HostVolumeStateReady { + return false + } + + // check that the volume has the requested capability at all + var capOk bool + for _, cap := range hv.RequestedCapabilities { + if reqAccess == cap.AccessMode && + reqAttach == cap.AttachmentMode { + capOk = true + break + } + } + if !capOk { + return false + } + + // if no other allocations claim the volume, it's first-come-first-serve to + // determine how subsequent allocs can claim it + if len(hv.Allocations) == 0 { + return true + } + + switch reqAccess { + case structs.HostVolumeAccessModeUnknown: + return false // invalid mode + case structs.HostVolumeAccessModeSingleNodeReader: + return readOnly + case structs.HostVolumeAccessModeSingleNodeWriter, structs.HostVolumeAccessModeSingleNodeSingleWriter: + return len(hv.Writers) == 0 || + (len(hv.Writers) == 1 && hv.Writers[0] == previousAllocID) + case structs.HostVolumeAccessModeSingleNodeMultiWriter: + // no contraint + } + + return true +} + type CSIVolumeChecker struct { ctx Context namespace string diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 9c5a9aaf1a..d560e39e28 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -91,7 +92,7 @@ func TestRandomIterator(t *testing.T) { } } -func TestHostVolumeChecker(t *testing.T) { +func TestHostVolumeChecker_Static(t *testing.T) { ci.Parallel(t) _, ctx := testContext(t) @@ -177,14 +178,14 @@ func TestHostVolumeChecker(t *testing.T) { alloc.NodeID = nodes[2].ID for i, c := range cases { - checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes) + checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, "") if act := checker.Feasible(c.Node); act != c.Result { t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) } } } -func TestHostVolumeChecker_ReadOnly(t *testing.T) { +func TestHostVolumeChecker_Dynamic(t *testing.T) { ci.Parallel(t) store, ctx := testContext(t) @@ -359,13 +360,122 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes) + checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, "") actual := checker.Feasible(tc.node) must.Eq(t, tc.expect, actual) }) } } +// TestDynamicHostVolumeIsAvailable provides fine-grained coverage of the +// dynamicHostVolumeIsAvailable function +func TestDynamicHostVolumeIsAvailable(t *testing.T) { + + allCaps := []*structs.HostVolumeCapability{} + + for _, accessMode := range []structs.HostVolumeAccessMode{ + structs.HostVolumeAccessModeSingleNodeReader, + structs.HostVolumeAccessModeSingleNodeWriter, + structs.HostVolumeAccessModeSingleNodeSingleWriter, + structs.HostVolumeAccessModeSingleNodeMultiWriter, + } { + for _, attachMode := range []structs.HostVolumeAttachmentMode{ + structs.HostVolumeAttachmentModeFilesystem, + structs.HostVolumeAttachmentModeBlockDevice, + } { + allCaps = append(allCaps, &structs.HostVolumeCapability{ + AttachmentMode: attachMode, + AccessMode: accessMode, + }) + } + } + + testCases := []struct { + name string + hasAllocs []string + hasWriters []string + hasCaps []*structs.HostVolumeCapability + wantAccess structs.HostVolumeAccessMode + wantAttach structs.HostVolumeAttachmentMode + readOnly bool + prevAllocID string + expect bool + }{ + { + name: "enforce attachment mode", + hasCaps: []*structs.HostVolumeCapability{{ + AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice, + AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter, + }}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter, + expect: false, + }, + { + name: "enforce read only", + hasAllocs: []string{"a", "b", "c"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeReader, + expect: false, + }, + { + name: "enforce read only ok", + hasAllocs: []string{"a", "b", "c"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeReader, + readOnly: true, + expect: true, + }, + { + name: "enforce single writer", + hasAllocs: []string{"a", "b", "c"}, + hasWriters: []string{"b"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter, + expect: false, + }, + { + name: "enforce single writer ok across deployments", + hasAllocs: []string{"a", "b", "c"}, + hasWriters: []string{"b"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter, + prevAllocID: "b", + expect: true, + }, + { + name: "multi writer is always ok", + hasAllocs: []string{"a", "b", "c"}, + hasWriters: []string{"b", "c"}, + wantAttach: structs.HostVolumeAttachmentModeFilesystem, + wantAccess: structs.HostVolumeAccessModeSingleNodeMultiWriter, + expect: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + allocs := helper.ConvertSlice(tc.hasAllocs, func(id string) *structs.AllocListStub { + return &structs.AllocListStub{ID: id} + }) + vol := &structs.HostVolume{ + Allocations: allocs, + Writers: tc.hasWriters, + State: structs.HostVolumeStateReady, + } + if len(tc.hasCaps) > 0 { + vol.RequestedCapabilities = tc.hasCaps + } else { + vol.RequestedCapabilities = allCaps + } + + must.Eq(t, tc.expect, hostVolumeIsAvailable( + vol, tc.wantAccess, tc.wantAttach, tc.readOnly, tc.prevAllocID)) + }) + } + +} + func TestCSIVolumeChecker(t *testing.T) { ci.Parallel(t) state, ctx := testContext(t) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index f9fd669e59..525320bcce 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -838,6 +838,7 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs } } selectOptions.PenaltyNodeIDs = penaltyNodes + selectOptions.PreviousAllocID = prevAllocation.ID } if preferredNode != nil { selectOptions.PreferredNodes = []*structs.Node{preferredNode} diff --git a/scheduler/stack.go b/scheduler/stack.go index 1f2b658688..22b11e4e6c 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -35,10 +35,11 @@ type Stack interface { } type SelectOptions struct { - PenaltyNodeIDs map[string]struct{} - PreferredNodes []*structs.Node - Preempt bool - AllocName string + PenaltyNodeIDs map[string]struct{} + PreferredNodes []*structs.Node + Preempt bool + AllocName string + PreviousAllocID string } // GenericStack is the Stack used for the Generic scheduler. It is @@ -156,7 +157,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) - s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes) + s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.PreviousAllocID) s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0]) @@ -349,7 +350,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) - s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes) + s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.PreviousAllocID) s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0])