Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event stream: refactor to use Eventer interface #24738

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
429 changes: 40 additions & 389 deletions nomad/state/events.go

Large diffs are not rendered by default.

24 changes: 15 additions & 9 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@ import (
const (
tableIndex = "index"

TableACLAuthMethods = "acl_auth_methods"
TableACLBindingRules = "acl_binding_rules"
TableACLPolicies = "acl_policy"
TableACLRoles = "acl_roles"
TableACLTokens = "acl_token"
TableAllocs = "allocs"
TableCSIPlugins = "csi_plugins"
TableCSIVolumes = "csi_volumes"
TableDeployments = "deployment"
TableEvals = "evals"
TableHostVolumes = "host_volumes"
TableJobSubmission = "job_submission"
TableJobs = "jobs"
TableNamespaces = "namespaces"
TableNodePools = "node_pools"
TableNodes = "nodes"
TableRootKeys = "root_keys"
TableServiceRegistrations = "service_registrations"
TableVariables = "variables"
TableVariablesQuotas = "variables_quota"
TableRootKeys = "root_keys"
TableACLRoles = "acl_roles"
TableACLAuthMethods = "acl_auth_methods"
TableACLBindingRules = "acl_binding_rules"
TableAllocs = "allocs"
TableJobSubmission = "job_submission"
TableHostVolumes = "host_volumes"
TableCSIVolumes = "csi_volumes"
TableCSIPlugins = "csi_plugins"
)

const (
Expand Down
35 changes: 35 additions & 0 deletions nomad/structs/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,18 @@ func (a *ACLRole) Stub() *ACLRoleListStub {
}
}

// Event emits an event for the event stream
func (a *ACLRole) Event() Event {
return Event{
Topic: TopicACLRole,
Key: a.ID,
FilterKeys: []string{a.Name},
Payload: &ACLRoleStreamEvent{
ACLRole: a,
},
}
}

// ACLRoleListStub is the stub object returned when performing a listing of ACL
// roles. While it might not currently be different to the full response
// object, it allows us to future-proof the RPC in the event the ACLRole object
Expand Down Expand Up @@ -983,6 +995,17 @@ func (a *ACLAuthMethod) TokenLocalityIsGlobal() bool {
return a.TokenLocality == ACLAuthMethodTokenLocalityGlobal
}

// Event emits an event for the event stream
func (a *ACLAuthMethod) Event() Event {
return Event{
Topic: TopicACLAuthMethod,
Key: a.Name,
Payload: &ACLAuthMethodEvent{
AuthMethod: a,
},
}
}

// ACLAuthMethodConfig is used to store configuration of an auth method
type ACLAuthMethodConfig struct {
// A list of PEM-encoded public keys to use to authenticate signatures
Expand Down Expand Up @@ -1442,6 +1465,18 @@ func (a *ACLBindingRule) Stub() *ACLBindingRuleListStub {
}
}

// Event emits an event for the event stream
func (a *ACLBindingRule) Event() Event {
return Event{
Topic: TopicACLBindingRule,
Key: a.ID,
FilterKeys: []string{a.AuthMethod},
Payload: &ACLBindingRuleEvent{
ACLBindingRule: a,
},
}
}

// ACLBindingRuleListStub is the stub object returned when performing a listing
// of ACL binding rules.
type ACLBindingRuleListStub struct {
Expand Down
30 changes: 30 additions & 0 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,22 @@ func (v *CSIVolume) Merge(other *CSIVolume) error {
return errs.ErrorOrNil()
}

// Event emits an event for the event stream
func (v *CSIVolume) Event() Event {
return Event{
Topic: TopicCSIVolume,
FilterKeys: []string{
v.ID,
v.Name,
v.PluginID,
},
Namespace: v.Namespace,
Payload: &CSIVolumeEvent{
Volume: v,
},
}
}

// Request and response wrappers
type CSIVolumeRegisterRequest struct {
Volumes []*CSIVolume
Expand Down Expand Up @@ -1581,6 +1597,20 @@ func (p *CSIPlugin) IsEmpty() bool {
p.NodeJobs.Count() == 0
}

// Event emits an event for the event stream. Note there is no CSIPlugin event
// type, because CSI plugins don't have their own write RPCs; they are always
// created/removed via node updates
func (p *CSIPlugin) Event() Event {
return Event{
Topic: TopicCSIPlugin,
Key: p.ID,
FilterKeys: []string{p.ID},
Payload: &CSIPluginEvent{
Plugin: p,
},
}
}

type CSIPluginListRequest struct {
QueryOptions
}
Expand Down
23 changes: 8 additions & 15 deletions nomad/structs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type EventStreamWrapper struct {
Event *EventJson
}

type Eventer interface {
Event() Event
}

type Topic string

const (
Expand Down Expand Up @@ -154,27 +158,16 @@ type ACLTokenEvent struct {
secretID string
}

func (a *ACLTokenEvent) SecretID() string {
return a.secretID
}

// ServiceRegistrationStreamEvent holds a newly updated or deleted service
// registration.
type ServiceRegistrationStreamEvent struct {
Service *ServiceRegistration
}

// NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates
// a copy of the passed in ACLToken and empties out the copied tokens SecretID
func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent {
c := token.Sanitize()

return &ACLTokenEvent{
ACLToken: c,
secretID: token.SecretID,
}
}

func (a *ACLTokenEvent) SecretID() string {
return a.secretID
}

type ACLPolicyEvent struct {
ACLPolicy *ACLPolicy
}
Expand Down
16 changes: 16 additions & 0 deletions nomad/structs/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,22 @@ func (hv *HostVolume) GetID() string {
return hv.ID
}

// Event emits an event for the event stream
func (hv *HostVolume) Event() Event {
return Event{
Topic: TopicHostVolume,
FilterKeys: []string{
hv.ID,
hv.Name,
hv.PluginID,
},
Namespace: hv.Namespace,
Payload: &HostVolumeEvent{
Volume: hv,
},
}
}

// HostVolumeCapability is the requested attachment and access mode for a volume
type HostVolumeCapability struct {
AttachmentMode HostVolumeAttachmentMode
Expand Down
11 changes: 11 additions & 0 deletions nomad/structs/node_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ func (n *NodePool) SetHash() []byte {
return hashVal
}

// Event emits an event for the event stream
func (n *NodePool) Event() Event {
return Event{
Topic: TopicNodePool,
Key: n.Name,
Payload: &NodePoolEvent{
NodePool: n,
},
}
}

// NodePoolSchedulerConfiguration is the scheduler confinguration applied to a
// node pool.
//
Expand Down
16 changes: 16 additions & 0 deletions nomad/structs/service_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,22 @@ func (s *ServiceRegistration) HashWith(key string) string {
return fmt.Sprintf("%x", sum.Sum(nil))
}

// Event emits an event for the event stream
func (s *ServiceRegistration) Event() Event {
return Event{
Topic: TopicService,
Key: s.ID,
FilterKeys: []string{
s.JobID,
s.ServiceName,
},
Namespace: s.Namespace,
Payload: &ServiceRegistrationStreamEvent{
Service: s,
},
}
}

// ServiceRegistrationUpsertRequest is the request object used to upsert one or
// more service registrations.
type ServiceRegistrationUpsertRequest struct {
Expand Down
97 changes: 97 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2401,6 +2401,18 @@ func (n *Node) Stub(fields *NodeStubFields) *NodeListStub {
return s
}

// Event emits a sanitized node event for the event stream
func (n *Node) Event() Event {
nn := n.Sanitize()
return Event{
Topic: TopicNode,
Key: nn.ID,
Payload: &NodeStreamEvent{
Node: nn,
},
}
}

// NodeListStub is used to return a subset of job information
// for the job list
type NodeListStub struct {
Expand Down Expand Up @@ -5285,6 +5297,18 @@ func (j *Job) SetSubmitTime() {
j.SubmitTime = time.Now().UTC().UnixNano()
}

// Event emits an event for the event stream
func (j *Job) Event() Event {
return Event{
Topic: TopicJob,
Key: j.ID,
Namespace: j.Namespace,
Payload: &JobEvent{
Job: j,
},
}
}

// JobListStub is used to return a subset of job information
// for the job list
type JobListStub struct {
Expand Down Expand Up @@ -10783,6 +10807,19 @@ func (d *Deployment) GetNamespace() string {
return d.Namespace
}

// Event emits an event for the event stream
func (d *Deployment) Event() Event {
return Event{
Topic: TopicDeployment,
Key: d.ID,
Namespace: d.Namespace,
FilterKeys: []string{d.JobID},
Payload: &DeploymentEvent{
Deployment: d,
},
}
}

// DeploymentState tracks the state of a deployment for a given task group.
type DeploymentState struct {
// AutoRevert marks whether the task group has indicated the job should be
Expand Down Expand Up @@ -11919,6 +11956,26 @@ func (a *Allocation) LastRescheduleFailed() bool {
a.RescheduleTracker.LastReschedule != LastRescheduleSuccess
}

// Event emits an event for the event stream
func (a *Allocation) Event() Event {
// remove job info to help keep size of alloc event down
aa := a.Copy()
aa.Job = nil

return Event{
Topic: TopicAllocation,
Key: aa.ID,
FilterKeys: []string{
aa.JobID,
aa.DeploymentID,
},
Namespace: aa.Namespace,
Payload: &AllocationEvent{
Allocation: aa,
},
}
}

// AllocationDiff is another named type for Allocation (to use the same fields),
// which is used to represent the delta for an Allocation. If you need a method
// defined on the al
Expand Down Expand Up @@ -12870,6 +12927,22 @@ func (e *Evaluation) UpdateModifyTime() {
}
}

// Event emits an event for the event stream
func (e *Evaluation) Event() Event {
return Event{
Topic: TopicEvaluation,
Key: e.ID,
FilterKeys: []string{
e.JobID,
e.DeploymentID,
},
Namespace: e.Namespace,
Payload: &EvaluationEvent{
Evaluation: e,
},
}
}

// Plan is used to submit a commit plan for task allocations. These
// are submitted to the leader which verifies that resources have
// not been overcommitted before admitting the plan.
Expand Down Expand Up @@ -13376,6 +13449,17 @@ type ACLPolicy struct {
ModifyIndex uint64
}

// Event emits an event for the event stream
func (a *ACLPolicy) Event() Event {
return Event{
Topic: TopicACLPolicy,
Key: a.Name,
Payload: &ACLPolicyEvent{
ACLPolicy: a,
},
}
}

// JobACL represents an ACL policy's attachment to a job, group, or task.
type JobACL struct {
Namespace string // namespace of the job
Expand Down Expand Up @@ -13574,6 +13658,19 @@ func (a *ACLToken) Copy() *ACLToken {
return c
}

// Event emits an sanitized ACLTokenEvent for the event stream
func (a *ACLToken) Event() Event {
out := a.Sanitize()
return Event{
Topic: TopicACLToken,
Key: a.AccessorID,
Payload: &ACLTokenEvent{
ACLToken: out,
secretID: a.SecretID,
},
}
}

var (
// AnonymousACLToken is used when no SecretID is provided, and the request
// is made anonymously.
Expand Down
Loading