Skip to content

Commit

Permalink
Basic support for priority groups
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Nov 7, 2024
1 parent e203fdf commit 297f2e8
Show file tree
Hide file tree
Showing 20 changed files with 718 additions and 29 deletions.
101 changes: 86 additions & 15 deletions api/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
JSApiRequestNext = "$JS.API.CONSUMER.MSG.NEXT.*.*"
JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
JSMetricConsumerAckPre = JSMetricPrefix + ".CONSUMER.ACK"
JSAdvisoryConsumerMaxDeliveryExceedPre = JSAdvisoryPrefix + ".CONSUMER.MAX_DELIVERIES"
)
Expand Down Expand Up @@ -95,6 +96,16 @@ func (a *ConsumerAction) UnmarshalJSON(data []byte) error {
return nil
}

// io.nats.jetstream.api.v1.consumer_unpin_response
type JSApiConsumerUnpinRequest struct {
Group string `json:"group"`
}

// io.nats.jetstream.api.v1.consumer_unpin_response
type JSApiConsumerUnpinResponse struct {
JSApiResponse
}

// io.nats.jetstream.api.v1.consumer_delete_response
type JSApiConsumerDeleteResponse struct {
JSApiResponse
Expand Down Expand Up @@ -333,6 +344,53 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
}
}

// PriorityPolicy determines policy for selecting messages based on priority.
type PriorityPolicy int

const (
PriorityNone PriorityPolicy = iota
PriorityOverflow
PriorityPinnedClient
)

func (p PriorityPolicy) String() string {
switch p {
case PriorityOverflow:
return "Overflow"
case PriorityPinnedClient:
return "Pinned Client"
default:
return "None"
}
}

func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("none"):
*p = PriorityNone
case jsonString("overflow"):
*p = PriorityOverflow
case jsonString("pinned_client"):
*p = PriorityPinnedClient
default:
return fmt.Errorf("unknown priority policy: %v", string(data))
}
return nil
}

func (p PriorityPolicy) MarshalJSON() ([]byte, error) {
switch p {
case PriorityNone:
return json.Marshal("none")
case PriorityOverflow:
return json.Marshal("overflow")
case PriorityPinnedClient:
return json.Marshal("pinned_client")
default:
return nil, fmt.Errorf("unknown priority policy: %v", p)
}
}

