diff --git a/.changelog/24724.txt b/.changelog/24724.txt new file mode 100644 index 00000000000..20775747c80 --- /dev/null +++ b/.changelog/24724.txt @@ -0,0 +1,3 @@ +```release-note:improvement +csi: Added CSI volume and plugin events to the event stream +``` diff --git a/nomad/state/events.go b/nomad/state/events.go index a78111e5975..c2c30e10cdd 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -43,6 +43,9 @@ var MsgTypeEvents = map[structs.MessageType]string{ structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration, structs.HostVolumeRegisterRequestType: structs.TypeHostVolumeRegistered, structs.HostVolumeDeleteRequestType: structs.TypeHostVolumeDeleted, + structs.CSIVolumeRegisterRequestType: structs.TypeCSIVolumeRegistered, + structs.CSIVolumeDeregisterRequestType: structs.TypeCSIVolumeDeregistered, + structs.CSIVolumeClaimRequestType: structs.TypeCSIVolumeClaim, } func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events { @@ -190,7 +193,6 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { } return structs.Event{ Topic: structs.TopicHostVolume, - Key: before.ID, FilterKeys: []string{ before.ID, before.Name, @@ -201,6 +203,40 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Volume: before, }, }, true + case TableCSIVolumes: + before, ok := change.Before.(*structs.CSIVolume) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicCSIVolume, + Key: before.ID, + FilterKeys: []string{ + before.ID, + before.Name, + before.PluginID, + }, + Namespace: before.Namespace, + Payload: &structs.CSIVolumeEvent{ + Volume: before, + }, + }, true + case TableCSIPlugins: + // note: there is no CSIPlugin event type, because CSI plugins don't + // have their own write RPCs; they are always created/removed via + // node updates + before, ok := change.Before.(*structs.CSIPlugin) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicCSIPlugin, + Key: before.ID, + FilterKeys: []string{before.ID}, + Payload: &structs.CSIPluginEvent{ + Plugin: before, + }, + }, true } return structs.Event{}, false } @@ -396,6 +432,40 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Volume: after, }, }, true + case TableCSIVolumes: + after, ok := change.After.(*structs.CSIVolume) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicCSIVolume, + Key: after.ID, + FilterKeys: []string{ + after.ID, + after.Name, + after.PluginID, + }, + Namespace: after.Namespace, + Payload: &structs.CSIVolumeEvent{ + Volume: after, + }, + }, true + case TableCSIPlugins: + // note: there is no CSIPlugin event type, because CSI plugins don't + // have their own write RPCs; they are always created/removed via + // node updates + after, ok := change.After.(*structs.CSIPlugin) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicCSIPlugin, + Key: after.ID, + FilterKeys: []string{after.ID}, + Payload: &structs.CSIPluginEvent{ + Plugin: after, + }, + }, true } return structs.Event{}, false diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 9b52ae3bfc8..47fc17f75d6 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -1216,7 +1216,6 @@ func Test_eventsFromChanges_ACLBindingRule(t *testing.T) { } func TestEvents_HostVolumes(t *testing.T) { - ci.Parallel(t) store := TestStateStoreCfg(t, TestStateStorePublisher(t)) defer store.StopEventBroker() @@ -1258,6 +1257,84 @@ func TestEvents_HostVolumes(t *testing.T) { must.Eq(t, "HostVolumeDeleted", events[4].Type) } +func TestEvents_CSIVolumes(t *testing.T) { + ci.Parallel(t) + store := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer store.StopEventBroker() + + index, err := store.LatestIndex() + must.NoError(t, err) + + plugin := mock.CSIPlugin() + vol := mock.CSIVolume(plugin) + + index++ + must.NoError(t, store.UpsertCSIVolume(index, []*structs.CSIVolume{vol})) + + alloc := mock.Alloc() + index++ + store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc}) + + claim := &structs.CSIVolumeClaim{ + AllocationID: alloc.ID, + NodeID: uuid.Generate(), + Mode: structs.CSIVolumeClaimGC, + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateReadyToFree, + } + index++ + must.NoError(t, store.CSIVolumeClaim(index, time.Now().UnixNano(), vol.Namespace, vol.ID, claim)) + + index++ + must.NoError(t, store.CSIVolumeDeregister(index, vol.Namespace, []string{vol.ID}, false)) + + events := WaitForEvents(t, store, 0, 3, 1*time.Second) + must.Len(t, 3, events) + must.Eq(t, "CSIVolume", events[0].Topic) + must.Eq(t, "CSIVolumeRegistered", events[0].Type) + must.Eq(t, "CSIVolume", events[1].Topic) + must.Eq(t, "CSIVolumeClaim", events[1].Type) + must.Eq(t, "CSIVolume", events[2].Topic) + must.Eq(t, "CSIVolumeDeregistered", events[2].Type) + +} + +func TestEvents_CSIPlugins(t *testing.T) { + ci.Parallel(t) + store := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer store.StopEventBroker() + + index, err := store.LatestIndex() + must.NoError(t, err) + + node := mock.Node() + plugin := mock.CSIPlugin() + + index++ + must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node)) + + node = node.Copy() + node.CSINodePlugins = map[string]*structs.CSIInfo{ + plugin.ID: { + PluginID: plugin.ID, + Healthy: true, + UpdateTime: time.Now(), + }, + } + index++ + must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node)) + + events := WaitForEvents(t, store, 0, 3, 1*time.Second) + must.Len(t, 3, events) + must.Eq(t, "Node", events[0].Topic) + must.Eq(t, "NodeRegistration", events[0].Type) + must.Eq(t, "Node", events[1].Topic) + must.Eq(t, "NodeRegistration", events[1].Type) + must.Eq(t, "CSIPlugin", events[2].Topic) + must.Eq(t, "NodeRegistration", events[2].Type) +} + func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 7ee83e074eb..0aae6c1fcf9 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -27,6 +27,8 @@ const ( TableAllocs = "allocs" TableJobSubmission = "job_submission" TableHostVolumes = "host_volumes" + TableCSIVolumes = "csi_volumes" + TableCSIPlugins = "csi_plugins" ) const ( @@ -1150,7 +1152,7 @@ func clusterMetaTableSchema() *memdb.TableSchema { // CSIVolumes are identified by id globally, and searchable by driver func csiVolumeTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: "csi_volumes", + Name: TableCSIVolumes, Indexes: map[string]*memdb.IndexSchema{ "id": { Name: "id", @@ -1182,7 +1184,7 @@ func csiVolumeTableSchema() *memdb.TableSchema { // CSIPlugins are identified by id globally, and searchable by driver func csiPluginTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: "csi_plugins", + Name: TableCSIPlugins, Indexes: map[string]*memdb.IndexSchema{ "id": { Name: "id", diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 65ce87813db..38c821e61e8 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1398,7 +1398,7 @@ func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEv func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error { upsertFn := func(info *structs.CSIInfo) error { - raw, err := txn.First("csi_plugins", "id", info.PluginID) + raw, err := txn.First(TableCSIPlugins, "id", info.PluginID) if err != nil { return fmt.Errorf("csi_plugin lookup error: %s %v", info.PluginID, err) } @@ -1429,7 +1429,7 @@ func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error { plug.ModifyIndex = index - err = txn.Insert("csi_plugins", plug) + err = txn.Insert(TableCSIPlugins, plug) if err != nil { return fmt.Errorf("csi_plugins insert error: %v", err) } @@ -1458,7 +1458,7 @@ func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error { // remove the client node from any plugin that's not // running on it. - iter, err := txn.Get("csi_plugins", "id") + iter, err := txn.Get(TableCSIPlugins, "id") if err != nil { return fmt.Errorf("csi_plugins lookup failed: %v", err) } @@ -1503,7 +1503,7 @@ func upsertCSIPluginsForNode(txn *txn, node *structs.Node, index uint64) error { } } - if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil { return fmt.Errorf("index update failed: %v", err) } @@ -1525,7 +1525,7 @@ func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error { } for id := range names { - raw, err := txn.First("csi_plugins", "id", id) + raw, err := txn.First(TableCSIPlugins, "id", id) if err != nil { return fmt.Errorf("csi_plugins lookup error %s: %v", id, err) } @@ -1546,7 +1546,7 @@ func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error { } } - if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil { return fmt.Errorf("index update failed: %v", err) } @@ -1556,13 +1556,13 @@ func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error { // updateOrGCPlugin updates a plugin but will delete it if the plugin is empty func updateOrGCPlugin(index uint64, txn Txn, plug *structs.CSIPlugin) error { if plug.IsEmpty() { - err := txn.Delete("csi_plugins", plug) + err := txn.Delete(TableCSIPlugins, plug) if err != nil { return fmt.Errorf("csi_plugins delete error: %v", err) } } else { plug.ModifyIndex = index - err := txn.Insert("csi_plugins", plug) + err := txn.Insert(TableCSIPlugins, plug) if err != nil { return fmt.Errorf("csi_plugins update error %s: %v", plug.ID, err) } @@ -1661,7 +1661,7 @@ func (s *StateStore) deleteJobFromPlugins(index uint64, txn Txn, job *structs.Jo } if len(plugins) > 0 { - if err = txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + if err = txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil { return fmt.Errorf("index update failed: %v", err) } } @@ -2561,7 +2561,7 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) // UpsertCSIVolume inserts a volume in the state store. func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(structs.CSIVolumeRegisterRequestType, index) defer txn.Abort() for _, v := range volumes { @@ -2571,7 +2571,7 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace) } - obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID) + obj, err := txn.First(TableCSIVolumes, "id", v.Namespace, v.ID) if err != nil { return fmt.Errorf("volume existence check error: %v", err) } @@ -2600,13 +2600,13 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) v.WriteAllocs[allocID] = nil } - err = txn.Insert("csi_volumes", v) + err = txn.Insert(TableCSIVolumes, v) if err != nil { return fmt.Errorf("volume insert: %v", err) } } - if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil { return fmt.Errorf("index update failed: %v", err) } @@ -2619,7 +2619,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) txn := s.db.ReadTxn() defer txn.Abort() - iter, err := txn.Get("csi_volumes", "id") + iter, err := txn.Get(TableCSIVolumes, "id") if err != nil { return nil, fmt.Errorf("csi_volumes lookup failed: %v", err) } @@ -2635,7 +2635,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) { txn := s.db.ReadTxn() - watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id) + watchCh, obj, err := txn.FirstWatch(TableCSIVolumes, "id", namespace, id) if err != nil { return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err) } @@ -2656,7 +2656,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - iter, err := txn.Get("csi_volumes", "plugin_id", pluginID) + iter, err := txn.Get(TableCSIVolumes, "plugin_id", pluginID) if err != nil { return nil, fmt.Errorf("volume lookup failed: %v", err) } @@ -2684,7 +2684,7 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID txn := s.db.ReadTxn() - iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID) + iter, err := txn.Get(TableCSIVolumes, "id_prefix", namespace, volumeID) if err != nil { return nil, err } @@ -2698,7 +2698,7 @@ func (s *StateStore) csiVolumeByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix txn := s.db.ReadTxn() // Walk the entire csi_volumes table - iter, err := txn.Get("csi_volumes", "id") + iter, err := txn.Get(TableCSIVolumes, "id") if err != nil { return nil, err @@ -2750,7 +2750,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string txn := s.db.ReadTxn() for id, namespace := range ids { if strings.HasPrefix(id, prefix) { - watchCh, raw, err := txn.FirstWatch("csi_volumes", "id", namespace, id) + watchCh, raw, err := txn.FirstWatch(TableCSIVolumes, "id", namespace, id) if err != nil { return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) } @@ -2771,7 +2771,7 @@ func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix func (s *StateStore) csiVolumesByNamespaceImpl(txn *txn, ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) { - iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix) + iter, err := txn.Get(TableCSIVolumes, "id_prefix", namespace, prefix) if err != nil { return nil, fmt.Errorf("volume lookup failed: %v", err) } @@ -2783,10 +2783,10 @@ func (s *StateStore) csiVolumesByNamespaceImpl(txn *txn, ws memdb.WatchSet, name // CSIVolumeClaim updates the volume's claim count and allocation list func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id string, claim *structs.CSIVolumeClaim) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(structs.CSIVolumeClaimRequestType, index) defer txn.Abort() - row, err := txn.First("csi_volumes", "id", namespace, id) + row, err := txn.First(TableCSIVolumes, "id", namespace, id) if err != nil { return fmt.Errorf("volume lookup failed: %s: %v", id, err) } @@ -2844,11 +2844,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id strin volume.WriteAllocs[allocID] = nil } - if err = txn.Insert("csi_volumes", volume); err != nil { + if err = txn.Insert(TableCSIVolumes, volume); err != nil { return fmt.Errorf("volume update failed: %s: %v", id, err) } - if err = txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil { + if err = txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil { return fmt.Errorf("index update failed: %v", err) } @@ -2857,11 +2857,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id strin // CSIVolumeDeregister removes the volume from the server func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string, force bool) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(structs.CSIVolumeDeregisterRequestType, index) defer txn.Abort() for _, id := range ids { - existing, err := txn.First("csi_volumes", "id", namespace, id) + existing, err := txn.First(TableCSIVolumes, "id", namespace, id) if err != nil { return fmt.Errorf("volume lookup failed: %s: %v", id, err) } @@ -2885,12 +2885,12 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s } } - if err = txn.Delete("csi_volumes", existing); err != nil { + if err = txn.Delete(TableCSIVolumes, existing); err != nil { return fmt.Errorf("volume delete failed: %s: %v", id, err) } } - if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil { return fmt.Errorf("index update failed: %v", err) } @@ -3072,7 +3072,7 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) txn := s.db.ReadTxn() defer txn.Abort() - iter, err := txn.Get("csi_plugins", "id") + iter, err := txn.Get(TableCSIPlugins, "id") if err != nil { return nil, fmt.Errorf("csi_plugins lookup failed: %v", err) } @@ -3086,7 +3086,7 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - iter, err := txn.Get("csi_plugins", "id_prefix", pluginID) + iter, err := txn.Get(TableCSIPlugins, "id_prefix", pluginID) if err != nil { return nil, err } @@ -3110,7 +3110,7 @@ func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPl // CSIPluginByIDTxn returns a named CSIPlugin func (s *StateStore) CSIPluginByIDTxn(txn Txn, ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) { - watchCh, obj, err := txn.FirstWatch("csi_plugins", "id", id) + watchCh, obj, err := txn.FirstWatch(TableCSIPlugins, "id", id) if err != nil { return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err) } @@ -3167,7 +3167,7 @@ func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) erro txn := s.db.WriteTxn(index) defer txn.Abort() - existing, err := txn.First("csi_plugins", "id", plug.ID) + existing, err := txn.First(TableCSIPlugins, "id", plug.ID) if err != nil { return fmt.Errorf("csi_plugin lookup error: %s %v", plug.ID, err) } @@ -3178,11 +3178,11 @@ func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) erro plug.CreateTime = existing.(*structs.CSIPlugin).CreateTime } - err = txn.Insert("csi_plugins", plug) + err = txn.Insert(TableCSIPlugins, plug) if err != nil { return fmt.Errorf("csi_plugins insert error: %v", err) } - if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil { return fmt.Errorf("index update failed: %v", err) } return txn.Commit() @@ -3242,7 +3242,7 @@ func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error { return structs.ErrCSIPluginInUse } - err = txn.Delete("csi_plugins", plug) + err = txn.Delete(TableCSIPlugins, plug) if err != nil { return fmt.Errorf("csi_plugins delete error: %v", err) } @@ -5900,13 +5900,13 @@ func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, t } for _, plugIn := range plugIns { - err = txn.Insert("csi_plugins", plugIn) + err = txn.Insert(TableCSIPlugins, plugIn) if err != nil { return fmt.Errorf("csi_plugins insert error: %v", err) } } - if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{TableCSIPlugins, index}); err != nil { return fmt.Errorf("index update failed: %v", err) } diff --git a/nomad/state/state_store_restore.go b/nomad/state/state_store_restore.go index 0a1638422f2..41a571b71d9 100644 --- a/nomad/state/state_store_restore.go +++ b/nomad/state/state_store_restore.go @@ -181,7 +181,7 @@ func (r *StateRestore) ScalingPolicyRestore(scalingPolicy *structs.ScalingPolicy // CSIPluginRestore is used to restore a CSI plugin func (r *StateRestore) CSIPluginRestore(plugin *structs.CSIPlugin) error { - if err := r.txn.Insert("csi_plugins", plugin); err != nil { + if err := r.txn.Insert(TableCSIPlugins, plugin); err != nil { return fmt.Errorf("csi plugin insert failed: %v", err) } return nil @@ -189,7 +189,7 @@ func (r *StateRestore) CSIPluginRestore(plugin *structs.CSIPlugin) error { // CSIVolumeRestore is used to restore a CSI volume func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error { - if err := r.txn.Insert("csi_volumes", volume); err != nil { + if err := r.txn.Insert(TableCSIVolumes, volume); err != nil { return fmt.Errorf("csi volume insert failed: %v", err) } return nil diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 05a4b8449c1..9445cec3100 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -367,6 +367,14 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool { if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityHostVolumeRead); !ok { return false } + case structs.TopicCSIVolume: + if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityCSIReadVolume); !ok { + return false + } + case structs.TopicCSIPlugin: + if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok { + return false + } case structs.TopicNode: if ok := aclObj.AllowNodeRead(); !ok { return false diff --git a/nomad/structs/event.go b/nomad/structs/event.go index 2b5c81316a8..7a2e98fd056 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -32,6 +32,8 @@ const ( TopicACLBindingRule Topic = "ACLBindingRule" TopicService Topic = "Service" TopicHostVolume Topic = "HostVolume" + TopicCSIVolume Topic = "CSIVolume" + TopicCSIPlugin Topic = "CSIPlugin" TopicAll Topic = "*" TypeNodeRegistration = "NodeRegistration" @@ -66,6 +68,9 @@ const ( TypeServiceDeregistration = "ServiceDeregistration" TypeHostVolumeRegistered = "HostVolumeRegistered" TypeHostVolumeDeleted = "HostVolumeDeleted" + TypeCSIVolumeRegistered = "CSIVolumeRegistered" + TypeCSIVolumeDeregistered = "CSIVolumeDeregistered" + TypeCSIVolumeClaim = "CSIVolumeClaim" ) // Event represents a change in Nomads state. @@ -197,3 +202,15 @@ type ACLBindingRuleEvent struct { type HostVolumeEvent struct { Volume *HostVolume } + +// CSIVolumeEvent holds a newly updated or deleted CSI volume to be +// used as an event in the event stream +type CSIVolumeEvent struct { + Volume *CSIVolume +} + +// CSIPluginEvent holds a newly updated or deleted CSI plugin to be +// used as an event in the event stream +type CSIPluginEvent struct { + Plugin *CSIPlugin +} diff --git a/nomad/volumewatcher/volumes_watcher.go b/nomad/volumewatcher/volumes_watcher.go index 2da49c7d1f6..a9cf7ae3524 100644 --- a/nomad/volumewatcher/volumes_watcher.go +++ b/nomad/volumewatcher/volumes_watcher.go @@ -141,9 +141,9 @@ func (w *Watcher) getVolumes(ctx context.Context, minIndex uint64) ([]*structs.C } // getVolumesImpl retrieves all volumes from the passed state store. -func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { +func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) { - iter, err := state.CSIVolumes(ws) + iter, err := store.CSIVolumes(ws) if err != nil { return nil, 0, err } @@ -159,7 +159,7 @@ func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (in } // Use the last index that affected the volume table - index, err := state.Index("csi_volumes") + index, err := store.Index(state.TableCSIVolumes) if err != nil { return nil, 0, err } diff --git a/website/content/api-docs/events.mdx b/website/content/api-docs/events.mdx index f006e2982e3..d8c606d22e4 100644 --- a/website/content/api-docs/events.mdx +++ b/website/content/api-docs/events.mdx @@ -28,6 +28,7 @@ the nature of this endpoint individual topics require specific policies. Note that if you do not include a `topic` parameter all topics will be included by default, requiring a management token. + | Topic | ACL Required | |--------------|------------------------------| | `*` | `management` | @@ -35,6 +36,8 @@ by default, requiring a management token. | `ACLRole` | `management` | | `ACLToken` | `management` | | `Allocation` | `namespace:read-job` | +| `CSIPlugin` | `namespace:read-job` | +| `CSIVolume` | `namespace:csi-read-volume` | | `Deployment` | `namespace:read-job` | | `Evaluation` | `namespace:read-job` | | `HostVolume` | `namespace:host-volume-read` | @@ -72,6 +75,8 @@ by default, requiring a management token. | ACLRoles | ACLRole | | ACLToken | ACLToken | | Allocation | Allocation (no job information) | +| CSIPlugin | CSIPlugin | +| CSIVolume | CSIVolume | | Deployment | Deployment | | Evaluation | Evaluation | | HostVolume | HostVolume (dynamic host volumes only) | @@ -94,6 +99,8 @@ by default, requiring a management token. | AllocationCreated | | AllocationUpdateDesiredStatus | | AllocationUpdated | +| CSIVolumeDeregistered | +| CSIVolumeRegistered | | DeploymentAllocHealth | | DeploymentPromotion | | DeploymentStatusUpdate |