From 18588997d57c342df79532b381c4db638042b44c Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 19 Dec 2024 16:09:48 -0500 Subject: [PATCH] event stream: refactor to use Eventer interface Add an `Eventer` interface to the core structs so they can emit their own events, and use this to refactor the `eventFromChange` function at the core of the event stream. --- nomad/state/events.go | 429 +++----------------------- nomad/state/schema.go | 24 +- nomad/structs/acl.go | 35 +++ nomad/structs/csi.go | 30 ++ nomad/structs/event.go | 23 +- nomad/structs/host_volumes.go | 16 + nomad/structs/node_pool.go | 11 + nomad/structs/service_registration.go | 16 + nomad/structs/structs.go | 97 ++++++ 9 files changed, 268 insertions(+), 413 deletions(-) diff --git a/nomad/state/events.go b/nomad/state/events.go index c2c30e10cdd..1a9154fabb7 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -68,405 +68,56 @@ func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events { func eventFromChange(change memdb.Change) (structs.Event, bool) { if change.Deleted() { + // we don't emit events for all Eventers on delete, so we need to make + // sure we're only emitting events for the tables we want switch change.Table { - case "acl_token": - before, ok := change.Before.(*structs.ACLToken) + case TableACLAuthMethods, + TableACLBindingRules, + TableACLPolicies, + TableACLRoles, + TableACLTokens, + TableCSIPlugins, + TableCSIVolumes, + TableHostVolumes, + TableJobs, + TableNodePools, + TableNodes, + TableServiceRegistrations: + before, ok := change.Before.(structs.Eventer) if !ok { return structs.Event{}, false } - - return structs.Event{ - Topic: structs.TopicACLToken, - Key: before.AccessorID, - Payload: structs.NewACLTokenEvent(before), - }, true - case "acl_policy": - before, ok := change.Before.(*structs.ACLPolicy) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicACLPolicy, - Key: before.Name, - Payload: &structs.ACLPolicyEvent{ - ACLPolicy: before, - }, - }, true - case TableACLRoles: - before, ok := change.Before.(*structs.ACLRole) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicACLRole, - Key: before.ID, - FilterKeys: []string{before.Name}, - Payload: &structs.ACLRoleStreamEvent{ - ACLRole: before, - }, - }, true - case TableACLAuthMethods: - before, ok := change.Before.(*structs.ACLAuthMethod) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicACLAuthMethod, - Key: before.Name, - Payload: &structs.ACLAuthMethodEvent{ - AuthMethod: before, - }, - }, true - case TableACLBindingRules: - before, ok := change.Before.(*structs.ACLBindingRule) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicACLBindingRule, - Key: before.ID, - FilterKeys: []string{before.AuthMethod}, - Payload: &structs.ACLBindingRuleEvent{ - ACLBindingRule: before, - }, - }, true - case "jobs": - before, ok := change.Before.(*structs.Job) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicJob, - Key: before.ID, - Namespace: before.Namespace, - Payload: &structs.JobEvent{ - Job: before, - }, - }, true - case "nodes": - before, ok := change.Before.(*structs.Node) - if !ok { - return structs.Event{}, false - } - - before = before.Sanitize() - return structs.Event{ - Topic: structs.TopicNode, - Key: before.ID, - Payload: &structs.NodeStreamEvent{ - Node: before, - }, - }, true - case TableNodePools: - before, ok := change.Before.(*structs.NodePool) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicNodePool, - Key: before.Name, - Payload: &structs.NodePoolEvent{ - NodePool: before, - }, - }, true - case TableServiceRegistrations: - before, ok := change.Before.(*structs.ServiceRegistration) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicService, - Key: before.ID, - FilterKeys: []string{ - before.JobID, - before.ServiceName, - }, - Namespace: before.Namespace, - Payload: &structs.ServiceRegistrationStreamEvent{ - Service: before, - }, - }, true - case TableHostVolumes: - before, ok := change.Before.(*structs.HostVolume) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicHostVolume, - FilterKeys: []string{ - before.ID, - before.Name, - before.PluginID, - }, - Namespace: before.Namespace, - Payload: &structs.HostVolumeEvent{ - Volume: before, - }, - }, true - case TableCSIVolumes: - before, ok := change.Before.(*structs.CSIVolume) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicCSIVolume, - Key: before.ID, - FilterKeys: []string{ - before.ID, - before.Name, - before.PluginID, - }, - Namespace: before.Namespace, - Payload: &structs.CSIVolumeEvent{ - Volume: before, - }, - }, true - case TableCSIPlugins: - // note: there is no CSIPlugin event type, because CSI plugins don't - // have their own write RPCs; they are always created/removed via - // node updates - before, ok := change.Before.(*structs.CSIPlugin) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicCSIPlugin, - Key: before.ID, - FilterKeys: []string{before.ID}, - Payload: &structs.CSIPluginEvent{ - Plugin: before, - }, - }, true + return before.Event(), true + default: + return structs.Event{}, false } - return structs.Event{}, false } + // we don't emit events for all Eventers on update (ex. the Job and + // JobVersion table have the same Job object), so we need to make sure + // we're only emitting events for the tables we want switch change.Table { - case "acl_token": - after, ok := change.After.(*structs.ACLToken) - if !ok { - return structs.Event{}, false - } - - return structs.Event{ - Topic: structs.TopicACLToken, - Key: after.AccessorID, - Payload: structs.NewACLTokenEvent(after), - }, true - case "acl_policy": - after, ok := change.After.(*structs.ACLPolicy) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicACLPolicy, - Key: after.Name, - Payload: &structs.ACLPolicyEvent{ - ACLPolicy: after, - }, - }, true - case TableACLRoles: - after, ok := change.After.(*structs.ACLRole) + case TableACLAuthMethods, + TableACLBindingRules, + TableACLPolicies, + TableACLRoles, + TableACLTokens, + TableAllocs, + TableCSIPlugins, + TableCSIVolumes, + TableDeployments, + TableEvals, + TableHostVolumes, + TableJobs, + TableNodePools, + TableNodes, + TableServiceRegistrations: + after, ok := change.After.(structs.Eventer) if !ok { return structs.Event{}, false } - return structs.Event{ - Topic: structs.TopicACLRole, - Key: after.ID, - FilterKeys: []string{after.Name}, - Payload: &structs.ACLRoleStreamEvent{ - ACLRole: after, - }, - }, true - case TableACLAuthMethods: - after, ok := change.After.(*structs.ACLAuthMethod) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicACLAuthMethod, - Key: after.Name, - Payload: &structs.ACLAuthMethodEvent{ - AuthMethod: after, - }, - }, true - case TableACLBindingRules: - after, ok := change.After.(*structs.ACLBindingRule) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicACLBindingRule, - Key: after.ID, - FilterKeys: []string{after.AuthMethod}, - Payload: &structs.ACLBindingRuleEvent{ - ACLBindingRule: after, - }, - }, true - case "evals": - after, ok := change.After.(*structs.Evaluation) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicEvaluation, - Key: after.ID, - FilterKeys: []string{ - after.JobID, - after.DeploymentID, - }, - Namespace: after.Namespace, - Payload: &structs.EvaluationEvent{ - Evaluation: after, - }, - }, true - case "allocs": - after, ok := change.After.(*structs.Allocation) - if !ok { - return structs.Event{}, false - } - alloc := after.Copy() - - filterKeys := []string{ - alloc.JobID, - alloc.DeploymentID, - } - - // remove job info to help keep size of alloc event down - alloc.Job = nil - - return structs.Event{ - Topic: structs.TopicAllocation, - Key: after.ID, - FilterKeys: filterKeys, - Namespace: after.Namespace, - Payload: &structs.AllocationEvent{ - Allocation: alloc, - }, - }, true - case "jobs": - after, ok := change.After.(*structs.Job) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicJob, - Key: after.ID, - Namespace: after.Namespace, - Payload: &structs.JobEvent{ - Job: after, - }, - }, true - case "nodes": - after, ok := change.After.(*structs.Node) - if !ok { - return structs.Event{}, false - } - - after = after.Sanitize() - return structs.Event{ - Topic: structs.TopicNode, - Key: after.ID, - Payload: &structs.NodeStreamEvent{ - Node: after, - }, - }, true - case TableNodePools: - after, ok := change.After.(*structs.NodePool) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicNodePool, - Key: after.Name, - Payload: &structs.NodePoolEvent{ - NodePool: after, - }, - }, true - case "deployment": - after, ok := change.After.(*structs.Deployment) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicDeployment, - Key: after.ID, - Namespace: after.Namespace, - FilterKeys: []string{after.JobID}, - Payload: &structs.DeploymentEvent{ - Deployment: after, - }, - }, true - case TableServiceRegistrations: - after, ok := change.After.(*structs.ServiceRegistration) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicService, - Key: after.ID, - FilterKeys: []string{ - after.JobID, - after.ServiceName, - }, - Namespace: after.Namespace, - Payload: &structs.ServiceRegistrationStreamEvent{ - Service: after, - }, - }, true - case TableHostVolumes: - after, ok := change.After.(*structs.HostVolume) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicHostVolume, - Key: after.ID, - FilterKeys: []string{ - after.ID, - after.Name, - after.PluginID, - }, - Namespace: after.Namespace, - Payload: &structs.HostVolumeEvent{ - Volume: after, - }, - }, true - case TableCSIVolumes: - after, ok := change.After.(*structs.CSIVolume) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicCSIVolume, - Key: after.ID, - FilterKeys: []string{ - after.ID, - after.Name, - after.PluginID, - }, - Namespace: after.Namespace, - Payload: &structs.CSIVolumeEvent{ - Volume: after, - }, - }, true - case TableCSIPlugins: - // note: there is no CSIPlugin event type, because CSI plugins don't - // have their own write RPCs; they are always created/removed via - // node updates - after, ok := change.After.(*structs.CSIPlugin) - if !ok { - return structs.Event{}, false - } - return structs.Event{ - Topic: structs.TopicCSIPlugin, - Key: after.ID, - FilterKeys: []string{after.ID}, - Payload: &structs.CSIPluginEvent{ - Plugin: after, - }, - }, true + return after.Event(), true + default: + return structs.Event{}, false } - - return structs.Event{}, false } diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 0aae6c1fcf9..60a28bad3da 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -15,20 +15,26 @@ import ( const ( tableIndex = "index" + TableACLAuthMethods = "acl_auth_methods" + TableACLBindingRules = "acl_binding_rules" + TableACLPolicies = "acl_policy" + TableACLRoles = "acl_roles" + TableACLTokens = "acl_token" + TableAllocs = "allocs" + TableCSIPlugins = "csi_plugins" + TableCSIVolumes = "csi_volumes" + TableDeployments = "deployment" + TableEvals = "evals" + TableHostVolumes = "host_volumes" + TableJobSubmission = "job_submission" + TableJobs = "jobs" TableNamespaces = "namespaces" TableNodePools = "node_pools" + TableNodes = "nodes" + TableRootKeys = "root_keys" TableServiceRegistrations = "service_registrations" TableVariables = "variables" TableVariablesQuotas = "variables_quota" - TableRootKeys = "root_keys" - TableACLRoles = "acl_roles" - TableACLAuthMethods = "acl_auth_methods" - TableACLBindingRules = "acl_binding_rules" - TableAllocs = "allocs" - TableJobSubmission = "job_submission" - TableHostVolumes = "host_volumes" - TableCSIVolumes = "csi_volumes" - TableCSIPlugins = "csi_plugins" ) const ( diff --git a/nomad/structs/acl.go b/nomad/structs/acl.go index eed00046fa8..459d461eda2 100644 --- a/nomad/structs/acl.go +++ b/nomad/structs/acl.go @@ -629,6 +629,18 @@ func (a *ACLRole) Stub() *ACLRoleListStub { } } +// Event emits an event for the event stream +func (a *ACLRole) Event() Event { + return Event{ + Topic: TopicACLRole, + Key: a.ID, + FilterKeys: []string{a.Name}, + Payload: &ACLRoleStreamEvent{ + ACLRole: a, + }, + } +} + // ACLRoleListStub is the stub object returned when performing a listing of ACL // roles. While it might not currently be different to the full response // object, it allows us to future-proof the RPC in the event the ACLRole object @@ -983,6 +995,17 @@ func (a *ACLAuthMethod) TokenLocalityIsGlobal() bool { return a.TokenLocality == ACLAuthMethodTokenLocalityGlobal } +// Event emits an event for the event stream +func (a *ACLAuthMethod) Event() Event { + return Event{ + Topic: TopicACLAuthMethod, + Key: a.Name, + Payload: &ACLAuthMethodEvent{ + AuthMethod: a, + }, + } +} + // ACLAuthMethodConfig is used to store configuration of an auth method type ACLAuthMethodConfig struct { // A list of PEM-encoded public keys to use to authenticate signatures @@ -1442,6 +1465,18 @@ func (a *ACLBindingRule) Stub() *ACLBindingRuleListStub { } } +// Event emits an event for the event stream +func (a *ACLBindingRule) Event() Event { + return Event{ + Topic: TopicACLBindingRule, + Key: a.ID, + FilterKeys: []string{a.AuthMethod}, + Payload: &ACLBindingRuleEvent{ + ACLBindingRule: a, + }, + } +} + // ACLBindingRuleListStub is the stub object returned when performing a listing // of ACL binding rules. type ACLBindingRuleListStub struct { diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 8dd85db766e..2a5d477d4e8 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -852,6 +852,22 @@ func (v *CSIVolume) Merge(other *CSIVolume) error { return errs.ErrorOrNil() } +// Event emits an event for the event stream +func (v *CSIVolume) Event() Event { + return Event{ + Topic: TopicCSIVolume, + FilterKeys: []string{ + v.ID, + v.Name, + v.PluginID, + }, + Namespace: v.Namespace, + Payload: &CSIVolumeEvent{ + Volume: v, + }, + } +} + // Request and response wrappers type CSIVolumeRegisterRequest struct { Volumes []*CSIVolume @@ -1581,6 +1597,20 @@ func (p *CSIPlugin) IsEmpty() bool { p.NodeJobs.Count() == 0 } +// Event emits an event for the event stream. Note there is no CSIPlugin event +// type, because CSI plugins don't have their own write RPCs; they are always +// created/removed via node updates +func (p *CSIPlugin) Event() Event { + return Event{ + Topic: TopicCSIPlugin, + Key: p.ID, + FilterKeys: []string{p.ID}, + Payload: &CSIPluginEvent{ + Plugin: p, + }, + } +} + type CSIPluginListRequest struct { QueryOptions } diff --git a/nomad/structs/event.go b/nomad/structs/event.go index 7a2e98fd056..c91312d5d60 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -16,6 +16,10 @@ type EventStreamWrapper struct { Event *EventJson } +type Eventer interface { + Event() Event +} + type Topic string const ( @@ -154,27 +158,16 @@ type ACLTokenEvent struct { secretID string } +func (a *ACLTokenEvent) SecretID() string { + return a.secretID +} + // ServiceRegistrationStreamEvent holds a newly updated or deleted service // registration. type ServiceRegistrationStreamEvent struct { Service *ServiceRegistration } -// NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates -// a copy of the passed in ACLToken and empties out the copied tokens SecretID -func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent { - c := token.Sanitize() - - return &ACLTokenEvent{ - ACLToken: c, - secretID: token.SecretID, - } -} - -func (a *ACLTokenEvent) SecretID() string { - return a.secretID -} - type ACLPolicyEvent struct { ACLPolicy *ACLPolicy } diff --git a/nomad/structs/host_volumes.go b/nomad/structs/host_volumes.go index 440ad956512..e30a1838868 100644 --- a/nomad/structs/host_volumes.go +++ b/nomad/structs/host_volumes.go @@ -245,6 +245,22 @@ func (hv *HostVolume) GetID() string { return hv.ID } +// Event emits an event for the event stream +func (hv *HostVolume) Event() Event { + return Event{ + Topic: TopicHostVolume, + FilterKeys: []string{ + hv.ID, + hv.Name, + hv.PluginID, + }, + Namespace: hv.Namespace, + Payload: &HostVolumeEvent{ + Volume: hv, + }, + } +} + // HostVolumeCapability is the requested attachment and access mode for a volume type HostVolumeCapability struct { AttachmentMode HostVolumeAttachmentMode diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go index ada2a8d57ab..c58b0f892d4 100644 --- a/nomad/structs/node_pool.go +++ b/nomad/structs/node_pool.go @@ -179,6 +179,17 @@ func (n *NodePool) SetHash() []byte { return hashVal } +// Event emits an event for the event stream +func (n *NodePool) Event() Event { + return Event{ + Topic: TopicNodePool, + Key: n.Name, + Payload: &NodePoolEvent{ + NodePool: n, + }, + } +} + // NodePoolSchedulerConfiguration is the scheduler confinguration applied to a // node pool. // diff --git a/nomad/structs/service_registration.go b/nomad/structs/service_registration.go index 2b2bf7a4d92..c680931fbd9 100644 --- a/nomad/structs/service_registration.go +++ b/nomad/structs/service_registration.go @@ -196,6 +196,22 @@ func (s *ServiceRegistration) HashWith(key string) string { return fmt.Sprintf("%x", sum.Sum(nil)) } +// Event emits an event for the event stream +func (s *ServiceRegistration) Event() Event { + return Event{ + Topic: TopicService, + Key: s.ID, + FilterKeys: []string{ + s.JobID, + s.ServiceName, + }, + Namespace: s.Namespace, + Payload: &ServiceRegistrationStreamEvent{ + Service: s, + }, + } +} + // ServiceRegistrationUpsertRequest is the request object used to upsert one or // more service registrations. type ServiceRegistrationUpsertRequest struct { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e55089f3e12..73b655c7534 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2401,6 +2401,18 @@ func (n *Node) Stub(fields *NodeStubFields) *NodeListStub { return s } +// Event emits a sanitized node event for the event stream +func (n *Node) Event() Event { + nn := n.Sanitize() + return Event{ + Topic: TopicNode, + Key: nn.ID, + Payload: &NodeStreamEvent{ + Node: nn, + }, + } +} + // NodeListStub is used to return a subset of job information // for the job list type NodeListStub struct { @@ -5285,6 +5297,18 @@ func (j *Job) SetSubmitTime() { j.SubmitTime = time.Now().UTC().UnixNano() } +// Event emits an event for the event stream +func (j *Job) Event() Event { + return Event{ + Topic: TopicJob, + Key: j.ID, + Namespace: j.Namespace, + Payload: &JobEvent{ + Job: j, + }, + } +} + // JobListStub is used to return a subset of job information // for the job list type JobListStub struct { @@ -10783,6 +10807,19 @@ func (d *Deployment) GetNamespace() string { return d.Namespace } +// Event emits an event for the event stream +func (d *Deployment) Event() Event { + return Event{ + Topic: TopicDeployment, + Key: d.ID, + Namespace: d.Namespace, + FilterKeys: []string{d.JobID}, + Payload: &DeploymentEvent{ + Deployment: d, + }, + } +} + // DeploymentState tracks the state of a deployment for a given task group. type DeploymentState struct { // AutoRevert marks whether the task group has indicated the job should be @@ -11919,6 +11956,26 @@ func (a *Allocation) LastRescheduleFailed() bool { a.RescheduleTracker.LastReschedule != LastRescheduleSuccess } +// Event emits an event for the event stream +func (a *Allocation) Event() Event { + // remove job info to help keep size of alloc event down + aa := a.Copy() + aa.Job = nil + + return Event{ + Topic: TopicAllocation, + Key: aa.ID, + FilterKeys: []string{ + aa.JobID, + aa.DeploymentID, + }, + Namespace: aa.Namespace, + Payload: &AllocationEvent{ + Allocation: aa, + }, + } +} + // AllocationDiff is another named type for Allocation (to use the same fields), // which is used to represent the delta for an Allocation. If you need a method // defined on the al @@ -12870,6 +12927,22 @@ func (e *Evaluation) UpdateModifyTime() { } } +// Event emits an event for the event stream +func (e *Evaluation) Event() Event { + return Event{ + Topic: TopicEvaluation, + Key: e.ID, + FilterKeys: []string{ + e.JobID, + e.DeploymentID, + }, + Namespace: e.Namespace, + Payload: &EvaluationEvent{ + Evaluation: e, + }, + } +} + // Plan is used to submit a commit plan for task allocations. These // are submitted to the leader which verifies that resources have // not been overcommitted before admitting the plan. @@ -13376,6 +13449,17 @@ type ACLPolicy struct { ModifyIndex uint64 } +// Event emits an event for the event stream +func (a *ACLPolicy) Event() Event { + return Event{ + Topic: TopicACLPolicy, + Key: a.Name, + Payload: &ACLPolicyEvent{ + ACLPolicy: a, + }, + } +} + // JobACL represents an ACL policy's attachment to a job, group, or task. type JobACL struct { Namespace string // namespace of the job @@ -13574,6 +13658,19 @@ func (a *ACLToken) Copy() *ACLToken { return c } +// Event emits an sanitized ACLTokenEvent for the event stream +func (a *ACLToken) Event() Event { + out := a.Sanitize() + return Event{ + Topic: TopicACLToken, + Key: a.AccessorID, + Payload: &ACLTokenEvent{ + ACLToken: out, + secretID: a.SecretID, + }, + } +} + var ( // AnonymousACLToken is used when no SecretID is provided, and the request // is made anonymously.