// ConsumerConfig is the configuration for a JetStream consumes
//
// NATS Schema Type io.nats.jetstream.api.v1.consumer_configuration
Expand Down Expand Up @@ -371,6 +429,11 @@ type ConsumerConfig struct {
// PauseUntil is for suspending the consumer until the deadline.
PauseUntil time.Time `json:"pause_until,omitempty"`

// Priority groups
PriorityGroups []string `json:"priority_groups,omitempty"`
PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"`
PinnedTTL time.Duration `json:"priority_timeout,omitempty"`

// Don't add to general clients.
Direct bool `json:"direct,omitempty"`
}
Expand All @@ -384,21 +447,29 @@ type SequenceInfo struct {

// ConsumerInfo reports the current state of a consumer
type ConsumerInfo struct {
Stream string `json:"stream_name"`
Name string `json:"name"`
Config ConsumerConfig `json:"config"`
Created time.Time `json:"created"`
Delivered SequenceInfo `json:"delivered"`
AckFloor SequenceInfo `json:"ack_floor"`
NumAckPending int `json:"num_ack_pending"`
NumRedelivered int `json:"num_redelivered"`
NumWaiting int `json:"num_waiting"`
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
TimeStamp time.Time `json:"ts"`
Stream string `json:"stream_name"`
Name string `json:"name"`
Config ConsumerConfig `json:"config"`
Created time.Time `json:"created"`
Delivered SequenceInfo `json:"delivered"`
AckFloor SequenceInfo `json:"ack_floor"`
NumAckPending int `json:"num_ack_pending"`
NumRedelivered int `json:"num_redelivered"`
NumWaiting int `json:"num_waiting"`
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
TimeStamp time.Time `json:"ts"`
PriorityGroups []PriorityGroupState `json:"priority_groups,omitempty"`
}

// PriorityGroupState is the state of a consumer group
type PriorityGroupState struct {
Group string `json:"group"`
PinnedClientID string `json:"pinned_client_id,omitempty"`
PinnedTS time.Time `json:"pinned_ts,omitempty"`
}

// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers
Expand Down
2 changes: 2 additions & 0 deletions api/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func main() {
&schema{P: "jetstream/advisory/v1/consumer_quorum_lost.json", St: "jsadvisory.JSConsumerQuorumLostV1"},
&schema{P: "jetstream/advisory/v1/server_out_of_space.json", St: "jsadvisory.JSServerOutOfSpaceAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/server_removed.json", St: "jsadvisory.JSServerRemovedAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/consumer_group_pinned.json", St: "jsadvisory.JSConsumerGroupPinnedAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/consumer_group_unpinned.json", St: "jsadvisory.JSConsumerGroupUnPinnedAdvisoryV1"},
&schema{P: "jetstream/metric/v1/consumer_ack.json", St: "jsmetric.ConsumerAckMetricV1"},
&schema{P: "jetstream/api/v1/consumer_configuration.json", St: "ConsumerConfig"},
&schema{P: "jetstream/api/v1/stream_configuration.json", St: "StreamConfig"},
Expand Down
36 changes: 36 additions & 0 deletions api/jetstream/advisory/consumer_group_pinned.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package advisory

import "github.com/nats-io/jsm.go/api/event"

// JSConsumerGroupPinnedAdvisoryV1 is an advisory published when a consumer pinned_client grouped consumer pins a client
//
// NATS Schema Type io.nats.jetstream.advisory.v1.consumer_group_pinned
type JSConsumerGroupPinnedAdvisoryV1 struct {
event.NATSEvent

Account string `json:"account,omitempty"`
Stream string `json:"stream"`
Consumer string `json:"consumer"`
Domain string `json:"domain,omitempty"`
Group string `json:"group"`
PinnedClientId string `json:"pinned_id"`
}

func init() {
err := event.RegisterTextCompactTemplate("io.nats.jetstream.advisory.v1.consumer_group_pinned", `{{ .Time | ShortTime }} [PINNED] Consumer {{ .Stream }} > {{ .Consumer }} pinned client {{ .PinnedClientId }} for group {{ .Group }}`)
if err != nil {
panic(err)
}

err = event.RegisterTextExtendedTemplate("io.nats.jetstream.advisory.v1.consumer_group_pinned", `
[{{ .Time | ShortTime }}] [{{ .ID }}] Grouped Consumer Pinned a Client
Stream: {{ .Stream }}
Consumer: {{ .Consumer }}
Group: {{ .Group }}
Domain: {{ .Domain }}
Pinned Client: {{ .PinnedClientId }}`)
if err != nil {
panic(err)
}
}
36 changes: 36 additions & 0 deletions api/jetstream/advisory/consumer_group_unpinned.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package advisory

import "github.com/nats-io/jsm.go/api/event"

// JSConsumerGroupUnPinnedAdvisoryV1 is an advisory published when a consumer pinned_client grouped consumer unpins a client
//
// NATS Schema Type io.nats.jetstream.advisory.v1.consumer_group_unpinned
type JSConsumerGroupUnPinnedAdvisoryV1 struct {
event.NATSEvent

Account string `json:"account,omitempty"`
Stream string `json:"stream"`
Consumer string `json:"consumer"`
Domain string `json:"domain,omitempty"`
Group string `json:"group"`
Reason string `json:"reason"`
}

func init() {
err := event.RegisterTextCompactTemplate("io.nats.jetstream.advisory.v1.consumer_group_unpinned", `{{ .Time | ShortTime }} [UNPINNED] Consumer {{ .Stream }} > {{ .Consumer }} unpinned client for group {{ .Group }}: {{ .Reason }}`)
if err != nil {
panic(err)
}

err = event.RegisterTextExtendedTemplate("io.nats.jetstream.advisory.v1.consumer_group_unpinned", `
[{{ .Time | ShortTime }}] [{{ .ID }}] Grouped Consumer Un-Pinned a Client
Stream: {{ .Stream }}
Consumer: {{ .Consumer }}
Group: {{ .Group }}
Domain: {{ .Domain }}
Reason: {{ .Reason }}`)
if err != nil {
panic(err)
}
}
4 changes: 3 additions & 1 deletion api/schemas_generated.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// auto generated 2024-08-26 09:52:49.187854 +0200 CEST m=+0.016427876
// auto generated 2024-11-07 11:11:35.549364 +0100 CET m=+0.015695959

package api

Expand Down Expand Up @@ -34,6 +34,8 @@ var schemaTypes = map[string]func() any{
"io.nats.jetstream.advisory.v1.consumer_quorum_lost": func() any { return &jsadvisory.JSConsumerQuorumLostV1{} },
"io.nats.jetstream.advisory.v1.server_out_of_space": func() any { return &jsadvisory.JSServerOutOfSpaceAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.server_removed": func() any { return &jsadvisory.JSServerRemovedAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.consumer_group_pinned": func() any { return &jsadvisory.JSConsumerGroupPinnedAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.consumer_group_unpinned": func() any { return &jsadvisory.JSConsumerGroupUnPinnedAdvisoryV1{} },
"io.nats.jetstream.metric.v1.consumer_ack": func() any { return &jsmetric.ConsumerAckMetricV1{} },
"io.nats.jetstream.api.v1.consumer_configuration": func() any { return &ConsumerConfig{} },
"io.nats.jetstream.api.v1.stream_configuration": func() any { return &StreamConfig{} },
Expand Down
78 changes: 71 additions & 7 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,40 @@ func PauseUntil(deadline time.Time) ConsumerOption {
}
}

// PinnedClientPriorityGroups sets the consumer to be a pinned client priority consumer with a certain list of groups. When groups is empty the 'none' policy is set
func PinnedClientPriorityGroups(ttl time.Duration, groups ...string) ConsumerOption {
return func(o *api.ConsumerConfig) error {
if len(groups) == 0 {
o.PriorityGroups = []string{}
o.PriorityPolicy = api.PriorityNone
o.PinnedTTL = 0
return nil
}

o.PriorityPolicy = api.PriorityPinnedClient
o.PriorityGroups = groups
o.PinnedTTL = ttl

return nil
}
}

// OverflowPriorityGroups sets the consumer to support overflow pull requests
func OverflowPriorityGroups(groups ...string) ConsumerOption {
return func(o *api.ConsumerConfig) error {
if len(groups) == 0 {
o.PriorityGroups = []string{}
o.PriorityPolicy = api.PriorityNone
return nil
}

o.PriorityPolicy = api.PriorityOverflow
o.PriorityGroups = groups

return nil
}
}

// UpdateConfiguration updates the consumer configuration
// At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and header only settings can be changed
func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error {
Expand Down Expand Up @@ -944,13 +978,37 @@ func (c *Consumer) Resume() error {
return nil
}

func (c *Consumer) Name() string { return c.name }
func (c *Consumer) IsSampled() bool { return c.SampleFrequency() != "" }
func (c *Consumer) IsPullMode() bool { return c.cfg.DeliverSubject == "" }
func (c *Consumer) IsPushMode() bool { return !c.IsPullMode() }
func (c *Consumer) IsDurable() bool { return c.cfg.Durable != "" }
func (c *Consumer) IsEphemeral() bool { return !c.IsDurable() }
func (c *Consumer) IsHeadersOnly() bool { return c.cfg.HeadersOnly }
// Unpin requests that the server unpins the current client from a grouped consumer
func (c *Consumer) Unpin(group string) error {
if group == "" {
return fmt.Errorf("group is required")
}

if c.cfg.PriorityPolicy == api.PriorityPinnedClient {
return fmt.Errorf("consumer is not configured for pinned clients")
}

var resp *api.JSApiConsumerUnpinResponse

err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiConsumerUnpinT, c.StreamName(), c.Name()), api.JSApiConsumerUnpinRequest{Group: group}, &resp)
if err != nil {
return err
}

return nil
}

func (c *Consumer) Name() string { return c.name }
func (c *Consumer) IsSampled() bool { return c.SampleFrequency() != "" }
func (c *Consumer) IsPullMode() bool { return c.cfg.DeliverSubject == "" }
func (c *Consumer) IsPushMode() bool { return !c.IsPullMode() }
func (c *Consumer) IsDurable() bool { return c.cfg.Durable != "" }
func (c *Consumer) IsEphemeral() bool { return !c.IsDurable() }
func (c *Consumer) IsHeadersOnly() bool { return c.cfg.HeadersOnly }
func (c *Consumer) IsOverflowPriority() bool { return c.cfg.PriorityPolicy == api.PriorityOverflow }
func (c *Consumer) IsPinnedClientPriority() bool {
return c.cfg.PriorityPolicy == api.PriorityPinnedClient
}
func (c *Consumer) StreamName() string { return c.stream }
func (c *Consumer) DeliverySubject() string { return c.cfg.DeliverSubject }
func (c *Consumer) DurableName() string { return c.cfg.Durable }
Expand Down Expand Up @@ -985,3 +1043,9 @@ func (c *Consumer) StartTime() time.Time {
}
return *c.cfg.OptStartTime
}
func (c *Consumer) PriorityGroups() []string {
return c.cfg.PriorityGroups
}
func (c *Consumer) PriorityPolicy() api.PriorityPolicy {
return c.cfg.PriorityPolicy
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/jedib0t/go-pretty/v6 v6.6.1
github.com/klauspost/compress v1.17.11
github.com/nats-io/jwt/v2 v2.7.2
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jwt/v2 v2.7.2 h1:SCRjfDLJ2q8naXp8YlGJJS5/yj3wGSODFYVi4nnwVMw=
github.com/nats-io/jwt/v2 v2.7.2/go.mod h1:kB6QUmqHG6Wdrzj0KP2L+OX4xiTPBeV+NHVstFaATXU=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241004162106-225cd3dd6eca h1:EcjPbDziop5sCVOpWOhUXTcQPAg1of3p+MgczwJubUo=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241004162106-225cd3dd6eca/go.mod h1:LkVD8QXFfIxYsQCn3r0NHJk48PIg2XY1RqVBDTQwYuU=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8 h1:XMoB88mOGh1u64NNvtBEfidj+rEewunIfLCdmrhsNmY=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8/go.mod h1:VY1OpHND54C9/rK09yrZt7raHmTUioMOKPqJvRD3GDw=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241105211031-4122ab992c1a h1:dHIQ0YB484btLEUs9hBNYd3OgbVegvaWA16fA5v2GdA=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241105211031-4122ab992c1a/go.mod h1:VY1OpHND54C9/rK09yrZt7raHmTUioMOKPqJvRD3GDw=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e h1:IOoXHQJxuz++vUwiiM4nKMQynU5LrFbjZvS2JXFGoFg=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e/go.mod h1:VY1OpHND54C9/rK09yrZt7raHmTUioMOKPqJvRD3GDw=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand Down
Loading

0 comments on commit 297f2e8

Please sign in to comment.