Skip to content

Commit

Permalink
Support pedantic mode for streams and consumers
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Aug 26, 2024
1 parent 0134ba6 commit 32e2c0b
Show file tree
Hide file tree
Showing 16 changed files with 169 additions and 12 deletions.
2 changes: 2 additions & 0 deletions api/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ type JSApiConsumerCreateRequest struct {
Stream string `json:"stream_name"`
Config ConsumerConfig `json:"config"`
Action ConsumerAction `json:"action"`
// Pedantic disables server features that would set defaults and adjust the provided config
Pedantic bool `json:"pedantic,omitempty"`
}

// io.nats.jetstream.api.v1.consumer_create_response
Expand Down
2 changes: 1 addition & 1 deletion api/schemas_generated.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// auto generated 2024-05-17 16:56:48.242365 +0200 CEST m=+0.023207376
// auto generated 2024-08-26 09:52:49.187854 +0200 CEST m=+0.016427876

package api

Expand Down
2 changes: 2 additions & 0 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type JSApiStreamInfoRequest struct {
// io.nats.jetstream.api.v1.stream_create_request
type JSApiStreamCreateRequest struct {
StreamConfig
// Pedantic disables server features that would set defaults and adjust the provided config
Pedantic bool `json:"pedantic,omitempty"`
}

// io.nats.jetstream.api.v1.stream_names_request
Expand Down
5 changes: 3 additions & 2 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig,
}

req := api.JSApiConsumerCreateRequest{
Stream: stream,
Config: *cfg,
Stream: stream,
Config: *cfg,
Pedantic: m.pedantic,
}

createdInfo, err := m.createConsumer(req)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/jedib0t/go-pretty/v6 v6.5.9
github.com/klauspost/compress v1.17.9
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240813233400-f2726b416745
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240813233400-f2726b416745 h1:jWXoDTp7NUeq+aPFANE2/1WBH52567DSn1G9bi0X64w=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240813233400-f2726b416745/go.mod h1:vCoSIPH7pQwl4DhZvT3XiLVjq50ycYEDy/ZbvTzNGrY=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7 h1:GnAHBiWk/CTiaKuf530XMNZdUevPqgvDaaEx4Dn6vEA=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7/go.mod h1:vCoSIPH7pQwl4DhZvT3XiLVjq50ycYEDy/ZbvTzNGrY=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand Down
6 changes: 6 additions & 0 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Manager struct {
apiPrefix string
eventPrefix string
domain string
pedantic bool

sync.Mutex
}
Expand All @@ -62,6 +63,11 @@ func New(nc *nats.Conn, opts ...Option) (*Manager, error) {
return m, nil
}

// IsPedantic checks if the manager is in pedantic mode
func (m *Manager) IsPedantic() bool {
return m.pedantic
}

// IsJetStreamEnabled determines if JetStream is enabled for the current account
func (m *Manager) IsJetStreamEnabled() bool {
info, err := m.JetStreamAccountInfo()
Expand Down
7 changes: 7 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,10 @@ func WithDomain(d string) Option {
o.domain = d
}
}

// WithPedanticRequests enables pedantic mode in certain API calls that would avoid the server changing user configurations during request handling
func WithPedanticRequests() Option {
return func(o *Manager) {
o.pedantic = true
}
}
5 changes: 5 additions & 0 deletions schema_source/jetstream/api/v1/consumer_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
"action" : {
"type": "string",
"description": "The consumer create action"
},
"pedantic": {
"type": "boolean",
"description": "Enables pedantic mode where the server will not apply defaults or change the request",
"default": false
}
}
}
10 changes: 10 additions & 0 deletions schema_source/jetstream/api/v1/stream_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@
"allOf": [
{
"$ref": "definitions.json#/definitions/stream_configuration"
},
{
"type": "object",
"properties": {
"pedantic": {
"type": "boolean",
"description": "Enables pedantic mode where the server will not apply defaults or change the request",
"default": false
}
}
}
]
}
5 changes: 5 additions & 0 deletions schemas/jetstream/api/v1/consumer_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,11 @@
"action": {
"type": "string",
"description": "The consumer create action"
},
"pedantic": {
"type": "boolean",
"description": "Enables pedantic mode where the server will not apply defaults or change the request",
"default": false
}
}
}
10 changes: 10 additions & 0 deletions schemas/jetstream/api/v1/stream_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,16 @@
}
}
}
},
{
"type": "object",
"properties": {
"pedantic": {
"type": "boolean",
"description": "Enables pedantic mode where the server will not apply defaults or change the request",
"default": false
}
}
}
]
}
48 changes: 44 additions & 4 deletions schemas/server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@
"constant": "JSConsumerPullRequiresAckErr",
"code": 400,
"error_code": 10084,
"description": "consumer in pull mode requires ack policy",
"description": "consumer in pull mode requires ack policy on workqueue stream",
"comment": "",
"help": "",
"url": "",
Expand Down Expand Up @@ -1433,7 +1433,7 @@
"constant": "JSSourceInvalidSubjectFilter",
"code": 400,
"error_code": 10145,
"description": "source subject filter is invalid",
"description": "source transform source: {err}",
"comment": "",
"help": "",
"url": "",
Expand All @@ -1443,7 +1443,7 @@
"constant": "JSSourceInvalidTransformDestination",
"code": 400,
"error_code": 10146,
"description": "source transform destination is invalid",
"description": "source transform: {err}",
"comment": "",
"help": "",
"url": "",
Expand Down Expand Up @@ -1493,7 +1493,7 @@
"constant": "JSMirrorInvalidSubjectFilter",
"code": 400,
"error_code": 10151,
"description": "mirror subject filter is invalid",
"description": "mirror transform source: {err}",
"comment": "",
"help": "",
"url": "",
Expand All @@ -1518,5 +1518,45 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorInvalidTransformDestination",
"code": 400,
"error_code": 10154,
"description": "mirror transform: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamTransformInvalidSource",
"code": 400,
"error_code": 10155,
"description": "stream transform source: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamTransformInvalidDestination",
"code": 400,
"error_code": 10156,
"description": "stream transform: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSPedanticErrF",
"code": 400,
"error_code": 10157,
"description": "pedantic mode: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
15 changes: 13 additions & 2 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts
}

