From 297f2e87273aba1d97a86dfb667d2a4d5ac3566e Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 4 Nov 2024 14:28:28 +0100 Subject: [PATCH] Basic support for priority groups Signed-off-by: R.I.Pienaar --- api/consumers.go | 101 +++++++++++++++--- api/gen.go | 2 + .../advisory/consumer_group_pinned.go | 36 +++++++ .../advisory/consumer_group_unpinned.go | 36 +++++++ api/schemas_generated.go | 4 +- consumers.go | 78 ++++++++++++-- go.mod | 2 +- go.sum | 8 +- .../advisory/v1/consumer_group_pinned.json | 57 ++++++++++ .../advisory/v1/consumer_group_unpinned.json | 58 ++++++++++ .../jetstream/api/v1/definitions.json | 46 ++++++++ .../advisory/v1/consumer_group_pinned.json | 57 ++++++++++ .../advisory/v1/consumer_group_unpinned.json | 61 +++++++++++ .../api/v1/consumer_configuration.json | 20 ++++ .../api/v1/consumer_create_request.json | 20 ++++ .../api/v1/consumer_create_response.json | 27 +++++ .../api/v1/consumer_info_response.json | 27 +++++ .../api/v1/consumer_list_response.json | 27 +++++ schemas/server/errors.json | 42 +++++++- test/consumers_test.go | 38 +++++++ 20 files changed, 718 insertions(+), 29 deletions(-) create mode 100644 api/jetstream/advisory/consumer_group_pinned.go create mode 100644 api/jetstream/advisory/consumer_group_unpinned.go create mode 100644 schema_source/jetstream/advisory/v1/consumer_group_pinned.json create mode 100644 schema_source/jetstream/advisory/v1/consumer_group_unpinned.json create mode 100644 schemas/jetstream/advisory/v1/consumer_group_pinned.json create mode 100644 schemas/jetstream/advisory/v1/consumer_group_unpinned.json diff --git a/api/consumers.go b/api/consumers.go index aaae785..8bf7802 100644 --- a/api/consumers.go +++ b/api/consumers.go @@ -39,6 +39,7 @@ const ( JSApiRequestNext = "$JS.API.CONSUMER.MSG.NEXT.*.*" JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s" JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s" + JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s" JSMetricConsumerAckPre = JSMetricPrefix + ".CONSUMER.ACK" JSAdvisoryConsumerMaxDeliveryExceedPre = JSAdvisoryPrefix + ".CONSUMER.MAX_DELIVERIES" ) @@ -95,6 +96,16 @@ func (a *ConsumerAction) UnmarshalJSON(data []byte) error { return nil } +// io.nats.jetstream.api.v1.consumer_unpin_response +type JSApiConsumerUnpinRequest struct { + Group string `json:"group"` +} + +// io.nats.jetstream.api.v1.consumer_unpin_response +type JSApiConsumerUnpinResponse struct { + JSApiResponse +} + // io.nats.jetstream.api.v1.consumer_delete_response type JSApiConsumerDeleteResponse struct { JSApiResponse @@ -333,6 +344,53 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) { } } +// PriorityPolicy determines policy for selecting messages based on priority. +type PriorityPolicy int + +const ( + PriorityNone PriorityPolicy = iota + PriorityOverflow + PriorityPinnedClient +) + +func (p PriorityPolicy) String() string { + switch p { + case PriorityOverflow: + return "Overflow" + case PriorityPinnedClient: + return "Pinned Client" + default: + return "None" + } +} + +func (p *PriorityPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString("none"): + *p = PriorityNone + case jsonString("overflow"): + *p = PriorityOverflow + case jsonString("pinned_client"): + *p = PriorityPinnedClient + default: + return fmt.Errorf("unknown priority policy: %v", string(data)) + } + return nil +} + +func (p PriorityPolicy) MarshalJSON() ([]byte, error) { + switch p { + case PriorityNone: + return json.Marshal("none") + case PriorityOverflow: + return json.Marshal("overflow") + case PriorityPinnedClient: + return json.Marshal("pinned_client") + default: + return nil, fmt.Errorf("unknown priority policy: %v", p) + } +} + // ConsumerConfig is the configuration for a JetStream consumes // // NATS Schema Type io.nats.jetstream.api.v1.consumer_configuration @@ -371,6 +429,11 @@ type ConsumerConfig struct { // PauseUntil is for suspending the consumer until the deadline. PauseUntil time.Time `json:"pause_until,omitempty"` + // Priority groups + PriorityGroups []string `json:"priority_groups,omitempty"` + PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"` + PinnedTTL time.Duration `json:"priority_timeout,omitempty"` + // Don't add to general clients. Direct bool `json:"direct,omitempty"` } @@ -384,21 +447,29 @@ type SequenceInfo struct { // ConsumerInfo reports the current state of a consumer type ConsumerInfo struct { - Stream string `json:"stream_name"` - Name string `json:"name"` - Config ConsumerConfig `json:"config"` - Created time.Time `json:"created"` - Delivered SequenceInfo `json:"delivered"` - AckFloor SequenceInfo `json:"ack_floor"` - NumAckPending int `json:"num_ack_pending"` - NumRedelivered int `json:"num_redelivered"` - NumWaiting int `json:"num_waiting"` - NumPending uint64 `json:"num_pending"` - Cluster *ClusterInfo `json:"cluster,omitempty"` - PushBound bool `json:"push_bound,omitempty"` - Paused bool `json:"paused,omitempty"` - PauseRemaining time.Duration `json:"pause_remaining,omitempty"` - TimeStamp time.Time `json:"ts"` + Stream string `json:"stream_name"` + Name string `json:"name"` + Config ConsumerConfig `json:"config"` + Created time.Time `json:"created"` + Delivered SequenceInfo `json:"delivered"` + AckFloor SequenceInfo `json:"ack_floor"` + NumAckPending int `json:"num_ack_pending"` + NumRedelivered int `json:"num_redelivered"` + NumWaiting int `json:"num_waiting"` + NumPending uint64 `json:"num_pending"` + Cluster *ClusterInfo `json:"cluster,omitempty"` + PushBound bool `json:"push_bound,omitempty"` + Paused bool `json:"paused,omitempty"` + PauseRemaining time.Duration `json:"pause_remaining,omitempty"` + TimeStamp time.Time `json:"ts"` + PriorityGroups []PriorityGroupState `json:"priority_groups,omitempty"` +} + +// PriorityGroupState is the state of a consumer group +type PriorityGroupState struct { + Group string `json:"group"` + PinnedClientID string `json:"pinned_client_id,omitempty"` + PinnedTS time.Time `json:"pinned_ts,omitempty"` } // JSApiConsumerGetNextRequest is for getting next messages for pull based consumers diff --git a/api/gen.go b/api/gen.go index 47da4f7..732e11c 100644 --- a/api/gen.go +++ b/api/gen.go @@ -209,6 +209,8 @@ func main() { &schema{P: "jetstream/advisory/v1/consumer_quorum_lost.json", St: "jsadvisory.JSConsumerQuorumLostV1"}, &schema{P: "jetstream/advisory/v1/server_out_of_space.json", St: "jsadvisory.JSServerOutOfSpaceAdvisoryV1"}, &schema{P: "jetstream/advisory/v1/server_removed.json", St: "jsadvisory.JSServerRemovedAdvisoryV1"}, + &schema{P: "jetstream/advisory/v1/consumer_group_pinned.json", St: "jsadvisory.JSConsumerGroupPinnedAdvisoryV1"}, + &schema{P: "jetstream/advisory/v1/consumer_group_unpinned.json", St: "jsadvisory.JSConsumerGroupUnPinnedAdvisoryV1"}, &schema{P: "jetstream/metric/v1/consumer_ack.json", St: "jsmetric.ConsumerAckMetricV1"}, &schema{P: "jetstream/api/v1/consumer_configuration.json", St: "ConsumerConfig"}, &schema{P: "jetstream/api/v1/stream_configuration.json", St: "StreamConfig"}, diff --git a/api/jetstream/advisory/consumer_group_pinned.go b/api/jetstream/advisory/consumer_group_pinned.go new file mode 100644 index 0000000..893a387 --- /dev/null +++ b/api/jetstream/advisory/consumer_group_pinned.go @@ -0,0 +1,36 @@ +package advisory + +import "github.com/nats-io/jsm.go/api/event" + +// JSConsumerGroupPinnedAdvisoryV1 is an advisory published when a consumer pinned_client grouped consumer pins a client +// +// NATS Schema Type io.nats.jetstream.advisory.v1.consumer_group_pinned +type JSConsumerGroupPinnedAdvisoryV1 struct { + event.NATSEvent + + Account string `json:"account,omitempty"` + Stream string `json:"stream"` + Consumer string `json:"consumer"` + Domain string `json:"domain,omitempty"` + Group string `json:"group"` + PinnedClientId string `json:"pinned_id"` +} + +func init() { + err := event.RegisterTextCompactTemplate("io.nats.jetstream.advisory.v1.consumer_group_pinned", `{{ .Time | ShortTime }} [PINNED] Consumer {{ .Stream }} > {{ .Consumer }} pinned client {{ .PinnedClientId }} for group {{ .Group }}`) + if err != nil { + panic(err) + } + + err = event.RegisterTextExtendedTemplate("io.nats.jetstream.advisory.v1.consumer_group_pinned", ` +[{{ .Time | ShortTime }}] [{{ .ID }}] Grouped Consumer Pinned a Client + + Stream: {{ .Stream }} + Consumer: {{ .Consumer }} + Group: {{ .Group }} + Domain: {{ .Domain }} + Pinned Client: {{ .PinnedClientId }}`) + if err != nil { + panic(err) + } +} diff --git a/api/jetstream/advisory/consumer_group_unpinned.go b/api/jetstream/advisory/consumer_group_unpinned.go new file mode 100644 index 0000000..47aca8a --- /dev/null +++ b/api/jetstream/advisory/consumer_group_unpinned.go @@ -0,0 +1,36 @@ +package advisory + +import "github.com/nats-io/jsm.go/api/event" + +// JSConsumerGroupUnPinnedAdvisoryV1 is an advisory published when a consumer pinned_client grouped consumer unpins a client +// +// NATS Schema Type io.nats.jetstream.advisory.v1.consumer_group_unpinned +type JSConsumerGroupUnPinnedAdvisoryV1 struct { + event.NATSEvent + + Account string `json:"account,omitempty"` + Stream string `json:"stream"` + Consumer string `json:"consumer"` + Domain string `json:"domain,omitempty"` + Group string `json:"group"` + Reason string `json:"reason"` +} + +func init() { + err := event.RegisterTextCompactTemplate("io.nats.jetstream.advisory.v1.consumer_group_unpinned", `{{ .Time | ShortTime }} [UNPINNED] Consumer {{ .Stream }} > {{ .Consumer }} unpinned client for group {{ .Group }}: {{ .Reason }}`) + if err != nil { + panic(err) + } + + err = event.RegisterTextExtendedTemplate("io.nats.jetstream.advisory.v1.consumer_group_unpinned", ` +[{{ .Time | ShortTime }}] [{{ .ID }}] Grouped Consumer Un-Pinned a Client + + Stream: {{ .Stream }} + Consumer: {{ .Consumer }} + Group: {{ .Group }} + Domain: {{ .Domain }} + Reason: {{ .Reason }}`) + if err != nil { + panic(err) + } +} diff --git a/api/schemas_generated.go b/api/schemas_generated.go index 3b1a180..ac0a30b 100644 --- a/api/schemas_generated.go +++ b/api/schemas_generated.go @@ -1,4 +1,4 @@ -// auto generated 2024-08-26 09:52:49.187854 +0200 CEST m=+0.016427876 +// auto generated 2024-11-07 11:11:35.549364 +0100 CET m=+0.015695959 package api @@ -34,6 +34,8 @@ var schemaTypes = map[string]func() any{ "io.nats.jetstream.advisory.v1.consumer_quorum_lost": func() any { return &jsadvisory.JSConsumerQuorumLostV1{} }, "io.nats.jetstream.advisory.v1.server_out_of_space": func() any { return &jsadvisory.JSServerOutOfSpaceAdvisoryV1{} }, "io.nats.jetstream.advisory.v1.server_removed": func() any { return &jsadvisory.JSServerRemovedAdvisoryV1{} }, + "io.nats.jetstream.advisory.v1.consumer_group_pinned": func() any { return &jsadvisory.JSConsumerGroupPinnedAdvisoryV1{} }, + "io.nats.jetstream.advisory.v1.consumer_group_unpinned": func() any { return &jsadvisory.JSConsumerGroupUnPinnedAdvisoryV1{} }, "io.nats.jetstream.metric.v1.consumer_ack": func() any { return &jsmetric.ConsumerAckMetricV1{} }, "io.nats.jetstream.api.v1.consumer_configuration": func() any { return &ConsumerConfig{} }, "io.nats.jetstream.api.v1.stream_configuration": func() any { return &StreamConfig{} }, diff --git a/consumers.go b/consumers.go index 6ce1780..9696dfb 100644 --- a/consumers.go +++ b/consumers.go @@ -629,6 +629,40 @@ func PauseUntil(deadline time.Time) ConsumerOption { } } +// PinnedClientPriorityGroups sets the consumer to be a pinned client priority consumer with a certain list of groups. When groups is empty the 'none' policy is set +func PinnedClientPriorityGroups(ttl time.Duration, groups ...string) ConsumerOption { + return func(o *api.ConsumerConfig) error { + if len(groups) == 0 { + o.PriorityGroups = []string{} + o.PriorityPolicy = api.PriorityNone + o.PinnedTTL = 0 + return nil + } + + o.PriorityPolicy = api.PriorityPinnedClient + o.PriorityGroups = groups + o.PinnedTTL = ttl + + return nil + } +} + +// OverflowPriorityGroups sets the consumer to support overflow pull requests +func OverflowPriorityGroups(groups ...string) ConsumerOption { + return func(o *api.ConsumerConfig) error { + if len(groups) == 0 { + o.PriorityGroups = []string{} + o.PriorityPolicy = api.PriorityNone + return nil + } + + o.PriorityPolicy = api.PriorityOverflow + o.PriorityGroups = groups + + return nil + } +} + // UpdateConfiguration updates the consumer configuration // At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and header only settings can be changed func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error { @@ -944,13 +978,37 @@ func (c *Consumer) Resume() error { return nil } -func (c *Consumer) Name() string { return c.name } -func (c *Consumer) IsSampled() bool { return c.SampleFrequency() != "" } -func (c *Consumer) IsPullMode() bool { return c.cfg.DeliverSubject == "" } -func (c *Consumer) IsPushMode() bool { return !c.IsPullMode() } -func (c *Consumer) IsDurable() bool { return c.cfg.Durable != "" } -func (c *Consumer) IsEphemeral() bool { return !c.IsDurable() } -func (c *Consumer) IsHeadersOnly() bool { return c.cfg.HeadersOnly } +// Unpin requests that the server unpins the current client from a grouped consumer +func (c *Consumer) Unpin(group string) error { + if group == "" { + return fmt.Errorf("group is required") + } + + if c.cfg.PriorityPolicy == api.PriorityPinnedClient { + return fmt.Errorf("consumer is not configured for pinned clients") + } + + var resp *api.JSApiConsumerUnpinResponse + + err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiConsumerUnpinT, c.StreamName(), c.Name()), api.JSApiConsumerUnpinRequest{Group: group}, &resp) + if err != nil { + return err + } + + return nil +} + +func (c *Consumer) Name() string { return c.name } +func (c *Consumer) IsSampled() bool { return c.SampleFrequency() != "" } +func (c *Consumer) IsPullMode() bool { return c.cfg.DeliverSubject == "" } +func (c *Consumer) IsPushMode() bool { return !c.IsPullMode() } +func (c *Consumer) IsDurable() bool { return c.cfg.Durable != "" } +func (c *Consumer) IsEphemeral() bool { return !c.IsDurable() } +func (c *Consumer) IsHeadersOnly() bool { return c.cfg.HeadersOnly } +func (c *Consumer) IsOverflowPriority() bool { return c.cfg.PriorityPolicy == api.PriorityOverflow } +func (c *Consumer) IsPinnedClientPriority() bool { + return c.cfg.PriorityPolicy == api.PriorityPinnedClient +} func (c *Consumer) StreamName() string { return c.stream } func (c *Consumer) DeliverySubject() string { return c.cfg.DeliverSubject } func (c *Consumer) DurableName() string { return c.cfg.Durable } @@ -985,3 +1043,9 @@ func (c *Consumer) StartTime() time.Time { } return *c.cfg.OptStartTime } +func (c *Consumer) PriorityGroups() []string { + return c.cfg.PriorityGroups +} +func (c *Consumer) PriorityPolicy() api.PriorityPolicy { + return c.cfg.PriorityPolicy +} diff --git a/go.mod b/go.mod index de0295b..29b748f 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/jedib0t/go-pretty/v6 v6.6.1 github.com/klauspost/compress v1.17.11 github.com/nats-io/jwt/v2 v2.7.2 - github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8 + github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e github.com/nats-io/nats.go v1.37.0 github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index 3b2ca9f..f25eb9d 100644 --- a/go.sum +++ b/go.sum @@ -29,10 +29,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nats-io/jwt/v2 v2.7.2 h1:SCRjfDLJ2q8naXp8YlGJJS5/yj3wGSODFYVi4nnwVMw= github.com/nats-io/jwt/v2 v2.7.2/go.mod h1:kB6QUmqHG6Wdrzj0KP2L+OX4xiTPBeV+NHVstFaATXU= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241004162106-225cd3dd6eca h1:EcjPbDziop5sCVOpWOhUXTcQPAg1of3p+MgczwJubUo= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241004162106-225cd3dd6eca/go.mod h1:LkVD8QXFfIxYsQCn3r0NHJk48PIg2XY1RqVBDTQwYuU= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8 h1:XMoB88mOGh1u64NNvtBEfidj+rEewunIfLCdmrhsNmY= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8/go.mod h1:VY1OpHND54C9/rK09yrZt7raHmTUioMOKPqJvRD3GDw= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241105211031-4122ab992c1a h1:dHIQ0YB484btLEUs9hBNYd3OgbVegvaWA16fA5v2GdA= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241105211031-4122ab992c1a/go.mod h1:VY1OpHND54C9/rK09yrZt7raHmTUioMOKPqJvRD3GDw= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e h1:IOoXHQJxuz++vUwiiM4nKMQynU5LrFbjZvS2JXFGoFg= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e/go.mod h1:VY1OpHND54C9/rK09yrZt7raHmTUioMOKPqJvRD3GDw= github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= diff --git a/schema_source/jetstream/advisory/v1/consumer_group_pinned.json b/schema_source/jetstream/advisory/v1/consumer_group_pinned.json new file mode 100644 index 0000000..4cafd93 --- /dev/null +++ b/schema_source/jetstream/advisory/v1/consumer_group_pinned.json @@ -0,0 +1,57 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/advisory/v1/consumer_group_pinned.json", + "description": "An Advisory sent when a pinned_client grouped consumer pinned a client", + "title": "io.nats.jetstream.advisory.v1.consumer_group_pinned", + "type": "object", + "required": [ + "type", + "id", + "timestamp", + "server", + "stream", + "consumer", + "group", + "pinned_id" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "const": "io.nats.jetstream.advisory.v1.consumer_group_pinned" + }, + "id": { + "type": "string", + "description": "Unique correlation ID for this event" + }, + "timestamp": { + "type": "string", + "description": "The time this event was created in RFC3339 format" + }, + "account": { + "type": "string", + "description": "The account hosting the consumer" + }, + "stream": { + "type": "string", + "description": "The stream hosting the consumer" + }, + "consumer": { + "type": "string", + "description": "The consumer name" + }, + "domain": { + "type": "string", + "minLength": 1, + "description": "The domain of the JetStreamServer" + }, + "group": { + "type": "string", + "description": "The group that unpinned a client" + }, + "pinned_id": { + "type": "string", + "description": "The unique server-assigned ID for the client" + } + } +} diff --git a/schema_source/jetstream/advisory/v1/consumer_group_unpinned.json b/schema_source/jetstream/advisory/v1/consumer_group_unpinned.json new file mode 100644 index 0000000..fc4c66f --- /dev/null +++ b/schema_source/jetstream/advisory/v1/consumer_group_unpinned.json @@ -0,0 +1,58 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/advisory/v1/consumer_group_unpinned.json", + "description": "An Advisory sent when a pinned_client grouped consumer unpinned a client", + "title": "io.nats.jetstream.advisory.v1.consumer_group_unpinned", + "type": "object", + "required": [ + "type", + "id", + "timestamp", + "server", + "stream", + "consumer", + "group", + "reason" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "const": "io.nats.jetstream.advisory.v1.consumer_group_unpinned" + }, + "id": { + "type": "string", + "description": "Unique correlation ID for this event" + }, + "timestamp": { + "type": "string", + "description": "The time this event was created in RFC3339 format" + }, + "account": { + "type": "string", + "description": "The account hosting the consumer" + }, + "stream": { + "type": "string", + "description": "The stream hosting the consumer" + }, + "consumer": { + "type": "string", + "description": "The consumer name" + }, + "domain": { + "type": "string", + "minLength": 1, + "description": "The domain of the JetStreamServer" + }, + "group": { + "type": "string", + "description": "The group that unpinned a client" + }, + "reason": { + "type": "string", + "enum": ["admin", "timeout"], + "description": "The reason the client was unpinned" + } + } +} diff --git a/schema_source/jetstream/api/v1/definitions.json b/schema_source/jetstream/api/v1/definitions.json index d8ac872..7f7beaa 100644 --- a/schema_source/jetstream/api/v1/definitions.json +++ b/schema_source/jetstream/api/v1/definitions.json @@ -70,6 +70,26 @@ } } }, + "priority_group_state": { + "required": ["group"], + "type": "object", + "description": "Status of a specific consumer priority group", + "properties": { + "group": { + "type": "string", + "description": "The group this status is for", + "minLength": 1 + }, + "pinned_client_id": { + "type": "string", + "description": "The generated ID of the pinned client" + }, + "pinned_ts": { + "description": "The timestamp when the client was pinned", + "$ref": "#/definitions/golang_time" + } + } + }, "external_stream_source": { "required": ["api"], "type": "object", @@ -465,6 +485,10 @@ "pattern": "^[^.*>]+$", "minLength": 1 }, + "priority_policy": { + "type": "string", + "enum": ["none","overflow","pinned_client"] + }, "deliver_policy": { "oneOf": [ {"$ref": "#/definitions/all_deliver_policy"}, @@ -648,6 +672,13 @@ "pause_until": { "description": "A deadline time for when the consumer will be paused. Only usable if 'paused' is true", "$ref": "#/definitions/golang_time" + }, + "priority_groups": { + "description": "The state of Priority Groups", + "type": "array", + "items": { + "$ref": "#/definitions/priority_group_state" + } } } }, @@ -796,6 +827,21 @@ "pause_until": { "description": "When creating a consumer supplying a time in the future will act as a deadline for when the consumer will be paused till", "$ref": "#/definitions/golang_time" + }, + "priority_groups": { + "description": "List of priority groups this consumer supports", + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "priority_policy": { + "description": "The policy the consumer is set to", + "$ref": "#/definitions/priority_policy" + }, + "pinned_ttl": { + "description": "For pinned_client priority policy how long before the client times out" } } }, diff --git a/schemas/jetstream/advisory/v1/consumer_group_pinned.json b/schemas/jetstream/advisory/v1/consumer_group_pinned.json new file mode 100644 index 0000000..4cafd93 --- /dev/null +++ b/schemas/jetstream/advisory/v1/consumer_group_pinned.json @@ -0,0 +1,57 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/advisory/v1/consumer_group_pinned.json", + "description": "An Advisory sent when a pinned_client grouped consumer pinned a client", + "title": "io.nats.jetstream.advisory.v1.consumer_group_pinned", + "type": "object", + "required": [ + "type", + "id", + "timestamp", + "server", + "stream", + "consumer", + "group", + "pinned_id" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "const": "io.nats.jetstream.advisory.v1.consumer_group_pinned" + }, + "id": { + "type": "string", + "description": "Unique correlation ID for this event" + }, + "timestamp": { + "type": "string", + "description": "The time this event was created in RFC3339 format" + }, + "account": { + "type": "string", + "description": "The account hosting the consumer" + }, + "stream": { + "type": "string", + "description": "The stream hosting the consumer" + }, + "consumer": { + "type": "string", + "description": "The consumer name" + }, + "domain": { + "type": "string", + "minLength": 1, + "description": "The domain of the JetStreamServer" + }, + "group": { + "type": "string", + "description": "The group that unpinned a client" + }, + "pinned_id": { + "type": "string", + "description": "The unique server-assigned ID for the client" + } + } +} diff --git a/schemas/jetstream/advisory/v1/consumer_group_unpinned.json b/schemas/jetstream/advisory/v1/consumer_group_unpinned.json new file mode 100644 index 0000000..545704f --- /dev/null +++ b/schemas/jetstream/advisory/v1/consumer_group_unpinned.json @@ -0,0 +1,61 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://nats.io/schemas/jetstream/advisory/v1/consumer_group_unpinned.json", + "description": "An Advisory sent when a pinned_client grouped consumer unpinned a client", + "title": "io.nats.jetstream.advisory.v1.consumer_group_unpinned", + "type": "object", + "required": [ + "type", + "id", + "timestamp", + "server", + "stream", + "consumer", + "group", + "reason" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "const": "io.nats.jetstream.advisory.v1.consumer_group_unpinned" + }, + "id": { + "type": "string", + "description": "Unique correlation ID for this event" + }, + "timestamp": { + "type": "string", + "description": "The time this event was created in RFC3339 format" + }, + "account": { + "type": "string", + "description": "The account hosting the consumer" + }, + "stream": { + "type": "string", + "description": "The stream hosting the consumer" + }, + "consumer": { + "type": "string", + "description": "The consumer name" + }, + "domain": { + "type": "string", + "minLength": 1, + "description": "The domain of the JetStreamServer" + }, + "group": { + "type": "string", + "description": "The group that unpinned a client" + }, + "reason": { + "type": "string", + "enum": [ + "admin", + "timeout" + ], + "description": "The reason the client was unpinned" + } + } +} diff --git a/schemas/jetstream/api/v1/consumer_configuration.json b/schemas/jetstream/api/v1/consumer_configuration.json index f9b0017..1fbafef 100644 --- a/schemas/jetstream/api/v1/consumer_configuration.json +++ b/schemas/jetstream/api/v1/consumer_configuration.json @@ -284,6 +284,26 @@ "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", "type": "string", "format": "date-time" + }, + "priority_groups": { + "description": "List of priority groups this consumer supports", + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "priority_policy": { + "description": "The policy the consumer is set to", + "type": "string", + "enum": [ + "none", + "overflow", + "pinned_client" + ] + }, + "pinned_ttl": { + "description": "For pinned_client priority policy how long before the client times out" } } } diff --git a/schemas/jetstream/api/v1/consumer_create_request.json b/schemas/jetstream/api/v1/consumer_create_request.json index 6955491..2b72c39 100644 --- a/schemas/jetstream/api/v1/consumer_create_request.json +++ b/schemas/jetstream/api/v1/consumer_create_request.json @@ -296,6 +296,26 @@ "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", "type": "string", "format": "date-time" + }, + "priority_groups": { + "description": "List of priority groups this consumer supports", + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "priority_policy": { + "description": "The policy the consumer is set to", + "type": "string", + "enum": [ + "none", + "overflow", + "pinned_client" + ] + }, + "pinned_ttl": { + "description": "For pinned_client priority policy how long before the client times out" } } }, diff --git a/schemas/jetstream/api/v1/consumer_create_response.json b/schemas/jetstream/api/v1/consumer_create_response.json index 47b8f20..86f8219 100644 --- a/schemas/jetstream/api/v1/consumer_create_response.json +++ b/schemas/jetstream/api/v1/consumer_create_response.json @@ -318,6 +318,26 @@ "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", "type": "string", "format": "date-time" + }, + "priority_groups": { + "description": "List of priority groups this consumer supports", + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "priority_policy": { + "description": "The policy the consumer is set to", + "type": "string", + "enum": [ + "none", + "overflow", + "pinned_client" + ] + }, + "pinned_ttl": { + "description": "For pinned_client priority policy how long before the client times out" } } }, @@ -490,6 +510,13 @@ "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", "type": "string", "format": "date-time" + }, + "pinned_ids": { + "description": "The IDs pinned by each consumer priority group", + "type": "object", + "additionalProperties": { + "type": "string" + } } } }, diff --git a/schemas/jetstream/api/v1/consumer_info_response.json b/schemas/jetstream/api/v1/consumer_info_response.json index 0261dab..8713976 100644 --- a/schemas/jetstream/api/v1/consumer_info_response.json +++ b/schemas/jetstream/api/v1/consumer_info_response.json @@ -318,6 +318,26 @@ "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", "type": "string", "format": "date-time" + }, + "priority_groups": { + "description": "List of priority groups this consumer supports", + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "priority_policy": { + "description": "The policy the consumer is set to", + "type": "string", + "enum": [ + "none", + "overflow", + "pinned_client" + ] + }, + "pinned_ttl": { + "description": "For pinned_client priority policy how long before the client times out" } } }, @@ -490,6 +510,13 @@ "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", "type": "string", "format": "date-time" + }, + "pinned_ids": { + "description": "The IDs pinned by each consumer priority group", + "type": "object", + "additionalProperties": { + "type": "string" + } } } }, diff --git a/schemas/jetstream/api/v1/consumer_list_response.json b/schemas/jetstream/api/v1/consumer_list_response.json index 4cc72d5..e0aeef5 100644 --- a/schemas/jetstream/api/v1/consumer_list_response.json +++ b/schemas/jetstream/api/v1/consumer_list_response.json @@ -383,6 +383,26 @@ "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", "type": "string", "format": "date-time" + }, + "priority_groups": { + "description": "List of priority groups this consumer supports", + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "priority_policy": { + "description": "The policy the consumer is set to", + "type": "string", + "enum": [ + "none", + "overflow", + "pinned_client" + ] + }, + "pinned_ttl": { + "description": "For pinned_client priority policy how long before the client times out" } } }, @@ -555,6 +575,13 @@ "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", "type": "string", "format": "date-time" + }, + "pinned_ids": { + "description": "The IDs pinned by each consumer priority group", + "type": "object", + "additionalProperties": { + "type": "string" + } } } }, diff --git a/schemas/server/errors.json b/schemas/server/errors.json index c7eaef2..0111df4 100644 --- a/schemas/server/errors.json +++ b/schemas/server/errors.json @@ -203,7 +203,7 @@ "constant": "JSInvalidJSONErr", "code": 400, "error_code": 10025, - "description": "invalid JSON", + "description": "invalid JSON: {err}", "comment": "", "help": "", "url": "", @@ -1558,5 +1558,45 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSStreamDuplicateMessageConflict", + "code": 409, + "error_code": 10158, + "description": "duplicate message id is in process", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerPriorityPolicyWithoutGroup", + "code": 400, + "error_code": 10159, + "description": "Setting PriorityPolicy requires at least one PriorityGroup to be set", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerInvalidPriorityGroupErr", + "code": 400, + "error_code": 10160, + "description": "Provided priority group does not exist for this consumer", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerEmptyGroupName", + "code": 400, + "error_code": 10161, + "description": "Group name cannot be an empty string", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/test/consumers_test.go b/test/consumers_test.go index 32488a1..61c118b 100644 --- a/test/consumers_test.go +++ b/test/consumers_test.go @@ -1057,3 +1057,41 @@ func TestConsumerPedantic(t *testing.T) { t.Fatalf("expected pednatic error, got: %v", err) } } + +func TestConsumerPinnedClientPriorityGroups(t *testing.T) { + srv, nc, mgr := startJSServer(t) + defer srv.Shutdown() + defer nc.Flush() + + s, err := mgr.NewStreamFromDefault("TEST", api.StreamConfig{}, jsm.Subjects("test.*")) + checkErr(t, err, "create failed") + + c, err := s.NewConsumer(jsm.PinnedClientPriorityGroups(time.Minute, "foo")) + checkErr(t, err, "create failed") + + if !c.IsPinnedClientPriority() { + t.Fatalf("expected pinned client priority to be set") + } + if !cmp.Equal(c.PriorityGroups(), []string{"foo"}) { + t.Fatalf("invalid priority group to be [foo], got %v", c.PriorityGroups()) + } +} + +func TestConsumerOverflowPriorityGroups(t *testing.T) { + srv, nc, mgr := startJSServer(t) + defer srv.Shutdown() + defer nc.Flush() + + s, err := mgr.NewStreamFromDefault("TEST", api.StreamConfig{}, jsm.Subjects("test.*")) + checkErr(t, err, "create failed") + + c, err := s.NewConsumer(jsm.OverflowPriorityGroups("foo")) + checkErr(t, err, "create failed") + + if !c.IsOverflowPriority() { + t.Fatalf("expected overflow priority to be set got %v", c.PriorityPolicy()) + } + if !cmp.Equal(c.PriorityGroups(), []string{"foo"}) { + t.Fatalf("invalid priority group to be [foo], got %v", c.PriorityGroups()) + } +}