Skip to content

Commit

Permalink
event stream: refactor to use Eventer interface
Browse files Browse the repository at this point in the history
Add an `Eventer` interface to the core structs so they can emit their own
events, and use this to refactor the `eventFromChange` function at the core of
the event stream.
  • Loading branch information
tgross committed Dec 19, 2024
1 parent c3ac9c1 commit 1858899
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 413 deletions.
429 changes: 40 additions & 389 deletions nomad/state/events.go

Large diffs are not rendered by default.

24 changes: 15 additions & 9 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@ import (
const (
tableIndex = "index"

TableACLAuthMethods = "acl_auth_methods"
TableACLBindingRules = "acl_binding_rules"
TableACLPolicies = "acl_policy"
TableACLRoles = "acl_roles"
TableACLTokens = "acl_token"
TableAllocs = "allocs"
TableCSIPlugins = "csi_plugins"
TableCSIVolumes = "csi_volumes"
TableDeployments = "deployment"
TableEvals = "evals"
TableHostVolumes = "host_volumes"
TableJobSubmission = "job_submission"
TableJobs = "jobs"
TableNamespaces = "namespaces"
TableNodePools = "node_pools"
TableNodes = "nodes"
TableRootKeys = "root_keys"
TableServiceRegistrations = "service_registrations"
TableVariables = "variables"
TableVariablesQuotas = "variables_quota"
TableRootKeys = "root_keys"
TableACLRoles = "acl_roles"
TableACLAuthMethods = "acl_auth_methods"
TableACLBindingRules = "acl_binding_rules"
TableAllocs = "allocs"
TableJobSubmission = "job_submission"
TableHostVolumes = "host_volumes"
TableCSIVolumes = "csi_volumes"
TableCSIPlugins = "csi_plugins"
)

const (
Expand Down
35 changes: 35 additions & 0 deletions nomad/structs/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,18 @@ func (a *ACLRole) Stub() *ACLRoleListStub {
}
}

// Event emits an event for the event stream
func (a *ACLRole) Event() Event {
return Event{
Topic: TopicACLRole,
Key: a.ID,
FilterKeys: []string{a.Name},
Payload: &ACLRoleStreamEvent{
ACLRole: a,
},
}
}

// ACLRoleListStub is the stub object returned when performing a listing of ACL
// roles. While it might not currently be different to the full response
// object, it allows us to future-proof the RPC in the event the ACLRole object
Expand Down Expand Up @@ -983,6 +995,17 @@ func (a *ACLAuthMethod) TokenLocalityIsGlobal() bool {
return a.TokenLocality == ACLAuthMethodTokenLocalityGlobal
}

// Event emits an event for the event stream
func (a *ACLAuthMethod) Event() Event {
return Event{
Topic: TopicACLAuthMethod,
Key: a.Name,
Payload: &ACLAuthMethodEvent{
AuthMethod: a,
},
}
}

// ACLAuthMethodConfig is used to store configuration of an auth method
type ACLAuthMethodConfig struct {
// A list of PEM-encoded public keys to use to authenticate signatures
Expand Down Expand Up @@ -1442,6 +1465,18 @@ func (a *ACLBindingRule) Stub() *ACLBindingRuleListStub {
}
}

// Event emits an event for the event stream
func (a *ACLBindingRule) Event() Event {
return Event{
Topic: TopicACLBindingRule,
Key: a.ID,
FilterKeys: []string{a.AuthMethod},
Payload: &ACLBindingRuleEvent{
ACLBindingRule: a,
},
}
}

// ACLBindingRuleListStub is the stub object returned when performing a listing
// of ACL binding rules.
type ACLBindingRuleListStub struct {
Expand Down
30 changes: 30 additions & 0 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,22 @@ func (v *CSIVolume) Merge(other *CSIVolume) error {
return errs.ErrorOrNil()
}

// Event emits an event for the event stream
func (v *CSIVolume) Event() Event {
return Event{
Topic: TopicCSIVolume,
FilterKeys: []string{
v.ID,
v.Name,
v.PluginID,
},
Namespace: v.Namespace,
Payload: &CSIVolumeEvent{
Volume: v,
},
}
}

// Request and response wrappers
type CSIVolumeRegisterRequest struct {
Volumes []*CSIVolume
Expand Down Expand Up @@ -1581,6 +1597,20 @@ func (p *CSIPlugin) IsEmpty() bool {
p.NodeJobs.Count() == 0
}

// Event emits an event for the event stream. Note there is no CSIPlugin event
// type, because CSI plugins don't have their own write RPCs; they are always
// created/removed via node updates
func (p *CSIPlugin) Event() Event {
return Event{
Topic: TopicCSIPlugin,
Key: p.ID,
FilterKeys: []string{p.ID},
Payload: &CSIPluginEvent{
Plugin: p,
},
}
}

type CSIPluginListRequest struct {
QueryOptions
}
Expand Down
23 changes: 8 additions & 15 deletions nomad/structs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type EventStreamWrapper struct {
Event *EventJson
}

type Eventer interface {
Event() Event
}

type Topic string

const (
Expand Down Expand Up @@ -154,27 +158,16 @@ type ACLTokenEvent struct {
secretID string
}

func (a *ACLTokenEvent) SecretID() string {
return a.secretID
}

// ServiceRegistrationStreamEvent holds a newly updated or deleted service
// registration.
type ServiceRegistrationStreamEvent struct {
Service *ServiceRegistration
}

// NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates
// a copy of the passed in ACLToken and empties out the copied tokens SecretID
func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent {
c := token.Sanitize()

return &ACLTokenEvent{
ACLToken: c,
secretID: token.SecretID,
}
}

func (a *ACLTokenEvent) SecretID() string {
return a.secretID
}

type ACLPolicyEvent struct {
ACLPolicy *ACLPolicy
}
Expand Down
16 changes: 16 additions & 0 deletions nomad/structs/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,22 @@ func (hv *HostVolume) GetID() string {
return hv.ID
}

// Event emits an event for the event stream
func (hv *HostVolume) Event() Event {
return Event{
Topic: TopicHostVolume,
FilterKeys: []string{
hv.ID,
hv.Name,
hv.PluginID,
},
Namespace: hv.Namespace,
Payload: &HostVolumeEvent{
Volume: hv,
},
}
}

// HostVolumeCapability is the requested attachment and access mode for a volume
type HostVolumeCapability struct {
AttachmentMode HostVolumeAttachmentMode
Expand Down
11 changes: 11 additions & 0 deletions nomad/structs/node_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ func (n *NodePool) SetHash() []byte {
return hashVal
}

// Event emits an event for the event stream
func (n *NodePool) Event() Event {
return Event{
Topic: TopicNodePool,
Key: n.Name,
Payload: &NodePoolEvent{
NodePool: n,
},
}
}

// NodePoolSchedulerConfiguration is the scheduler confinguration applied to a
// node pool.
//
Expand Down
16 changes: 16 additions & 0 deletions nomad/structs/service_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,22 @@ func (s *ServiceRegistration) HashWith(key string) string {
return fmt.Sprintf("%x", sum.Sum(nil))
}

// Event emits an event for the event stream
func (s *ServiceRegistration) Event() Event {
return Event{
Topic: TopicService,
Key: s.ID,
FilterKeys: []string{
s.JobID,
s.ServiceName,
},
Namespace: s.Namespace,
Payload: &ServiceRegistrationStreamEvent{
Service: s,
},
}
}

// ServiceRegistrationUpsertRequest is the request object used to upsert one or
// more service registrations.
type ServiceRegistrationUpsertRequest struct {
Expand Down
97 changes: 97 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2401,6 +2401,18 @@ func (n *Node) Stub(fields *NodeStubFields) *NodeListStub {
return s
}

// Event emits a sanitized node event for the event stream
func (n *Node) Event() Event {
nn := n.Sanitize()
return Event{
Topic: TopicNode,
Key: nn.ID,
Payload: &NodeStreamEvent{
Node: nn,
},
}
}

// NodeListStub is used to return a subset of job information
// for the job list
type NodeListStub struct {
Expand Down Expand Up @@ -5285,6 +5297,18 @@ func (j *Job) SetSubmitTime() {
j.SubmitTime = time.Now().UTC().UnixNano()
}

// Event emits an event for the event stream
func (j *Job) Event() Event {
return Event{
Topic: TopicJob,
Key: j.ID,
Namespace: j.Namespace,
Payload: &JobEvent{
Job: j,
},
}
}

// JobListStub is used to return a subset of job information
// for the job list
type JobListStub struct {
Expand Down Expand Up @@ -10783,6 +10807,19 @@ func (d *Deployment) GetNamespace() string {
return d.Namespace
}

// Event emits an event for the event stream
func (d *Deployment) Event() Event {
return Event{
Topic: TopicDeployment,
Key: d.ID,
Namespace: d.Namespace,
FilterKeys: []string{d.JobID},
Payload: &DeploymentEvent{
Deployment: d,
},
}
}

// DeploymentState tracks the state of a deployment for a given task group.
type DeploymentState struct {
// AutoRevert marks whether the task group has indicated the job should be
Expand Down Expand Up @@ -11919,6 +11956,26 @@ func (a *Allocation) LastRescheduleFailed() bool {
a.RescheduleTracker.LastReschedule != LastRescheduleSuccess
}

// Event emits an event for the event stream
func (a *Allocation) Event() Event {
// remove job info to help keep size of alloc event down
aa := a.Copy()
aa.Job = nil

return Event{
Topic: TopicAllocation,
Key: aa.ID,
FilterKeys: []string{
aa.JobID,
aa.DeploymentID,
},
Namespace: aa.Namespace,
Payload: &AllocationEvent{
Allocation: aa,
},
}
}

// AllocationDiff is another named type for Allocation (to use the same fields),
// which is used to represent the delta for an Allocation. If you need a method
// defined on the al
Expand Down Expand Up @@ -12870,6 +12927,22 @@ func (e *Evaluation) UpdateModifyTime() {
}
}

// Event emits an event for the event stream
func (e *Evaluation) Event() Event {
return Event{
Topic: TopicEvaluation,
Key: e.ID,
FilterKeys: []string{
e.JobID,
e.DeploymentID,
},
Namespace: e.Namespace,
Payload: &EvaluationEvent{
Evaluation: e,
},
}
}

// Plan is used to submit a commit plan for task allocations. These
// are submitted to the leader which verifies that resources have
// not been overcommitted before admitting the plan.
Expand Down Expand Up @@ -13376,6 +13449,17 @@ type ACLPolicy struct {
ModifyIndex uint64
}

// Event emits an event for the event stream
func (a *ACLPolicy) Event() Event {
return Event{
Topic: TopicACLPolicy,
Key: a.Name,
Payload: &ACLPolicyEvent{
ACLPolicy: a,
},
}
}

// JobACL represents an ACL policy's attachment to a job, group, or task.
type JobACL struct {
Namespace string // namespace of the job
Expand Down Expand Up @@ -13574,6 +13658,19 @@ func (a *ACLToken) Copy() *ACLToken {
return c
}

// Event emits an sanitized ACLTokenEvent for the event stream
func (a *ACLToken) Event() Event {
out := a.Sanitize()
return Event{
Topic: TopicACLToken,
Key: a.AccessorID,
Payload: &ACLTokenEvent{
ACLToken: out,
secretID: a.SecretID,
},
}
}

var (
// AnonymousACLToken is used when no SecretID is provided, and the request
// is made anonymously.
Expand Down

0 comments on commit 1858899

Please sign in to comment.