var resp api.JSApiStreamCreateResponse
err = m.jsonRequest(fmt.Sprintf(api.JSApiStreamCreateT, name), &cfg, &resp)

req := api.JSApiStreamCreateRequest{
Pedantic: m.pedantic,
StreamConfig: *cfg,
}

err = m.jsonRequest(fmt.Sprintf(api.JSApiStreamCreateT, name), &req, &resp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -550,8 +556,13 @@ func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption)
return err
}

req := api.JSApiStreamCreateRequest{
Pedantic: s.mgr.pedantic,
StreamConfig: *ncfg,
}

var resp api.JSApiStreamUpdateResponse
err = s.mgr.jsonRequest(fmt.Sprintf(api.JSApiStreamUpdateT, s.Name()), ncfg, &resp)
err = s.mgr.jsonRequest(fmt.Sprintf(api.JSApiStreamUpdateT, s.Name()), &req, &resp)
if err != nil {
return err
}
Expand Down
33 changes: 33 additions & 0 deletions test/consumers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,3 +1024,36 @@ func TestConsumerDescription(t *testing.T) {
t.Fatalf("invalid description %q", c.Description())
}
}

func TestConsumerPedantic(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
defer nc.Flush()

s, err := mgr.NewStreamFromDefault("TEST", api.StreamConfig{}, jsm.Subjects("test.*"), jsm.ConsumerLimits(api.StreamConsumerLimits{
MaxAckPending: 10,
}))
checkErr(t, err, "create failed")

c, err := s.NewConsumer(jsm.MaxAckPending(0))
checkErr(t, err, "create failed")

if c.MaxAckPending() != 10 {
t.Fatalf("expected max ack to be overrode, got %v", c.MaxAckPending())
}

mgr, err = jsm.New(nc, jsm.WithPedanticRequests())
checkErr(t, err, "mgr failed")

if !mgr.IsPedantic() {
t.Fatalf("expected mgr to be pedantic")
}

s, err = mgr.LoadStream("TEST")
checkErr(t, err, "load stream failed")

_, err = s.NewConsumerFromDefault(api.ConsumerConfig{}, jsm.MaxAckPending(0))
if !api.IsNatsErr(err, 10157) {
t.Fatalf("expected pednatic error, got: %v", err)
}
}
25 changes: 25 additions & 0 deletions test/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,3 +1244,28 @@ func TestStreamRepublish(t *testing.T) {
t.Fatalf("Expected 2 mapped subjects to get messages: %#v", received)
}
}

func TestStreamPedantic(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
defer nc.Flush()

s, err := mgr.NewStreamFromDefault("TEST", api.StreamConfig{}, jsm.Subjects("test.*"), jsm.MaxAge(time.Second))
checkErr(t, err, "create failed")

if s.MaxAge() != time.Second {
t.Fatalf("expected max age to be overrode, got: %v", s.MaxAge())
}
checkErr(t, s.Delete(), "delete failed")

mgr, err = jsm.New(nc, jsm.WithPedanticRequests())
checkErr(t, err, "manager failed")
if !mgr.IsPedantic() {
t.Fatalf("expected mgr to be pedantic")
}

_, err = mgr.NewStreamFromDefault("TEST", api.StreamConfig{}, jsm.Subjects("test.*"), jsm.MaxAge(time.Second))
if !api.IsNatsErr(err, 10157) {
t.Fatalf("expected pednatic error, got: %v", err)
}
}

0 comments on commit 32e2c0b

Please sign in to comment.