From ca18849a291811bdaf1d2fd57e00f1117f53c00e Mon Sep 17 00:00:00 2001 From: Michael Wain Date: Thu, 14 Nov 2024 21:56:00 +0000 Subject: [PATCH] [IMPROVED] Add extra options to OrderedConsumerConfig Signed-off-by: Michael Wain --- jetstream/consumer_config.go | 10 +++++ jetstream/ordered.go | 18 +++++---- jetstream/test/ordered_test.go | 72 +++++++++++++++++++--------------- 3 files changed, 60 insertions(+), 40 deletions(-) diff --git a/jetstream/consumer_config.go b/jetstream/consumer_config.go index 4e2e3d6e0..913764fe2 100644 --- a/jetstream/consumer_config.go +++ b/jetstream/consumer_config.go @@ -263,6 +263,16 @@ type ( // Maximum number of attempts for the consumer to be recreated in a // single recreation cycle. Defaults to unlimited. MaxResetAttempts int + + // MaxRequestMaxBytes is the optional maximum total bytes that can be + // requested in a given batch. When set with MaxRequestBatch, the batch + // size will be constrained by whichever limit is hit first. + MaxRequestMaxBytes int `json:"max_bytes,omitempty"` + + // Metadata is a set of application-defined key-value pairs for + // associating metadata on the consumer. This feature requires + // nats-server v2.10.0 or later. + Metadata map[string]string `json:"metadata,omitempty"` } // DeliverPolicy determines from which point to start delivering messages. diff --git a/jetstream/ordered.go b/jetstream/ordered.go index 0d7f952c3..aed3e20d9 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -596,14 +596,16 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig { } name := fmt.Sprintf("%s_%d", c.namePrefix, c.serial) cfg := &ConsumerConfig{ - Name: name, - DeliverPolicy: DeliverByStartSequencePolicy, - OptStartSeq: nextSeq, - AckPolicy: AckNonePolicy, - InactiveThreshold: 5 * time.Minute, - Replicas: 1, - HeadersOnly: c.cfg.HeadersOnly, - MemoryStorage: true, + Name: name, + DeliverPolicy: DeliverByStartSequencePolicy, + OptStartSeq: nextSeq, + AckPolicy: AckNonePolicy, + InactiveThreshold: 5 * time.Minute, + Replicas: 1, + HeadersOnly: c.cfg.HeadersOnly, + MemoryStorage: true, + MaxRequestMaxBytes: c.cfg.MaxRequestMaxBytes, + Metadata: c.cfg.Metadata, } if len(c.cfg.FilterSubjects) == 1 { cfg.FilterSubject = c.cfg.FilterSubjects[0] diff --git a/jetstream/test/ordered_test.go b/jetstream/test/ordered_test.go index 6680955be..c73fa0d1b 100644 --- a/jetstream/test/ordered_test.go +++ b/jetstream/test/ordered_test.go @@ -1946,47 +1946,55 @@ func TestOrderedConsumerConfig(t *testing.T) { { name: "all fields customized, start with custom seq", config: jetstream.OrderedConsumerConfig{ - FilterSubjects: []string{"foo.a", "foo.b"}, - DeliverPolicy: jetstream.DeliverByStartSequencePolicy, - OptStartSeq: 10, - ReplayPolicy: jetstream.ReplayOriginalPolicy, - InactiveThreshold: 10 * time.Second, - HeadersOnly: true, + FilterSubjects: []string{"foo.a", "foo.b"}, + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + OptStartSeq: 10, + ReplayPolicy: jetstream.ReplayOriginalPolicy, + InactiveThreshold: 10 * time.Second, + HeadersOnly: true, + MaxRequestMaxBytes: 1024, + Metadata: map[string]string{"foo": "a"}, }, expected: jetstream.ConsumerConfig{ - FilterSubjects: []string{"foo.a", "foo.b"}, - OptStartSeq: 10, - DeliverPolicy: jetstream.DeliverByStartSequencePolicy, - AckPolicy: jetstream.AckNonePolicy, - MaxDeliver: -1, - MaxWaiting: 512, - InactiveThreshold: 10 * time.Second, - Replicas: 1, - MemoryStorage: true, - HeadersOnly: true, + FilterSubjects: []string{"foo.a", "foo.b"}, + OptStartSeq: 10, + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 10 * time.Second, + Replicas: 1, + MemoryStorage: true, + HeadersOnly: true, + MaxRequestMaxBytes: 1024, + Metadata: map[string]string{"foo": "a"}, }, }, { name: "all fields customized, start with custom time", config: jetstream.OrderedConsumerConfig{ - FilterSubjects: []string{"foo.a", "foo.b"}, - DeliverPolicy: jetstream.DeliverByStartTimePolicy, - OptStartTime: &time.Time{}, - ReplayPolicy: jetstream.ReplayOriginalPolicy, - InactiveThreshold: 10 * time.Second, - HeadersOnly: true, + FilterSubjects: []string{"foo.a", "foo.b"}, + DeliverPolicy: jetstream.DeliverByStartTimePolicy, + OptStartTime: &time.Time{}, + ReplayPolicy: jetstream.ReplayOriginalPolicy, + InactiveThreshold: 10 * time.Second, + HeadersOnly: true, + MaxRequestMaxBytes: 1024, + Metadata: map[string]string{"foo": "a"}, }, expected: jetstream.ConsumerConfig{ - FilterSubjects: []string{"foo.a", "foo.b"}, - OptStartTime: &time.Time{}, - DeliverPolicy: jetstream.DeliverByStartTimePolicy, - AckPolicy: jetstream.AckNonePolicy, - MaxDeliver: -1, - MaxWaiting: 512, - InactiveThreshold: 10 * time.Second, - Replicas: 1, - MemoryStorage: true, - HeadersOnly: true, + FilterSubjects: []string{"foo.a", "foo.b"}, + OptStartTime: &time.Time{}, + DeliverPolicy: jetstream.DeliverByStartTimePolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 10 * time.Second, + Replicas: 1, + MemoryStorage: true, + HeadersOnly: true, + MaxRequestMaxBytes: 1024, + Metadata: map[string]string{"foo": "a"}, }, }, }