Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event stream: add events for CSI volumes and plugins #24724

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/24724.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
csi: Added CSI volume and plugin events to the event stream
```
72 changes: 71 additions & 1 deletion nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
structs.HostVolumeRegisterRequestType: structs.TypeHostVolumeRegistered,
structs.HostVolumeDeleteRequestType: structs.TypeHostVolumeDeleted,
structs.CSIVolumeRegisterRequestType: structs.TypeCSIVolumeRegistered,
structs.CSIVolumeDeregisterRequestType: structs.TypeCSIVolumeDeregistered,
structs.CSIVolumeClaimRequestType: structs.TypeCSIVolumeClaim,
}

func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
Expand Down Expand Up @@ -190,7 +193,6 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
}
return structs.Event{
Topic: structs.TopicHostVolume,
Key: before.ID,
FilterKeys: []string{
before.ID,
before.Name,
Expand All @@ -201,6 +203,40 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Volume: before,
},
}, true
case TableCSIVolumes:
before, ok := change.Before.(*structs.CSIVolume)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIVolume,
Key: before.ID,
FilterKeys: []string{
before.ID,
before.Name,
before.PluginID,
},
Namespace: before.Namespace,
Payload: &structs.CSIVolumeEvent{
Volume: before,
},
}, true
case TableCSIPlugins:
// note: there is no CSIPlugin event type, because CSI plugins don't
// have their own write RPCs; they are always created/removed via
// node updates
before, ok := change.Before.(*structs.CSIPlugin)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIPlugin,
Key: before.ID,
FilterKeys: []string{before.ID},
Payload: &structs.CSIPluginEvent{
Plugin: before,
},
}, true
}
return structs.Event{}, false
}
Expand Down Expand Up @@ -396,6 +432,40 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Volume: after,
},
}, true
case TableCSIVolumes:
after, ok := change.After.(*structs.CSIVolume)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIVolume,
Key: after.ID,
FilterKeys: []string{
after.ID,
after.Name,
after.PluginID,
},
Namespace: after.Namespace,
Payload: &structs.CSIVolumeEvent{
Volume: after,
},
}, true
case TableCSIPlugins:
// note: there is no CSIPlugin event type, because CSI plugins don't
// have their own write RPCs; they are always created/removed via
// node updates
after, ok := change.After.(*structs.CSIPlugin)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIPlugin,
Key: after.ID,
FilterKeys: []string{after.ID},
Payload: &structs.CSIPluginEvent{
Plugin: after,
},
}, true
}

return structs.Event{}, false
Expand Down
79 changes: 78 additions & 1 deletion nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,6 @@ func Test_eventsFromChanges_ACLBindingRule(t *testing.T) {
}

func TestEvents_HostVolumes(t *testing.T) {

ci.Parallel(t)
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer store.StopEventBroker()
Expand Down Expand Up @@ -1258,6 +1257,84 @@ func TestEvents_HostVolumes(t *testing.T) {
must.Eq(t, "HostVolumeDeleted", events[4].Type)
}

func TestEvents_CSIVolumes(t *testing.T) {
ci.Parallel(t)
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer store.StopEventBroker()

index, err := store.LatestIndex()
must.NoError(t, err)

plugin := mock.CSIPlugin()
vol := mock.CSIVolume(plugin)

index++
must.NoError(t, store.UpsertCSIVolume(index, []*structs.CSIVolume{vol}))

alloc := mock.Alloc()
index++
store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc})

claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
NodeID: uuid.Generate(),
Mode: structs.CSIVolumeClaimGC,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateReadyToFree,
}
index++
must.NoError(t, store.CSIVolumeClaim(index, time.Now().UnixNano(), vol.Namespace, vol.ID, claim))

index++
must.NoError(t, store.CSIVolumeDeregister(index, vol.Namespace, []string{vol.ID}, false))

events := WaitForEvents(t, store, 0, 3, 1*time.Second)
must.Len(t, 3, events)
must.Eq(t, "CSIVolume", events[0].Topic)
must.Eq(t, "CSIVolumeRegistered", events[0].Type)
must.Eq(t, "CSIVolume", events[1].Topic)
must.Eq(t, "CSIVolumeClaim", events[1].Type)
must.Eq(t, "CSIVolume", events[2].Topic)
must.Eq(t, "CSIVolumeDeregistered", events[2].Type)

}

func TestEvents_CSIPlugins(t *testing.T) {
ci.Parallel(t)
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer store.StopEventBroker()

index, err := store.LatestIndex()
must.NoError(t, err)

node := mock.Node()
plugin := mock.CSIPlugin()

index++
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node))

node = node.Copy()
node.CSINodePlugins = map[string]*structs.CSIInfo{
plugin.ID: {
PluginID: plugin.ID,
Healthy: true,
UpdateTime: time.Now(),
},
}
index++
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node))

events := WaitForEvents(t, store, 0, 3, 1*time.Second)
must.Len(t, 3, events)
must.Eq(t, "Node", events[0].Topic)
must.Eq(t, "NodeRegistration", events[0].Type)
must.Eq(t, "Node", events[1].Topic)
must.Eq(t, "NodeRegistration", events[1].Type)
must.Eq(t, "CSIPlugin", events[2].Topic)
must.Eq(t, "NodeRegistration", events[2].Type)
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()

Expand Down
6 changes: 4 additions & 2 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
TableAllocs = "allocs"
TableJobSubmission = "job_submission"
TableHostVolumes = "host_volumes"
TableCSIVolumes = "csi_volumes"
TableCSIPlugins = "csi_plugins"
)

const (
Expand Down Expand Up @@ -1150,7 +1152,7 @@ func clusterMetaTableSchema() *memdb.TableSchema {
// CSIVolumes are identified by id globally, and searchable by driver
func csiVolumeTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "csi_volumes",
Name: TableCSIVolumes,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Expand Down Expand Up @@ -1182,7 +1184,7 @@ func csiVolumeTableSchema() *memdb.TableSchema {
// CSIPlugins are identified by id globally, and searchable by driver
func csiPluginTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "csi_plugins",
Name: TableCSIPlugins,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Expand Down
Loading
Loading