Skip to content

Commit

Permalink
event stream: add events for CSI volumes and plugins
Browse files Browse the repository at this point in the history
Adds new topics to the event stream for CSI volumes and CSI plugins. We'll emit
event when either is created or deleted, and when CSI volumes are claimed.
  • Loading branch information
tgross committed Dec 19, 2024
1 parent 7d86532 commit b6db2cf
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 63 deletions.
3 changes: 3 additions & 0 deletions .changelog/24724.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
csi: Added CSI volume and plugin events to the event stream
```
38 changes: 38 additions & 0 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
structs.CSIVolumeRegisterRequestType: structs.TypeCSIVolumeRegistered,
structs.CSIVolumeDeregisterRequestType: structs.TypeCSIVolumeDeregistered,
}

func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
Expand Down Expand Up @@ -181,6 +183,24 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Service: before,
},
}, true
case TableCSIVolumes:
before, ok := change.Before.(*structs.CSIVolume)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIVolume,
Key: before.ID,
FilterKeys: []string{
before.ID,
before.Name,
before.PluginID,
},
Namespace: before.Namespace,
Payload: &structs.CSIVolumeEvent{
Volume: before,
},
}, true
}
return structs.Event{}, false
}
Expand Down Expand Up @@ -358,6 +378,24 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Service: after,
},
}, true
case TableCSIVolumes:
after, ok := change.After.(*structs.CSIVolume)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIVolume,
Key: after.ID,
FilterKeys: []string{
after.ID,
after.Name,
after.PluginID,
},
Namespace: after.Namespace,
Payload: &structs.CSIVolumeEvent{
Volume: after,
},
}, true
}

return structs.Event{}, false
Expand Down
25 changes: 25 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,31 @@ func Test_eventsFromChanges_ACLBindingRule(t *testing.T) {
must.Eq(t, bindingRule, receivedDeleteChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule)
}

func TestEvents_CSIVolumes(t *testing.T) {
ci.Parallel(t)
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer store.StopEventBroker()

index, err := store.LatestIndex()
must.NoError(t, err)

plugin := mock.CSIPlugin()
vol := mock.CSIVolume(plugin)

index++
must.NoError(t, store.UpsertCSIVolume(index, []*structs.CSIVolume{vol}))

index++
must.NoError(t, store.CSIVolumeDeregister(index, vol.Namespace, []string{vol.ID}, false))

events := WaitForEvents(t, store, 0, 2, 1*time.Second)
must.Len(t, 2, events)
must.Eq(t, "CSIVolume", events[0].Topic)
must.Eq(t, "CSIVolumeRegistered", events[0].Type)
must.Eq(t, "CSIVolume", events[1].Topic)
must.Eq(t, "CSIVolumeDeregistered", events[1].Type)
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()

Expand Down
3 changes: 2 additions & 1 deletion nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
TableAllocs = "allocs"
TableJobSubmission = "job_submission"
TableHostVolumes = "host_volumes"
TableCSIVolumes = "csi_volumes"
)

const (
Expand Down Expand Up @@ -1150,7 +1151,7 @@ func clusterMetaTableSchema() *memdb.TableSchema {
// CSIVolumes are identified by id globally, and searchable by driver
func csiVolumeTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "csi_volumes",
Name: TableCSIVolumes,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Expand Down
36 changes: 18 additions & 18 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2561,7 +2561,7 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string)

// UpsertCSIVolume inserts a volume in the state store.
func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(structs.CSIVolumeRegisterRequestType, index)
defer txn.Abort()

for _, v := range volumes {
Expand All @@ -2571,7 +2571,7 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume)
return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
}

obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID)
obj, err := txn.First(TableCSIVolumes, "id", v.Namespace, v.ID)
if err != nil {
return fmt.Errorf("volume existence check error: %v", err)
}
Expand Down Expand Up @@ -2600,13 +2600,13 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume)
v.WriteAllocs[allocID] = nil
}

err = txn.Insert("csi_volumes", v)
err = txn.Insert(TableCSIVolumes, v)
if err != nil {
return fmt.Errorf("volume insert: %v", err)
}
}

if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
if err := txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

Expand All @@ -2619,7 +2619,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error)
txn := s.db.ReadTxn()
defer txn.Abort()

iter, err := txn.Get("csi_volumes", "id")
iter, err := txn.Get(TableCSIVolumes, "id")
if err != nil {
return nil, fmt.Errorf("csi_volumes lookup failed: %v", err)
}
Expand All @@ -2635,7 +2635,7 @@ func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error)
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) {
txn := s.db.ReadTxn()

watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
watchCh, obj, err := txn.FirstWatch(TableCSIVolumes, "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err)
}
Expand All @@ -2656,7 +2656,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("csi_volumes", "plugin_id", pluginID)
iter, err := txn.Get(TableCSIVolumes, "plugin_id", pluginID)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %v", err)
}
Expand Down Expand Up @@ -2684,7 +2684,7 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID

txn := s.db.ReadTxn()

iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID)
iter, err := txn.Get(TableCSIVolumes, "id_prefix", namespace, volumeID)
if err != nil {
return nil, err
}
Expand All @@ -2698,7 +2698,7 @@ func (s *StateStore) csiVolumeByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix
txn := s.db.ReadTxn()

// Walk the entire csi_volumes table
iter, err := txn.Get("csi_volumes", "id")
iter, err := txn.Get(TableCSIVolumes, "id")

if err != nil {
return nil, err
Expand Down Expand Up @@ -2750,7 +2750,7 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string
txn := s.db.ReadTxn()
for id, namespace := range ids {
if strings.HasPrefix(id, prefix) {
watchCh, raw, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
watchCh, raw, err := txn.FirstWatch(TableCSIVolumes, "id", namespace, id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
}
Expand All @@ -2771,7 +2771,7 @@ func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix

func (s *StateStore) csiVolumesByNamespaceImpl(txn *txn, ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {

iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix)
iter, err := txn.Get(TableCSIVolumes, "id_prefix", namespace, prefix)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %v", err)
}
Expand All @@ -2786,7 +2786,7 @@ func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id strin
txn := s.db.WriteTxn(index)
defer txn.Abort()

row, err := txn.First("csi_volumes", "id", namespace, id)
row, err := txn.First(TableCSIVolumes, "id", namespace, id)
if err != nil {
return fmt.Errorf("volume lookup failed: %s: %v", id, err)
}
Expand Down Expand Up @@ -2844,11 +2844,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id strin
volume.WriteAllocs[allocID] = nil
}

if err = txn.Insert("csi_volumes", volume); err != nil {
if err = txn.Insert(TableCSIVolumes, volume); err != nil {
return fmt.Errorf("volume update failed: %s: %v", id, err)
}

if err = txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
if err = txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

Expand All @@ -2857,11 +2857,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, now int64, namespace, id strin

// CSIVolumeDeregister removes the volume from the server
func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string, force bool) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(structs.CSIVolumeDeregisterRequestType, index)
defer txn.Abort()

for _, id := range ids {
existing, err := txn.First("csi_volumes", "id", namespace, id)
existing, err := txn.First(TableCSIVolumes, "id", namespace, id)
if err != nil {
return fmt.Errorf("volume lookup failed: %s: %v", id, err)
}
Expand All @@ -2885,12 +2885,12 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s
}
}

if err = txn.Delete("csi_volumes", existing); err != nil {
if err = txn.Delete(TableCSIVolumes, existing); err != nil {
return fmt.Errorf("volume delete failed: %s: %v", id, err)
}
}

if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
if err := txn.Insert("index", &IndexEntry{TableCSIVolumes, index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/state/state_store_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (r *StateRestore) CSIPluginRestore(plugin *structs.CSIPlugin) error {

// CSIVolumeRestore is used to restore a CSI volume
func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error {
if err := r.txn.Insert("csi_volumes", volume); err != nil {
if err := r.txn.Insert(TableCSIVolumes, volume); err != nil {
return fmt.Errorf("csi volume insert failed: %v", err)
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions nomad/stream/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return false
}
case structs.TopicCSIVolume:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityCSIReadVolume); !ok {
return false
}
case structs.TopicNode:
if ok := aclObj.AllowNodeRead(); !ok {
return false
Expand Down
9 changes: 9 additions & 0 deletions nomad/structs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
TopicACLAuthMethod Topic = "ACLAuthMethod"
TopicACLBindingRule Topic = "ACLBindingRule"
TopicService Topic = "Service"
TopicCSIVolume Topic = "CSIVolume"
TopicAll Topic = "*"

TypeNodeRegistration = "NodeRegistration"
Expand Down Expand Up @@ -63,6 +64,8 @@ const (
TypeACLBindingRuleDeleted = "ACLBindingRuleDeleted"
TypeServiceRegistration = "ServiceRegistration"
TypeServiceDeregistration = "ServiceDeregistration"
TypeCSIVolumeRegistered = "CSIVolumeRegistered"
TypeCSIVolumeDeregistered = "CSIVolumeDeregistered"
)

// Event represents a change in Nomads state.
Expand Down Expand Up @@ -188,3 +191,9 @@ type ACLAuthMethodEvent struct {
type ACLBindingRuleEvent struct {
ACLBindingRule *ACLBindingRule
}

// CSIVolumeEvent holds a newly updated or deleted CSI volume to be
// used as an event in the event stream
type CSIVolumeEvent struct {
Volume *CSIVolume
}
6 changes: 3 additions & 3 deletions nomad/volumewatcher/volumes_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ func (w *Watcher) getVolumes(ctx context.Context, minIndex uint64) ([]*structs.C
}

// getVolumesImpl retrieves all volumes from the passed state store.
func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {

iter, err := state.CSIVolumes(ws)
iter, err := store.CSIVolumes(ws)
if err != nil {
return nil, 0, err
}
Expand All @@ -159,7 +159,7 @@ func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (in
}

// Use the last index that affected the volume table
index, err := state.Index("csi_volumes")
index, err := store.Index(state.TableCSIVolumes)
if err != nil {
return nil, 0, err
}
Expand Down
Loading

0 comments on commit b6db2cf

Please sign in to comment.