From 3d6d45ee33bb7ea483ce0984413d7e520f8ba496 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Fri, 21 Jun 2024 15:14:51 +0300 Subject: [PATCH] Extract stream and consumer monitoring into package This allows the CLI and other tools to share logic and yield identical results Signed-off-by: R.I.Pienaar --- {connbalancer => api}/logger.go | 18 +- connbalancer/balancer.go | 37 +- connbalancer/balancer_test.go | 3 +- consumers_monitor.go | 199 ++++++ consumers_monitor_test.go | 246 ++++++++ go.mod | 16 +- go.sum | 39 +- jsm.go | 102 +++ monitor/monitor.go | 41 ++ monitor/perfdata.go | 65 ++ monitor/result.go | 305 +++++++++ .../nats/context/user_pass_token_creds.json | 3 +- streams.go | 11 +- streams_monitor.go | 368 +++++++++++ streams_monitor_test.go | 580 ++++++++++++++++++ consumers_test.go => test/consumers_test.go | 4 +- jsm_test.go => test/jsm_test.go | 2 +- manager_test.go => test/manager_test.go | 2 +- msginfo_test.go => test/msginfo_test.go | 2 +- snapshots_test.go => test/snapshots_test.go | 3 +- .../stream_pager_test.go | 15 +- .../stream_query_test.go | 4 +- streams_test.go => test/streams_test.go | 4 +- .../testdata}/bytes_required.cfg | 0 24 files changed, 2018 insertions(+), 51 deletions(-) rename {connbalancer => api}/logger.go (77%) create mode 100644 consumers_monitor.go create mode 100644 consumers_monitor_test.go create mode 100644 monitor/monitor.go create mode 100644 monitor/perfdata.go create mode 100644 monitor/result.go create mode 100644 streams_monitor.go create mode 100644 streams_monitor_test.go rename consumers_test.go => test/consumers_test.go (99%) rename jsm_test.go => test/jsm_test.go (99%) rename manager_test.go => test/manager_test.go (99%) rename msginfo_test.go => test/msginfo_test.go (99%) rename snapshots_test.go => test/snapshots_test.go (99%) rename stream_pager_test.go => test/stream_pager_test.go (80%) rename stream_query_test.go => test/stream_query_test.go (99%) rename streams_test.go => test/streams_test.go (99%) rename {testdata => test/testdata}/bytes_required.cfg (100%) diff --git a/connbalancer/logger.go b/api/logger.go similarity index 77% rename from connbalancer/logger.go rename to api/logger.go index 5ed469a3..2841fb02 100644 --- a/connbalancer/logger.go +++ b/api/logger.go @@ -11,17 +11,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -package connbalancer +package api import ( "log" ) type Logger interface { - Trace(format string, a ...any) - Debug(format string, a ...any) - Info(format string, a ...any) - Error(format string, a ...any) + Tracef(format string, a ...any) + Debugf(format string, a ...any) + Infof(format string, a ...any) + Errorf(format string, a ...any) } type Level uint @@ -46,25 +46,25 @@ func NewDiscardLogger() Logger { return &dfltLogger{lvl: ErrorLevel, logFunc: func(format string, a ...any) {}} } -func (d *dfltLogger) Trace(format string, a ...any) { +func (d *dfltLogger) Tracef(format string, a ...any) { if d.lvl >= TraceLevel { d.logFunc(format, a...) } } -func (d *dfltLogger) Debug(format string, a ...any) { +func (d *dfltLogger) Debugf(format string, a ...any) { if d.lvl >= DebugLevel { d.logFunc(format, a...) } } -func (d *dfltLogger) Info(format string, a ...any) { +func (d *dfltLogger) Infof(format string, a ...any) { if d.lvl >= InfoLevel { d.logFunc(format, a...) } } -func (d *dfltLogger) Error(format string, a ...any) { +func (d *dfltLogger) Errorf(format string, a ...any) { if d.lvl >= ErrorLevel { d.logFunc(format, a...) } diff --git a/connbalancer/balancer.go b/connbalancer/balancer.go index 306d331a..7b8e99fc 100644 --- a/connbalancer/balancer.go +++ b/connbalancer/balancer.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/nats-io/jsm.go/api" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) @@ -41,7 +42,7 @@ type balancer struct { nc *nats.Conn duration time.Duration limits *ConnectionSelector - log Logger + log api.Logger } type conn struct { @@ -50,7 +51,7 @@ type conn struct { conn *server.ConnInfo } -func New(nc *nats.Conn, runTime time.Duration, log Logger, connections ConnectionSelector) (Balancer, error) { +func New(nc *nats.Conn, runTime time.Duration, log api.Logger, connections ConnectionSelector) (Balancer, error) { if connections.SubjectInterest != "" && connections.Account == "" { return nil, fmt.Errorf("can only filter by subject if account is given") } @@ -84,14 +85,14 @@ func (c *balancer) Balance(ctx context.Context) (int, error) { return 0, err } - c.log.Debug("Had %d connz responses", len(connz)) + c.log.Debugf("Had %d connz responses", len(connz)) matched, err := c.pickConnections(connz) if err != nil { return 0, err } - c.log.Debug("Matched %d connections", len(matched)) + c.log.Debugf("Matched %d connections", len(matched)) if len(matched) == 0 { return 0, nil @@ -100,40 +101,40 @@ func (c *balancer) Balance(ctx context.Context) (int, error) { var sleep = c.duration / time.Duration(len(matched)) var success int - c.log.Info("Balancing %d connections with %v sleep between each balance request", len(matched), sleep) + c.log.Infof("Balancing %d connections with %v sleep between each balance request", len(matched), sleep) for i, m := range matched { cid, err := c.nc.GetClientID() if err != nil { - c.log.Error("Could not exclude self from kicks: %v", err) + c.log.Errorf("Could not exclude self from kicks: %v", err) continue } if m.serverId == c.nc.ConnectedServerId() && m.conn.Cid == cid { - c.log.Debug("Not kicking own connection") + c.log.Debugf("Not kicking own connection") continue } res, err := c.reqMany(ctx, fmt.Sprintf("$SYS.REQ.SERVER.%s.KICK", m.serverId), &server.KickClientReq{CID: m.conn.Cid}, 1) if err != nil { - c.log.Error("Could not kick %d on %s: %v", m.conn.Cid, m.serverId, err) + c.log.Errorf("Could not kick %d on %s: %v", m.conn.Cid, m.serverId, err) continue } if len(res) != 1 { - c.log.Error("Could not kick %d on %s: expected 1 response but had %d", m.conn.Cid, m.serverId, len(res)) + c.log.Errorf("Could not kick %d on %s: expected 1 response but had %d", m.conn.Cid, m.serverId, len(res)) continue } var resp server.ServerAPIResponse err = json.Unmarshal(res[0].Data, &resp) if err != nil { - c.log.Error("Could not kick %d on %s: invalid server response: %v", m.conn.Cid, m.serverId, err) + c.log.Errorf("Could not kick %d on %s: invalid server response: %v", m.conn.Cid, m.serverId, err) continue } if resp.Error != nil { - c.log.Error("Could not kick %d on %s: invalid server response: %v", m.conn.Cid, m.serverId, resp.Error.Description) + c.log.Errorf("Could not kick %d on %s: invalid server response: %v", m.conn.Cid, m.serverId, resp.Error.Description) continue } @@ -143,16 +144,16 @@ func (c *balancer) Balance(ctx context.Context) (int, error) { } if m.conn.Account != "" { - c.log.Info("Balanced client %d%s in account %s on %s", m.conn.Cid, name, m.conn.Account, m.serverName) + c.log.Infof("Balanced client %d%s in account %s on %s", m.conn.Cid, name, m.conn.Account, m.serverName) } else { - c.log.Info("Balanced client %d%s on %s", m.conn.Cid, name, m.serverName) + c.log.Infof("Balanced client %d%s on %s", m.conn.Cid, name, m.serverName) } success++ if i != len(matched)-1 { timer := time.NewTimer(sleep) - c.log.Debug("Sleeping for %v", sleep) + c.log.Debugf("Sleeping for %v", sleep) select { case <-timer.C: case <-ctx.Done(): @@ -211,7 +212,7 @@ func (c *balancer) getConnz(ctx context.Context) ([]*server.ServerAPIConnzRespon break } - c.log.Info("Gathering paged connection information") + c.log.Infof("Gathering paged connection information") } return results, nil @@ -287,7 +288,7 @@ func (c *balancer) reqMany(ctx context.Context, subj string, req any, expect int } } - c.log.Trace(">>> %s: %s", subj, string(jreq)) + c.log.Tracef(">>> %s: %s", subj, string(jreq)) var ( mu sync.Mutex @@ -317,7 +318,7 @@ func (c *balancer) reqMany(ctx context.Context, subj string, req any, expect int mu.Lock() defer mu.Unlock() - c.log.Trace("<<< (%dB) %s", len(m.Data), string(m.Data)) + c.log.Tracef("<<< (%dB) %s", len(m.Data), string(m.Data)) if finisher != nil { finisher.Reset(300 * time.Millisecond) } @@ -363,7 +364,7 @@ func (c *balancer) reqMany(ctx context.Context, subj string, req any, expect int } mu.Lock() - c.log.Debug("=== Received %d responses", ctr) + c.log.Debugf("=== Received %d responses", ctr) mu.Unlock() return res, nil diff --git a/connbalancer/balancer_test.go b/connbalancer/balancer_test.go index ff95ac65..dea9e35d 100644 --- a/connbalancer/balancer_test.go +++ b/connbalancer/balancer_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/nats-io/jsm.go/api" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) @@ -128,7 +129,7 @@ func checkBalanced(t *testing.T, nc *nats.Conn, expect int, s ConnectionSelector t.Helper() // dont kick ourselves or connections on other servers - balancer, err := New(nc, 0, NewDiscardLogger(), s) + balancer, err := New(nc, 0, api.NewDiscardLogger(), s) if err != nil { t.Fatalf("create failed: %v", err) } diff --git a/consumers_monitor.go b/consumers_monitor.go new file mode 100644 index 00000000..76151ca5 --- /dev/null +++ b/consumers_monitor.go @@ -0,0 +1,199 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jsm + +import ( + "strconv" + "time" + + "github.com/nats-io/jsm.go/api" + "github.com/nats-io/jsm.go/monitor" +) + +const ( + ConsumerMonitorMetaOutstandingAckCritical = "io.nats.monitor.outstanding-ack-critical" + ConsumerMonitorMetaWaitingCritical = "io.nats.monitor.waiting-critical" + ConsumerMonitorMetaUnprocessedCritical = "io.nats.monitor.unprocessed-critical" + ConsumerMonitorMetaLastDeliveredCritical = "io.nats.monitor.last-delivery-critical" + ConsumerMonitorMetaLastAckCritical = "io.nats.monitor.last-ack-critical" + ConsumerMonitorMetaRedeliveryCritical = "io.nats.monitor.redelivery-critical" +) + +type ConsumerHealthCheck func(*Consumer, *monitor.Result, ConsumerHealthCheckOptions, api.Logger) + +type ConsumerHealthCheckOptions struct { + Enabled bool + AckOutstandingCritical int + WaitingCritical int + UnprocessedCritical int + LastDeliveryCritical time.Duration + LastAckCritical time.Duration + RedeliveryCritical int + HealthChecks []ConsumerHealthCheck +} + +func (c *Consumer) MonitorOptions(extraChecks ...ConsumerHealthCheck) (*ConsumerHealthCheckOptions, error) { + opts := &ConsumerHealthCheckOptions{ + HealthChecks: extraChecks, + } + + var err error + parser := []monitorMetaParser{ + {MonitorEnabled, func(v string) error { + opts.Enabled, err = strconv.ParseBool(v) + return err + }}, + {ConsumerMonitorMetaOutstandingAckCritical, func(v string) error { + opts.AckOutstandingCritical, err = strconv.Atoi(v) + return err + }}, + {ConsumerMonitorMetaWaitingCritical, func(v string) error { + opts.WaitingCritical, err = strconv.Atoi(v) + return err + }}, + {ConsumerMonitorMetaUnprocessedCritical, func(v string) error { + opts.UnprocessedCritical, err = strconv.Atoi(v) + return err + }}, + {ConsumerMonitorMetaLastDeliveredCritical, func(v string) error { + opts.LastDeliveryCritical, err = parseDuration(v) + return err + }}, + {ConsumerMonitorMetaLastAckCritical, func(v string) error { + opts.LastAckCritical, err = parseDuration(v) + return err + }}, + {ConsumerMonitorMetaRedeliveryCritical, func(v string) error { + opts.RedeliveryCritical, err = strconv.Atoi(v) + return err + }}, + } + + metadata := c.Metadata() + + for _, m := range parser { + if v, ok := metadata[m.k]; ok { + err = m.fn(v) + if err != nil { + return nil, err + } + } + } + + return opts, nil +} + +func (c *Consumer) HealthCheck(opts ConsumerHealthCheckOptions, check *monitor.Result, log api.Logger) (*monitor.Result, error) { + if check == nil { + check = &monitor.Result{ + Check: "consumer_status", + Name: c.Name(), + } + } + + // make sure latest info cache is set as checks accesses it directly + nfo, err := c.LatestState() + if err != nil { + return nil, err + } + + c.checkOutstandingAck(&nfo, check, opts, log) + c.checkWaiting(&nfo, check, opts, log) + c.checkUnprocessed(&nfo, check, opts, log) + c.checkRedelivery(&nfo, check, opts, log) + c.checkLastDelivery(&nfo, check, opts, log) + c.checkLastAck(&nfo, check, opts, log) + + for _, hc := range opts.HealthChecks { + hc(c, check, opts, log) + } + + return check, nil +} + +func (c *Consumer) checkLastAck(nfo *api.ConsumerInfo, check *monitor.Result, opts ConsumerHealthCheckOptions, log api.Logger) { + switch { + case opts.LastAckCritical <= 0: + case nfo.AckFloor.Last == nil: + log.Debugf("CRITICAL: No acks") + check.Critical("No acks") + case time.Since(*nfo.AckFloor.Last) >= opts.LastAckCritical: + log.Debugf("CRITICAL: Last ack %v ago", time.Since(*nfo.AckFloor.Last)) + check.Critical("Last ack %v ago", time.Since(*nfo.AckFloor.Last)) + default: + check.Ok("Last ack %v", nfo.AckFloor.Last) + } +} + +func (c *Consumer) checkLastDelivery(nfo *api.ConsumerInfo, check *monitor.Result, opts ConsumerHealthCheckOptions, log api.Logger) { + switch { + case opts.LastDeliveryCritical <= 0: + case nfo.Delivered.Last == nil: + log.Debugf("CRITICAL: No deliveries") + check.Critical("No deliveries") + case time.Since(*nfo.Delivered.Last) >= opts.LastDeliveryCritical: + log.Debugf("CRITICAL: Last delivery %v", nfo.Delivered.Last.Format(time.DateTime)) + check.Critical("Last delivery %s ago", time.Since(*nfo.Delivered.Last)) + default: + check.Ok("Last delivery %v", nfo.Delivered.Last) + } +} + +func (c *Consumer) checkRedelivery(nfo *api.ConsumerInfo, check *monitor.Result, opts ConsumerHealthCheckOptions, log api.Logger) { + switch { + case opts.RedeliveryCritical <= 0: + return + case nfo.NumRedelivered >= opts.RedeliveryCritical: + log.Debugf("CRITICAL Redelivered: %v", nfo.NumRedelivered) + check.Critical("Redelivered: %v", nfo.NumRedelivered) + default: + check.Ok("Redelivered: %v", nfo.NumRedelivered) + } +} + +func (c *Consumer) checkUnprocessed(nfo *api.ConsumerInfo, check *monitor.Result, opts ConsumerHealthCheckOptions, log api.Logger) { + switch { + case opts.UnprocessedCritical <= 0: + return + case nfo.NumPending >= uint64(opts.UnprocessedCritical): + log.Debugf("CRITICAL Unprocessed Messages: %v", nfo.NumAckPending) + check.Critical("Unprocessed Messages: %v", nfo.NumAckPending) + default: + check.Ok("Unprocessed Messages: %v", nfo.NumAckPending) + } +} + +func (c *Consumer) checkWaiting(nfo *api.ConsumerInfo, check *monitor.Result, opts ConsumerHealthCheckOptions, log api.Logger) { + switch { + case opts.WaitingCritical <= 0: + return + case nfo.NumWaiting >= opts.WaitingCritical: + log.Debugf("CRITICAL Waiting Pulls: %v", nfo.NumWaiting) + check.Critical("Waiting Pulls: %v", nfo.NumWaiting) + default: + check.Ok("Waiting Pulls: %v", nfo.NumWaiting) + } +} + +func (c *Consumer) checkOutstandingAck(nfo *api.ConsumerInfo, check *monitor.Result, opts ConsumerHealthCheckOptions, log api.Logger) { + switch { + case opts.AckOutstandingCritical <= 0: + return + case nfo.NumAckPending >= opts.AckOutstandingCritical: + log.Debugf("CRITICAL Ack Pending: %v", nfo.NumAckPending) + check.Critical("Ack Pending: %v", nfo.NumAckPending) + default: + check.Ok("Ack Pending: %v", nfo.NumAckPending) + } +} diff --git a/consumers_monitor_test.go b/consumers_monitor_test.go new file mode 100644 index 00000000..a8753c90 --- /dev/null +++ b/consumers_monitor_test.go @@ -0,0 +1,246 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jsm + +import ( + "testing" + "time" + + "github.com/nats-io/jsm.go/api" + "github.com/nats-io/jsm.go/monitor" +) + +func TestConsumer_checkLastAck(t *testing.T) { + setup := func() (*Consumer, *monitor.Result, *api.ConsumerInfo) { + return &Consumer{}, &monitor.Result{}, &api.ConsumerInfo{} + } + + t.Run("Should skip without a threshold", func(t *testing.T) { + c, check, ci := setup() + c.checkLastAck(ci, check, ConsumerHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle no ack floor", func(t *testing.T) { + c, check, ci := setup() + c.checkLastAck(ci, check, ConsumerHealthCheckOptions{ + LastAckCritical: time.Second, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "No acks") + }) + + t.Run("Should handle time greater than or equal", func(t *testing.T) { + c, check, ci := setup() + last := time.Now().Add(-time.Hour) + ci.AckFloor.Last = &last + c.checkLastAck(ci, check, ConsumerHealthCheckOptions{ + LastAckCritical: time.Second, + }, api.NewDiscardLogger()) + requireLen(t, check.Criticals, 1) + requireRegexElement(t, check.Criticals, "Last ack .+ ago") + }) + + t.Run("Should be ok otherwise", func(t *testing.T) { + c, check, ci := setup() + last := time.Now() + ci.AckFloor.Last = &last + c.checkLastAck(ci, check, ConsumerHealthCheckOptions{ + LastAckCritical: time.Second, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireLen(t, check.OKs, 1) + }) +} + +func TestConsumer_checkLastDelivery(t *testing.T) { + setup := func() (*Consumer, *monitor.Result, *api.ConsumerInfo) { + return &Consumer{}, &monitor.Result{}, &api.ConsumerInfo{} + } + + t.Run("Should skip without a threshold", func(t *testing.T) { + c, check, ci := setup() + c.checkLastDelivery(ci, check, ConsumerHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle no delivery", func(t *testing.T) { + c, check, ci := setup() + c.checkLastDelivery(ci, check, ConsumerHealthCheckOptions{ + LastDeliveryCritical: time.Second, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "No deliveries") + }) + + t.Run("Should handle time greater than or equal", func(t *testing.T) { + c, check, ci := setup() + last := time.Now().Add(-time.Hour) + ci.Delivered.Last = &last + + c.checkLastDelivery(ci, check, ConsumerHealthCheckOptions{ + LastDeliveryCritical: time.Second, + }, api.NewDiscardLogger()) + + requireLen(t, check.Criticals, 1) + requireRegexElement(t, check.Criticals, "Last delivery .+ ago") + }) + + t.Run("Should handle time greater than or equal", func(t *testing.T) { + c, check, ci := setup() + last := time.Now().Add(-time.Hour) + ci.Delivered.Last = &last + + c.checkLastDelivery(ci, check, ConsumerHealthCheckOptions{ + LastDeliveryCritical: time.Second, + }, api.NewDiscardLogger()) + + requireLen(t, check.Criticals, 1) + requireRegexElement(t, check.Criticals, "Last delivery .+ ago") + }) + + t.Run("Should be ok otherwise", func(t *testing.T) { + c, check, ci := setup() + last := time.Now() + ci.Delivered.Last = &last + + c.checkLastDelivery(ci, check, ConsumerHealthCheckOptions{ + LastDeliveryCritical: time.Second, + }, api.NewDiscardLogger()) + + requireEmpty(t, check.Criticals) + requireLen(t, check.OKs, 1) + }) +} + +func TestConsumer_checkUnprocessed(t *testing.T) { + setup := func() (*Consumer, *monitor.Result, *api.ConsumerInfo) { + return &Consumer{}, &monitor.Result{}, &api.ConsumerInfo{} + } + + t.Run("Should skip without a threshold", func(t *testing.T) { + c, check, ci := setup() + c.checkRedelivery(ci, check, ConsumerHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle redelivery above threshold", func(t *testing.T) { + c, check, ci := setup() + ci.NumRedelivered = 10 + c.checkRedelivery(ci, check, ConsumerHealthCheckOptions{ + RedeliveryCritical: 1, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Redelivered: 10") + + check = &monitor.Result{} + c.checkRedelivery(ci, check, ConsumerHealthCheckOptions{ + RedeliveryCritical: 10, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Redelivered: 10") + }) + + t.Run("Should handle below threshold", func(t *testing.T) { + c, check, ci := setup() + ci.NumRedelivered = 10 + c.checkRedelivery(ci, check, ConsumerHealthCheckOptions{ + RedeliveryCritical: 11, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireElement(t, check.OKs, "Redelivered: 10") + }) +} + +func TestConsumer_checkWaiting(t *testing.T) { + setup := func() (*Consumer, *monitor.Result, *api.ConsumerInfo) { + return &Consumer{}, &monitor.Result{}, &api.ConsumerInfo{} + } + + t.Run("Should skip without a threshold", func(t *testing.T) { + c, check, ci := setup() + c.checkWaiting(ci, check, ConsumerHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle redelivery above threshold", func(t *testing.T) { + c, check, ci := setup() + ci.NumWaiting = 10 + c.checkWaiting(ci, check, ConsumerHealthCheckOptions{ + WaitingCritical: 1, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Waiting Pulls: 10") + + c, check, ci = setup() + ci.NumWaiting = 10 + c.checkWaiting(ci, check, ConsumerHealthCheckOptions{ + WaitingCritical: 10, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Waiting Pulls: 10") + }) + + t.Run("Should handle below threshold", func(t *testing.T) { + c, check, ci := setup() + ci.NumWaiting = 10 + c.checkWaiting(ci, check, ConsumerHealthCheckOptions{ + WaitingCritical: 11, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireElement(t, check.OKs, "Waiting Pulls: 10") + }) +} + +func TestConsumer_checkOutstandingAck(t *testing.T) { + setup := func() (*Consumer, *monitor.Result, *api.ConsumerInfo) { + return &Consumer{}, &monitor.Result{}, &api.ConsumerInfo{} + } + + t.Run("Should skip without a threshold", func(t *testing.T) { + c, check, ci := setup() + c.checkOutstandingAck(ci, check, ConsumerHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle outstanding above threshold", func(t *testing.T) { + c, check, ci := setup() + ci.NumAckPending = 10 + c.checkOutstandingAck(ci, check, ConsumerHealthCheckOptions{ + AckOutstandingCritical: 1, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Ack Pending: 10") + + c, check, ci = setup() + ci.NumAckPending = 10 + c.checkOutstandingAck(ci, check, ConsumerHealthCheckOptions{ + AckOutstandingCritical: 10, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Ack Pending: 10") + }) + + t.Run("Should handle below threshold", func(t *testing.T) { + c, check, ci := setup() + ci.NumAckPending = 10 + c.checkOutstandingAck(ci, check, ConsumerHealthCheckOptions{ + AckOutstandingCritical: 11, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireElement(t, check.OKs, "Ack Pending: 10") + }) +} diff --git a/go.mod b/go.mod index 8941ce7b..0d4c0374 100644 --- a/go.mod +++ b/go.mod @@ -6,24 +6,36 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/expr-lang/expr v1.16.9 github.com/google/go-cmp v0.6.0 + github.com/jedib0t/go-pretty/v6 v6.5.9 github.com/klauspost/compress v1.17.9 github.com/nats-io/nats-server/v2 v2.11.0-preview.2 github.com/nats-io/nats.go v1.36.0 + github.com/nats-io/natscli v0.1.4 github.com/nats-io/nuid v1.0.1 + github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/common v0.54.0 golang.org/x/net v0.26.0 golang.org/x/text v0.16.0 gopkg.in/yaml.v3 v3.0.1 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/google/go-tpm v0.9.0 // indirect - github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.5.7 // indirect github.com/nats-io/nkeys v0.4.7 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/sys v0.21.0 // indirect + golang.org/x/term v0.21.0 // indirect golang.org/x/time v0.5.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index 04cc6f81..a4328875 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,10 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= @@ -7,15 +13,18 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-tpm v0.9.0 h1:sQF6YqWMi+SCXpsmS3fd21oPy/vSddwZry4JnmltHVk= github.com/google/go-tpm v0.9.0/go.mod h1:FkNVkc6C+IsvDI9Jw1OveJmxGZUUaKxtrpOS47QWKfU= +github.com/jedib0t/go-pretty/v6 v6.5.9 h1:ACteMBRrrmm1gMsXe9PSTOClQ63IXDUt03H5U+UV8OU= +github.com/jedib0t/go-pretty/v6 v6.5.9/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c= @@ -24,25 +33,45 @@ github.com/nats-io/nats-server/v2 v2.11.0-preview.2 h1:tT/UeBbFzHRzwy77T/+/Rbw58 github.com/nats-io/nats-server/v2 v2.11.0-preview.2/go.mod h1:ILDVzrTqMco4rQMOgEZimBjJHb1oZDlz1J+qhJtZlRM= github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/natscli v0.1.4 h1:LjbJT71u4VOzA+pbmuPj4ZTyPxdPfm0z/2oEHDpe0vo= +github.com/nats-io/natscli v0.1.4/go.mod h1:GSa+epC8k1QS3OqbCQCDDO/5to6hyhDEb8NHeTASPhE= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= +github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/jsm.go b/jsm.go index 27959941..6d553214 100644 --- a/jsm.go +++ b/jsm.go @@ -19,6 +19,8 @@ package jsm import ( "encoding/json" "fmt" + "regexp" + "strconv" "strings" "time" @@ -215,3 +217,103 @@ func LinearBackoffPeriods(steps uint, min time.Duration, max time.Duration) ([]t return res, nil } + +var ( + durationMatcher = regexp.MustCompile(`([-+]?)(([\d\.]+)([a-zA-Z]+))`) + errInvalidDuration = fmt.Errorf("invalid duration") +) + +// parseDuration parse durations with additional units over those from +// standard go parser. +// +// In addition to normal go parser time units it also supports +// these. +// +// The reason these are not in go standard lib is due to precision around +// how many days in a month and about leap years and leap seconds. This +// function does nothing to try and correct for those. +// +// * "w", "W" - a week based on 7 days of exactly 24 hours +// * "d", "D" - a day based on 24 hours +// * "M" - a month made of 30 days of 24 hours +// * "y", "Y" - a year made of 365 days of 24 hours each +// +// Valid duration strings can be -1y1d1µs +func parseDuration(d string) (time.Duration, error) { + // golang time.ParseDuration has a special case for 0 + if d == "0" { + return 0 * time.Second, nil + } + + var ( + r time.Duration + neg = 1 + ) + + d = strings.TrimSpace(d) + + if len(d) == 0 { + return r, errInvalidDuration + } + + parts := durationMatcher.FindAllStringSubmatch(d, -1) + if len(parts) == 0 { + return r, errInvalidDuration + } + + for i, p := range parts { + if len(p) != 5 { + return 0, errInvalidDuration + } + + if i == 0 && p[1] == "-" { + neg = -1 + } + + switch p[4] { + case "w", "W": + val, err := strconv.ParseFloat(p[3], 32) + if err != nil { + return 0, fmt.Errorf("%w: %v", errInvalidDuration, err) + } + + r += time.Duration(val*7*24) * time.Hour + + case "d", "D": + val, err := strconv.ParseFloat(p[3], 32) + if err != nil { + return 0, fmt.Errorf("%w: %v", errInvalidDuration, err) + } + + r += time.Duration(val*24) * time.Hour + + case "M": + val, err := strconv.ParseFloat(p[3], 32) + if err != nil { + return 0, fmt.Errorf("%w: %v", errInvalidDuration, err) + } + + r += time.Duration(val*24*30) * time.Hour + + case "Y", "y": + val, err := strconv.ParseFloat(p[3], 32) + if err != nil { + return 0, fmt.Errorf("%w: %v", errInvalidDuration, err) + } + + r += time.Duration(val*24*365) * time.Hour + + case "ns", "us", "µs", "ms", "s", "m", "h": + dur, err := time.ParseDuration(p[2]) + if err != nil { + return 0, fmt.Errorf("%w: %v", errInvalidDuration, err) + } + + r += dur + default: + return 0, fmt.Errorf("%w: invalid unit %v", errInvalidDuration, p[4]) + } + } + + return time.Duration(neg) * r, nil +} diff --git a/monitor/monitor.go b/monitor/monitor.go new file mode 100644 index 00000000..9aeacbf0 --- /dev/null +++ b/monitor/monitor.go @@ -0,0 +1,41 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitor + +import ( + "github.com/jedib0t/go-pretty/v6/table" + "github.com/jedib0t/go-pretty/v6/text" +) + +type Status string + +var ( + OKStatus Status = "OK" + WarningStatus Status = "WARNING" + CriticalStatus Status = "CRITICAL" + UnknownStatus Status = "UNKNOWN" +) + +func newTableWriter(title string) table.Writer { + tbl := table.NewWriter() + tbl.SetStyle(table.StyleRounded) + tbl.Style().Title.Align = text.AlignCenter + tbl.Style().Format.Header = text.FormatDefault + + if title != "" { + tbl.SetTitle(title) + } + + return tbl +} diff --git a/monitor/perfdata.go b/monitor/perfdata.go new file mode 100644 index 00000000..a34ec326 --- /dev/null +++ b/monitor/perfdata.go @@ -0,0 +1,65 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitor + +import ( + "fmt" + "strings" +) + +type PerfDataItem struct { + Help string `json:"-"` + Name string `json:"name"` + Value float64 `json:"value"` + Warn float64 `json:"warning"` + Crit float64 `json:"critical"` + Unit string `json:"unit,omitempty"` +} + +type PerfData []*PerfDataItem + +func (p PerfData) String() string { + var res []string + for _, i := range p { + res = append(res, i.String()) + } + + return strings.TrimSpace(strings.Join(res, " ")) +} + +func (i *PerfDataItem) String() string { + valueFmt := "%0.0f" + if i.Unit == "s" { + valueFmt = "%0.4f" + } + + pd := fmt.Sprintf("%s="+valueFmt, i.Name, i.Value) + if i.Unit != "" { + pd = pd + i.Unit + } + + if i.Warn > 0 || i.Crit > 0 { + if i.Warn != 0 { + pd = fmt.Sprintf("%s;"+valueFmt, pd, i.Warn) + } else if i.Crit > 0 { + pd = fmt.Sprintf("%s;", pd) + } + + if i.Crit != 0 { + pd = fmt.Sprintf("%s;"+valueFmt, pd, i.Crit) + } + } + + return pd +} diff --git a/monitor/result.go b/monitor/result.go new file mode 100644 index 00000000..8c45fcbe --- /dev/null +++ b/monitor/result.go @@ -0,0 +1,305 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitor + +import ( + "bytes" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/jedib0t/go-pretty/v6/table" + "github.com/nats-io/natscli/columns" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" +) + +type RenderFormat int + +const ( + NagiosFormat RenderFormat = iota + PrometheusFormat + TextFormat + JSONFormat +) + +type Result struct { + Output string `json:"output,omitempty"` + Status Status `json:"status"` + Check string `json:"check_suite"` + Name string `json:"check_name"` + Warnings []string `json:"warning,omitempty"` + Criticals []string `json:"critical,omitempty"` + OKs []string `json:"ok,omitempty"` + PerfData PerfData `json:"perf_data"` + RenderFormat RenderFormat `json:"-"` + NameSpace string `json:"-"` + OutFile string `json:"-"` +} + +func (r *Result) Pd(pd ...*PerfDataItem) { + r.PerfData = append(r.PerfData, pd...) +} + +func (r *Result) CriticalExit(format string, a ...any) { + r.Critical(format, a...) + r.GenericExit() +} + +func (r *Result) Critical(format string, a ...any) { + r.Criticals = append(r.Criticals, fmt.Sprintf(format, a...)) +} + +func (r *Result) Warn(format string, a ...any) { + r.Warnings = append(r.Warnings, fmt.Sprintf(format, a...)) +} + +func (r *Result) Ok(format string, a ...any) { + r.OKs = append(r.OKs, fmt.Sprintf(format, a...)) +} + +func (r *Result) CriticalIfErr(err error, format string, a ...any) bool { + if err == nil { + return false + } + + r.CriticalExit(format, a...) + + return true +} + +func (r *Result) nagiosCode() int { + switch r.Status { + case OKStatus: + return 0 + case WarningStatus: + return 1 + case CriticalStatus: + return 2 + default: + return 3 + } +} + +func (r *Result) exitCode() int { + if r.RenderFormat == PrometheusFormat { + return 0 + } + + return r.nagiosCode() +} + +func (r *Result) Exit() { + os.Exit(r.exitCode()) +} + +func (r *Result) renderHuman() string { + buf := bytes.NewBuffer([]byte{}) + + fmt.Fprintf(buf, "%s: %s\n\n", r.Name, r.Status) + + tblWriter := newTableWriter("") + tblWriter.AppendHeader(table.Row{"Status", "Message"}) + lines := 0 + for _, ok := range r.OKs { + tblWriter.AppendRow(table.Row{"OK", ok}) + lines++ + } + for _, warn := range r.Warnings { + tblWriter.AppendRow(table.Row{"Warning", warn}) + lines++ + } + for _, crit := range r.Criticals { + tblWriter.AppendRow(table.Row{"Critical", crit}) + lines++ + } + + if lines > 0 { + fmt.Fprintln(buf, "Status Detail") + fmt.Fprintln(buf) + fmt.Fprint(buf, tblWriter.Render()) + fmt.Fprintln(buf) + } + + tblWriter = newTableWriter("") + tblWriter.AppendHeader(table.Row{"Metric", "Value", "Unit", "Critical Threshold", "Warning Threshold", "Description"}) + lines = 0 + for _, pd := range r.PerfData { + tblWriter.AppendRow(table.Row{pd.Name, f(pd.Value), pd.Unit, f(pd.Crit), f(pd.Warn), pd.Help}) + lines++ + } + if lines > 0 { + fmt.Fprintln(buf) + fmt.Fprintln(buf, "Check Metrics") + fmt.Fprintln(buf) + fmt.Fprint(buf, tblWriter.Render()) + fmt.Fprintln(buf) + } + + return buf.String() +} + +func (r *Result) renderPrometheus() string { + if r.Check == "" { + r.Check = r.Name + } + + registry := prometheus.NewRegistry() + prometheus.DefaultRegisterer = registry + prometheus.DefaultGatherer = registry + + sname := strings.ReplaceAll(r.Name, `"`, `.`) + for _, pd := range r.PerfData { + help := fmt.Sprintf("Data about the NATS CLI check %s", r.Check) + if pd.Help != "" { + help = pd.Help + } + + gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(r.NameSpace, r.Check, pd.Name), + Help: help, + }, []string{"item"}) + prometheus.MustRegister(gauge) + gauge.WithLabelValues(sname).Set(pd.Value) + } + + status := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(r.NameSpace, r.Check, "status_code"), + Help: fmt.Sprintf("Nagios compatible status code for %s", r.Check), + }, []string{"item", "status"}) + prometheus.MustRegister(status) + + status.WithLabelValues(sname, string(r.Status)).Set(float64(r.nagiosCode())) + + var buf bytes.Buffer + + mfs, err := prometheus.DefaultGatherer.Gather() + if err != nil { + panic(err) + } + + for _, mf := range mfs { + _, err = expfmt.MetricFamilyToText(&buf, mf) + if err != nil { + panic(err) + } + } + + return buf.String() +} + +func (r *Result) renderJSON() string { + res, _ := json.MarshalIndent(r, "", " ") + return string(res) +} + +func (r *Result) renderNagios() string { + res := []string{r.Name} + for _, c := range r.Criticals { + res = append(res, fmt.Sprintf("Crit:%s", c)) + } + + for _, w := range r.Warnings { + res = append(res, fmt.Sprintf("Warn:%s", w)) + } + + if r.Output != "" { + res = append(res, r.Output) + } else if len(r.OKs) > 0 { + for _, ok := range r.OKs { + res = append(res, fmt.Sprintf("OK:%s", ok)) + } + } + + if len(r.PerfData) == 0 { + return fmt.Sprintf("%s %s", r.Status, strings.Join(res, " ")) + } + + return fmt.Sprintf("%s %s | %s", r.Status, strings.Join(res, " "), r.PerfData) +} + +func (r *Result) String() string { + if r.Status == "" { + r.Status = UnknownStatus + } + if r.PerfData == nil { + r.PerfData = PerfData{} + } + + switch { + case len(r.Criticals) > 0: + r.Status = CriticalStatus + case len(r.Warnings) > 0: + r.Status = WarningStatus + default: + r.Status = OKStatus + } + + switch r.RenderFormat { + case JSONFormat: + return r.renderJSON() + case PrometheusFormat: + return r.renderPrometheus() + case TextFormat: + return r.renderHuman() + default: + return r.renderNagios() + } +} + +func (r *Result) GenericExit() { + if r.OutFile != "" { + f, err := os.CreateTemp(filepath.Dir(r.OutFile), "") + if err != nil { + fmt.Fprintf(os.Stderr, "temp file failed: %s", err) + os.Exit(1) + } + defer os.Remove(f.Name()) + + _, err = fmt.Fprintln(f, r.String()) + if err != nil { + fmt.Fprintf(os.Stderr, "temp file write failed: %s", err) + os.Exit(1) + } + + err = f.Close() + if err != nil { + fmt.Fprintf(os.Stderr, "temp file write failed: %s", err) + os.Exit(1) + } + + err = os.Chmod(f.Name(), 0600) + if err != nil { + fmt.Fprintf(os.Stderr, "temp file mode change failed: %s", err) + os.Exit(1) + } + + err = os.Rename(f.Name(), r.OutFile) + if err != nil { + fmt.Fprintf(os.Stderr, "temp file rename failed: %s", err) + } + + os.Exit(1) + } + + fmt.Println(r.String()) + + r.Exit() +} + +func f(v any) string { + return columns.F(v) +} diff --git a/natscontext/testdata/nats/context/user_pass_token_creds.json b/natscontext/testdata/nats/context/user_pass_token_creds.json index a7e3734c..438f5e4e 100644 --- a/natscontext/testdata/nats/context/user_pass_token_creds.json +++ b/natscontext/testdata/nats/context/user_pass_token_creds.json @@ -20,5 +20,6 @@ "tls_first": false, "windows_cert_store": "", "windows_cert_match_by": "", - "windows_cert_match": "" + "windows_cert_match": "", + "windows_ca_certs_match": null } \ No newline at end of file diff --git a/streams.go b/streams.go index 246e1953..a1974096 100644 --- a/streams.go +++ b/streams.go @@ -610,9 +610,7 @@ func (s *Stream) EachConsumer(cb func(consumer *Consumer)) (missing []string, er // LatestInformation returns the most recently fetched stream information func (s *Stream) LatestInformation() (info *api.StreamInfo, err error) { - s.Lock() - nfo := s.lastInfo - s.Unlock() + nfo := s.lastInfoLocked() if nfo != nil { return nfo, nil @@ -621,6 +619,13 @@ func (s *Stream) LatestInformation() (info *api.StreamInfo, err error) { return s.Information() } +func (s *Stream) lastInfoLocked() *api.StreamInfo { + s.Lock() + defer s.Unlock() + + return s.lastInfo +} + // Information loads the current stream information func (s *Stream) Information(req ...api.JSApiStreamInfoRequest) (info *api.StreamInfo, err error) { if len(req) > 1 { diff --git a/streams_monitor.go b/streams_monitor.go new file mode 100644 index 00000000..80a8df6a --- /dev/null +++ b/streams_monitor.go @@ -0,0 +1,368 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jsm + +import ( + "strconv" + "time" + + "github.com/nats-io/jsm.go/api" + "github.com/nats-io/jsm.go/monitor" +) + +const ( + MonitorEnabled = "io.nats.monitor.enabled" + StreamMonitorMetaLagCritical = "io.nats.monitor.lag-critical" + StreamMonitorMetaSeenCritical = "io.nats.monitor.seen-critical" + StreamMonitorMetaMinSources = "io.nats.monitor.min-sources" + StreamMonitorMetaMaxSources = "io.nats.monitor.max-sources" + StreamMonitorMetaPeerExpect = "io.nats.monitor.peer-expect" + StreamMonitorMetaPeerLagCritical = "io.nats.monitor.peer-lag-critical" + StreamMonitorMetaPeerSeenCritical = "io.nats.monitor.peer-seen-critical" + StreamMonitorMetaMessagesWarn = "io.nats.monitor.msgs-warn" + StreamMonitorMetaMessagesCritical = "io.nats.monitor.msgs-critical" + StreamMonitorMetaSubjectsWarn = "io.nats.monitor.subjects-warn" + StreamMonitorMetaSubjectsCritical = "io.nats.monitor.subjects-critical" +) + +type StreamHealthCheck func(*Stream, *monitor.Result, StreamHealthCheckOptions, api.Logger) + +type StreamHealthCheckOptions struct { + Enabled bool + SourcesLagCritical uint64 + SourcesSeenCritical time.Duration + MinSources int + MaxSources int + ClusterExpectedPeers int + ClusterLagCritical uint64 + ClusterSeenCritical time.Duration + MessagesWarn uint64 + MessagesCrit uint64 + SubjectsWarn int + SubjectsCrit int + HealthChecks []StreamHealthCheck +} + +type monitorMetaParser struct { + k string + fn func(string) error +} + +func (s *Stream) HealthCheckOptions(extraChecks ...StreamHealthCheck) (*StreamHealthCheckOptions, error) { + opts := &StreamHealthCheckOptions{ + HealthChecks: extraChecks, + } + + var err error + parser := []monitorMetaParser{ + {MonitorEnabled, func(v string) error { + opts.Enabled, err = strconv.ParseBool(v) + return err + }}, + {StreamMonitorMetaLagCritical, func(v string) error { + opts.SourcesLagCritical, err = strconv.ParseUint(v, 10, 64) + return err + }}, + {StreamMonitorMetaSeenCritical, func(v string) error { + opts.SourcesSeenCritical, err = parseDuration(v) + return err + }}, + {StreamMonitorMetaMinSources, func(v string) error { + opts.MinSources, err = strconv.Atoi(v) + return err + }}, + {StreamMonitorMetaMaxSources, func(v string) error { + opts.MaxSources, err = strconv.Atoi(v) + return err + }}, + {StreamMonitorMetaPeerExpect, func(v string) error { + opts.ClusterExpectedPeers, err = strconv.Atoi(v) + return err + }}, + {StreamMonitorMetaPeerLagCritical, func(v string) error { + opts.ClusterLagCritical, err = strconv.ParseUint(v, 10, 64) + return err + }}, + {StreamMonitorMetaPeerSeenCritical, func(v string) error { + opts.ClusterSeenCritical, err = parseDuration(v) + return err + }}, + {StreamMonitorMetaMessagesWarn, func(v string) error { + opts.MessagesWarn, err = strconv.ParseUint(v, 10, 64) + return err + }}, + {StreamMonitorMetaMessagesCritical, func(v string) error { + opts.MessagesCrit, err = strconv.ParseUint(v, 10, 64) + return err + }}, + {StreamMonitorMetaSubjectsWarn, func(v string) error { + opts.SubjectsWarn, err = strconv.Atoi(v) + return err + }}, + {StreamMonitorMetaSubjectsCritical, func(v string) error { + opts.SubjectsCrit, err = strconv.Atoi(v) + return err + }}, + } + + metadata := s.Metadata() + + for _, m := range parser { + if v, ok := metadata[m.k]; ok { + err = m.fn(v) + if err != nil { + return nil, err + } + } + } + + return opts, nil +} + +func (s *Stream) HealthCheck(opts StreamHealthCheckOptions, check *monitor.Result, log api.Logger) (*monitor.Result, error) { + if check == nil { + check = &monitor.Result{ + Check: "stream_status", + Name: s.Name(), + } + } + + // make sure latest info cache is set as checks accesses it directly + nfo, err := s.LatestInformation() + if err != nil { + return nil, err + } + + s.checkCluster(nfo, check, opts, log) + s.checkMessages(nfo, check, opts, log) + s.checkSubjects(nfo, check, opts, log) + s.checkSources(nfo, check, opts, log) + s.checkMirror(nfo, check, opts, log) + + for _, hc := range opts.HealthChecks { + hc(s, check, opts, log) + } + + return check, nil +} + +func (s *Stream) checkMirror(si *api.StreamInfo, check *monitor.Result, opts StreamHealthCheckOptions, log api.Logger) { + if (opts.SourcesLagCritical <= 0 && opts.SourcesSeenCritical <= 0) || si.Config.Name == "" || si.Config.Mirror == nil { + return + } + + if si.Config.Name == "" { + log.Debugf("CRITICAL: no configuration present") + check.Critical("no configuration present") + return + } + + mirror := si.Config.Mirror + state := si.Mirror + + if mirror == nil { + log.Debugf("CRITICAL: not mirrored") + check.Critical("not mirrored") + return + } + + if state == nil { + log.Debugf("CRITICAL: invalid state") + check.Critical("invalid state") + return + } + + ok := true + + if opts.SourcesLagCritical > 0 && state.Lag >= opts.SourcesLagCritical { + log.Debugf("CRITICAL: Mirror lag %d", state.Lag) + check.Critical("Mirror Lag %d", state.Lag) + ok = false + } + + if opts.SourcesSeenCritical > 0 && state.Active >= opts.SourcesSeenCritical { + log.Debugf("CRITICAL: Mirror Seen > %v", state.Active) + check.Critical("Mirror Seen %v", state.Active) + ok = false + } + + if ok { + check.Ok("Mirror %s", mirror.Name) + } +} + +func (s *Stream) checkSources(si *api.StreamInfo, check *monitor.Result, opts StreamHealthCheckOptions, log api.Logger) { + sources := si.Sources + count := len(sources) + + switch { + case opts.MinSources > 0 && count < opts.MinSources: + //log.Debugf("CRITICAL: %d/%d sources", count, opts.MinSources) + check.Critical("%d sources", count) + case opts.MaxSources > 0 && count > opts.MaxSources: + //log.Debugf("CRITICAL: %d/%d sources", count, opts.MaxSources) + check.Critical("%d sources", count) + default: + check.Ok("%d sources", count) + } + + if opts.SourcesLagCritical <= 0 && opts.SourcesSeenCritical <= 0 { + return + } + + lagged := 0 + inactive := 0 + + for _, s := range sources { + if opts.SourcesLagCritical > 0 && s.Lag >= opts.SourcesLagCritical { + lagged++ + } + + if opts.SourcesSeenCritical > 0 && s.Active >= opts.SourcesSeenCritical { + inactive++ + } + } + + if lagged > 0 { + log.Debugf("CRITICAL: %d/%d sources are lagged", lagged, count) + check.Critical("%d sources are lagged", lagged) + } else { + check.Ok("%d sources current", count) + } + if inactive > 0 { + log.Debugf("CRITICAL: %d/%d sources are inactive", inactive, count) + check.Critical("%d sources are inactive", inactive) + } else { + check.Ok("%d sources active", count) + } +} + +func (s *Stream) checkSubjects(si *api.StreamInfo, check *monitor.Result, opts StreamHealthCheckOptions, log api.Logger) { + if opts.SubjectsWarn <= 0 && opts.SubjectsCrit <= 0 { + return + } + + ns := si.State.NumSubjects + lt := opts.SubjectsWarn < opts.SubjectsCrit + + switch { + case lt && ns >= opts.SubjectsCrit: + log.Debugf("CRITICAL subjects %d <= %d", ns, opts.SubjectsCrit) + check.Critical("%d subjects", ns) + case lt && ns >= opts.SubjectsWarn: + log.Debugf("WARNING subjects %d >= %d", ns, opts.SubjectsWarn) + check.Warn("%d subjects", ns) + case !lt && ns <= opts.SubjectsCrit: + check.Critical("%d subjects", ns) + log.Debugf("CRITICAL subjects %d <= %d", ns, opts.SubjectsCrit) + case !lt && ns <= opts.SubjectsWarn: + check.Warn("%d subjects", ns) + log.Debugf("WARNING subjects %d >= %d", ns, opts.SubjectsWarn) + default: + check.Ok("%d subjects", ns) + } +} + +// TODO: support inverting logic and also in cli +func (s *Stream) checkMessages(si *api.StreamInfo, check *monitor.Result, opts StreamHealthCheckOptions, log api.Logger) { + if opts.MessagesCrit <= 0 && opts.MessagesWarn <= 0 { + return + } + + if opts.MessagesCrit > 0 && si.State.Msgs <= opts.MessagesCrit { + log.Debugf("CRITICAL: %d messages", si.State.Msgs) + check.Critical("%d messages", si.State.Msgs) + return + } + + if opts.MessagesWarn > 0 && si.State.Msgs <= opts.MessagesWarn { + log.Debugf("WARNING: %d messages expected <= %d", si.State.Msgs, opts.MessagesWarn) + check.Warn("%d messages", si.State.Msgs) + return + } + + check.Ok("%d messages", si.State.Msgs) +} + +func (s *Stream) checkCluster(si *api.StreamInfo, check *monitor.Result, opts StreamHealthCheckOptions, log api.Logger) { + nfo := si.Cluster + if nfo == nil && opts.ClusterExpectedPeers <= 0 { + return + } + + if nfo == nil { + log.Debugf("Stream is not clustered") + check.Critical("Stream is not clustered") + return + } + + hasLeader := nfo.Leader != "" + nPeer := len(nfo.Replicas) + if hasLeader { + nPeer++ + } else { + check.Critical("No leader") + log.Debugf("No leader found") + return + } + + if nPeer != opts.ClusterExpectedPeers { + log.Debugf("Expected %d replicas got %d", opts.ClusterExpectedPeers, nPeer) + check.Critical("Expected %d replicas got %d", opts.ClusterExpectedPeers, nPeer) + return + } else { + check.Ok("%d peers", nPeer) + } + + inactive := 0 + lagged := 0 + offline := 0 + + for _, p := range nfo.Replicas { + if opts.ClusterLagCritical > 0 && p.Lag > opts.ClusterLagCritical { + lagged++ + } + + if opts.ClusterSeenCritical > 0 && p.Active > opts.ClusterSeenCritical { + inactive++ + } + + if p.Offline { + offline++ + } + } + + if offline > 0 { + log.Debugf("CRITICAL: %d replicas are offline", offline) + check.Critical("%d replicas offline", offline) + } + + switch { + case opts.ClusterLagCritical <= 0: + case lagged > 0: + //log.Debugf("CRITICAL: %d replicas are lagged", lagged) + check.Critical("%d replicas lagged", lagged) + default: + check.Ok("replicas are current") + } + + switch { + case opts.ClusterSeenCritical <= 0: + case inactive > 0: + //log.Debugf("CRITICAL: %d replicas are inactive", inactive) + check.Critical("%d replicas inactive", inactive) + default: + check.Ok("replicas are active") + } +} diff --git a/streams_monitor_test.go b/streams_monitor_test.go new file mode 100644 index 00000000..968d705f --- /dev/null +++ b/streams_monitor_test.go @@ -0,0 +1,580 @@ +package jsm + +import ( + "regexp" + "slices" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/nats-io/jsm.go/api" + "github.com/nats-io/jsm.go/monitor" +) + +func requireLen[A any](t *testing.T, s []A, expect int) { + t.Helper() + + if len(s) != expect { + t.Fatalf("Expected %d elements in collection: %v", expect, s) + } +} +func requireEmpty[A any](t *testing.T, s []A) { + t.Helper() + + if len(s) != 0 { + t.Fatalf("Expected empty collection: %v", s) + } +} + +func requireElement[S ~[]E, E comparable](t *testing.T, s S, v E) { + t.Helper() + + if !slices.Contains(s, v) { + t.Fatalf("Expected %v to contain %v", s, v) + } +} + +func requireRegexElement(t *testing.T, s []string, m string) { + t.Helper() + + r, err := regexp.Compile(m) + if err != nil { + t.Fatalf("invalid regex: %v", err) + } + + if !slices.ContainsFunc(s, func(s string) bool { + return r.MatchString(s) + }) { + t.Fatalf("Expected %v to contain element matching %q", s, m) + } +} + +func TestStream_checkSources(t *testing.T) { + setup := func() (*Stream, *monitor.Result, *api.StreamInfo) { + return &Stream{}, &monitor.Result{}, &api.StreamInfo{ + Config: api.StreamConfig{ + Name: "test_stream", + }, + } + } + + t.Run("Should handle fewer than desired", func(t *testing.T) { + s, check, si := setup() + s.checkSources(si, check, StreamHealthCheckOptions{ + MinSources: 1, + MaxSources: 2, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "0 sources") + + s, check, si = setup() + si.Sources = append(si.Sources, &api.StreamSourceInfo{}) + s.checkSources(si, check, StreamHealthCheckOptions{ + MinSources: 2, + MaxSources: 3, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "1 sources") + }) + + t.Run("Should handle more than desired", func(t *testing.T) { + s, check, si := setup() + si.Sources = append(si.Sources, &api.StreamSourceInfo{}) + si.Sources = append(si.Sources, &api.StreamSourceInfo{}) + s.checkSources(si, check, StreamHealthCheckOptions{ + MaxSources: 1, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "2 sources") + }) + + t.Run("Should handle valid number of sources", func(t *testing.T) { + s, check, si := setup() + si.Sources = append(si.Sources, &api.StreamSourceInfo{}) + si.Sources = append(si.Sources, &api.StreamSourceInfo{}) + s.checkSources(si, check, StreamHealthCheckOptions{ + MinSources: 2, + MaxSources: 3, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireElement(t, check.OKs, "2 sources") + }) + + t.Run("Should detect lagged replicas", func(t *testing.T) { + s, check, si := setup() + si.Sources = append(si.Sources, &api.StreamSourceInfo{ + Lag: 100, + }) + si.Sources = append(si.Sources, &api.StreamSourceInfo{ + Lag: 200, + }) + s.checkSources(si, check, StreamHealthCheckOptions{ + SourcesLagCritical: 100, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "2 sources are lagged") + }) + + t.Run("Should detect not seen replicas", func(t *testing.T) { + s, check, si := setup() + si.Sources = append(si.Sources, &api.StreamSourceInfo{ + Active: time.Second, + }) + si.Sources = append(si.Sources, &api.StreamSourceInfo{ + Active: 2 * time.Second, + }) + s.checkSources(si, check, StreamHealthCheckOptions{ + SourcesSeenCritical: time.Millisecond, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "2 sources are inactive") + }) + + t.Run("Should handle valid replicas", func(t *testing.T) { + s, check, si := setup() + si.Sources = append(si.Sources, &api.StreamSourceInfo{ + Lag: 100, + Active: 100 * time.Millisecond, + }) + si.Sources = append(si.Sources, &api.StreamSourceInfo{ + Lag: 200, + Active: time.Millisecond, + }) + s.checkSources(si, check, StreamHealthCheckOptions{ + SourcesLagCritical: 500, + SourcesSeenCritical: time.Second, + MinSources: 2, + MaxSources: 10, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + if !cmp.Equal(check.OKs, []string{"2 sources", "2 sources current", "2 sources active"}) { + t.Fatalf("invalid OK status: %v", check.OKs) + } + }) +} + +func TestStream_checkMessages(t *testing.T) { + setup := func() (*Stream, *monitor.Result, *api.StreamInfo) { + return &Stream{}, &monitor.Result{}, &api.StreamInfo{ + Config: api.StreamConfig{ + Name: "test_stream", + }, + } + } + + t.Run("Should handle no thresholds", func(t *testing.T) { + s, check, si := setup() + + si.State.Msgs = 1000 + s.checkMessages(si, check, StreamHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle critical situations", func(t *testing.T) { + s, check, si := setup() + si.State.Msgs = 1000 + s.checkMessages(si, check, StreamHealthCheckOptions{ + MessagesCrit: 1000, + }, api.NewDiscardLogger()) + + requireElement(t, check.Criticals, "1000 messages") + requireEmpty(t, check.OKs) + + s, check, si = setup() + si.State.Msgs = 999 + s.checkMessages(si, check, StreamHealthCheckOptions{ + MessagesCrit: 1000, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "999 messages") + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle warning situations", func(t *testing.T) { + s, check, si := setup() + si.State.Msgs = 1000 + s.checkMessages(si, check, StreamHealthCheckOptions{ + MessagesWarn: 1000, + }, api.NewDiscardLogger()) + requireElement(t, check.Warnings, "1000 messages") + requireEmpty(t, check.Criticals) + requireEmpty(t, check.OKs) + + s, check, si = setup() + si.State.Msgs = 999 + + s.checkMessages(si, check, StreamHealthCheckOptions{ + MessagesWarn: 1000, + }, api.NewDiscardLogger()) + requireElement(t, check.Warnings, "999 messages") + requireEmpty(t, check.Criticals) + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle ok situations", func(t *testing.T) { + s, check, si := setup() + si.State.Msgs = 1000 + s.checkMessages(si, check, StreamHealthCheckOptions{ + MessagesWarn: 500, + MessagesCrit: 200, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireElement(t, check.OKs, "1000 messages") + }) +} + +func TestStream_checkSubjects(t *testing.T) { + setup := func() (*Stream, *monitor.Result, *api.StreamInfo) { + return &Stream{}, &monitor.Result{}, &api.StreamInfo{ + Config: api.StreamConfig{ + Name: "test_stream", + }, + } + } + + t.Run("Should handle no thresholds", func(t *testing.T) { + s, check, si := setup() + s.checkSubjects(si, check, StreamHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("warn less than crit", func(t *testing.T) { + t.Run("Should handle fewer subjects", func(t *testing.T) { + s, check, si := setup() + si.State.NumSubjects = 100 + + s.checkSubjects(si, check, StreamHealthCheckOptions{ + SubjectsWarn: 200, + SubjectsCrit: 300, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireElement(t, check.OKs, "100 subjects") + }) + + t.Run("Should handle more than subjects", func(t *testing.T) { + s, check, si := setup() + si.State.NumSubjects = 400 + s.checkSubjects(si, check, StreamHealthCheckOptions{ + SubjectsWarn: 200, + SubjectsCrit: 300, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "400 subjects") + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + + s, check, si = setup() + si.State.NumSubjects = 250 + s.checkSubjects(si, check, StreamHealthCheckOptions{ + SubjectsWarn: 200, + SubjectsCrit: 300, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireElement(t, check.Warnings, "250 subjects") + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle valid subject counts", func(t *testing.T) { + s, check, si := setup() + si.State.NumSubjects = 100 + s.checkSubjects(si, check, StreamHealthCheckOptions{ + SubjectsWarn: 200, + SubjectsCrit: 300, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireElement(t, check.OKs, "100 subjects") + }) + }) + + t.Run("warn more than crit", func(t *testing.T) { + t.Run("Should handle fewer subjects", func(t *testing.T) { + s, check, si := setup() + si.State.NumSubjects = 100 + s.checkSubjects(si, check, StreamHealthCheckOptions{ + SubjectsWarn: 300, + SubjectsCrit: 200, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "100 subjects") + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + + s, check, si = setup() + si.State.NumSubjects = 250 + s.checkSubjects(si, check, StreamHealthCheckOptions{ + SubjectsWarn: 300, + SubjectsCrit: 200, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireElement(t, check.Warnings, "250 subjects") + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle valid subject counts", func(t *testing.T) { + s, check, si := setup() + si.State.NumSubjects = 400 + s.checkSubjects(si, check, StreamHealthCheckOptions{ + SubjectsWarn: 300, + SubjectsCrit: 200, + }, api.NewDiscardLogger()) + + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireElement(t, check.OKs, "400 subjects") + }) + }) +} + +func TestStream_checkMirror(t *testing.T) { + s := Stream{} + + t.Run("Should handle no thresholds", func(t *testing.T) { + check := &monitor.Result{} + s.checkMirror(&api.StreamInfo{}, check, StreamHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should handle absent state", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Config: api.StreamConfig{ + Name: "test", + Mirror: &api.StreamSource{}, + }, + } + + s.checkMirror(si, check, StreamHealthCheckOptions{ + SourcesLagCritical: 1, + SourcesSeenCritical: 1, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "invalid state") + }) + + t.Run("Should handle lag greater than critical", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Config: api.StreamConfig{ + Name: "test", + Mirror: &api.StreamSource{}, + }, + Mirror: &api.StreamSourceInfo{ + Lag: 100, + }, + } + + s.checkMirror(si, check, StreamHealthCheckOptions{ + SourcesLagCritical: 100, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Mirror Lag 100") + + check = &monitor.Result{} + si.Mirror.Lag = 200 + s.checkMirror(si, check, StreamHealthCheckOptions{ + SourcesLagCritical: 100, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Mirror Lag 200") + }) + + t.Run("Should handle seen greater than critical", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Config: api.StreamConfig{ + Name: "test", + Mirror: &api.StreamSource{}, + }, + Mirror: &api.StreamSourceInfo{ + Active: time.Millisecond, + }, + } + + s.checkMirror(si, check, StreamHealthCheckOptions{ + SourcesSeenCritical: time.Millisecond, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Mirror Seen 1ms") + + check = &monitor.Result{} + si.Mirror.Active = time.Second + s.checkMirror(si, check, StreamHealthCheckOptions{ + SourcesSeenCritical: time.Millisecond, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Mirror Seen 1s") + }) + + t.Run("Should handle healthy mirrors", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Config: api.StreamConfig{ + Name: "test", + Mirror: &api.StreamSource{ + Name: "X", + }, + }, + Mirror: &api.StreamSourceInfo{ + Active: time.Millisecond, + Lag: 100, + }, + } + + s.checkMirror(si, check, StreamHealthCheckOptions{ + SourcesLagCritical: 200, + SourcesSeenCritical: time.Second, + }, api.NewDiscardLogger()) + + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireElement(t, check.OKs, "Mirror X") + }) +} + +func TestStream_checkCluster(t *testing.T) { + s := Stream{} + + t.Run("Skip without threshold", func(t *testing.T) { + check := &monitor.Result{} + s.checkCluster(&api.StreamInfo{}, check, StreamHealthCheckOptions{}, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should be critical when the stream is not clustered and a threshold is given", func(t *testing.T) { + check := &monitor.Result{} + s.checkCluster(&api.StreamInfo{}, check, StreamHealthCheckOptions{ + ClusterExpectedPeers: 3, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Stream is not clustered") + requireEmpty(t, check.Warnings) + requireEmpty(t, check.OKs) + }) + + t.Run("Should be critical when replica counts do not match expectation", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Cluster: &api.ClusterInfo{ + Leader: "p2", + Replicas: []*api.PeerInfo{ + {Name: "p1"}, + }, + }} + + s.checkCluster(si, check, StreamHealthCheckOptions{ + ClusterExpectedPeers: 3, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "Expected 3 replicas got 2") + }) + + t.Run("Should handle no leaders", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Cluster: &api.ClusterInfo{ + Replicas: []*api.PeerInfo{ + {Name: "p1"}, + {Name: "p2"}, + {Name: "p3"}, + }, + }, + } + + s.checkCluster(si, check, StreamHealthCheckOptions{ + ClusterExpectedPeers: 3, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "No leader") + + check = &monitor.Result{} + si.Cluster = &api.ClusterInfo{ + Leader: "p1", + Replicas: []*api.PeerInfo{ + {Name: "p2"}, + {Name: "p3"}, + }, + } + + s.checkCluster(si, check, StreamHealthCheckOptions{ + ClusterExpectedPeers: 3, + }, api.NewDiscardLogger()) + requireEmpty(t, check.Criticals) + requireElement(t, check.OKs, "3 peers") + }) + + t.Run("Should detect lagged peers", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Cluster: &api.ClusterInfo{ + Leader: "p1", + Replicas: []*api.PeerInfo{ + {Name: "p2", Lag: 1000}, + {Name: "p3", Lag: 10}, + }, + }, + } + + s.checkCluster(si, check, StreamHealthCheckOptions{ + ClusterExpectedPeers: 3, + ClusterLagCritical: 100, + }, api.NewDiscardLogger()) + requireElement(t, check.Criticals, "1 replicas lagged") + }) + + t.Run("Should detect inactive peers", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Cluster: &api.ClusterInfo{ + Leader: "p1", + Replicas: []*api.PeerInfo{ + {Name: "p2", Lag: 10, Active: time.Second}, + {Name: "p3", Lag: 10, Active: time.Hour}, + }, + }, + } + + s.checkCluster(si, check, StreamHealthCheckOptions{ + ClusterExpectedPeers: 3, + ClusterSeenCritical: time.Minute, + }, api.NewDiscardLogger()) + + requireElement(t, check.Criticals, "1 replicas inactive") + }) + + t.Run("Should detect offline peers", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Cluster: &api.ClusterInfo{ + Leader: "p1", + Replicas: []*api.PeerInfo{ + {Name: "p2", Lag: 10, Active: time.Second, Offline: true}, + {Name: "p3", Lag: 10, Active: time.Hour}, + }, + }, + } + + s.checkCluster(si, check, StreamHealthCheckOptions{ + ClusterExpectedPeers: 3, + }, api.NewDiscardLogger()) + + requireElement(t, check.Criticals, "1 replicas offline") + }) + + t.Run("Should handle ok streams", func(t *testing.T) { + check := &monitor.Result{} + si := &api.StreamInfo{ + Cluster: &api.ClusterInfo{ + Leader: "p1", + Replicas: []*api.PeerInfo{ + {Name: "p2", Lag: 10, Active: time.Second}, + {Name: "p3", Lag: 10, Active: time.Second}, + }, + }, + } + s.checkCluster(si, check, StreamHealthCheckOptions{ + ClusterExpectedPeers: 3, + ClusterLagCritical: 20, + ClusterSeenCritical: time.Minute, + }, api.NewDiscardLogger()) + + requireEmpty(t, check.Criticals) + requireEmpty(t, check.Warnings) + requireElement(t, check.OKs, "3 peers") + }) +} diff --git a/consumers_test.go b/test/consumers_test.go similarity index 99% rename from consumers_test.go rename to test/consumers_test.go index ad51f0ef..424c7ce8 100644 --- a/consumers_test.go +++ b/test/consumers_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 The NATS Authors +// Copyright 2020-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jsm_test +package test import ( "fmt" diff --git a/jsm_test.go b/test/jsm_test.go similarity index 99% rename from jsm_test.go rename to test/jsm_test.go index 19718a1f..31e1b8f2 100644 --- a/jsm_test.go +++ b/test/jsm_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jsm_test +package test import ( "encoding/json" diff --git a/manager_test.go b/test/manager_test.go similarity index 99% rename from manager_test.go rename to test/manager_test.go index 7594e555..5a8ffb6f 100644 --- a/manager_test.go +++ b/test/manager_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jsm_test +package test import ( "context" diff --git a/msginfo_test.go b/test/msginfo_test.go similarity index 99% rename from msginfo_test.go rename to test/msginfo_test.go index d1057f4a..0fb5e1e6 100644 --- a/msginfo_test.go +++ b/test/msginfo_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jsm_test +package test import ( "fmt" diff --git a/snapshots_test.go b/test/snapshots_test.go similarity index 99% rename from snapshots_test.go rename to test/snapshots_test.go index f0b6cfcd..2fda9eb2 100644 --- a/snapshots_test.go +++ b/test/snapshots_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jsm_test +package test import ( "context" @@ -22,7 +22,6 @@ import ( "testing" "github.com/google/go-cmp/cmp" - "github.com/nats-io/jsm.go" ) diff --git a/stream_pager_test.go b/test/stream_pager_test.go similarity index 80% rename from stream_pager_test.go rename to test/stream_pager_test.go index 07707191..c80ba9c7 100644 --- a/stream_pager_test.go +++ b/test/stream_pager_test.go @@ -1,4 +1,17 @@ -package jsm_test +// Copyright 2020-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test import ( "context" diff --git a/stream_query_test.go b/test/stream_query_test.go similarity index 99% rename from stream_query_test.go rename to test/stream_query_test.go index e951ffb3..1928e282 100644 --- a/stream_query_test.go +++ b/test/stream_query_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jsm_test +package test import ( "testing" diff --git a/streams_test.go b/test/streams_test.go similarity index 99% rename from streams_test.go rename to test/streams_test.go index be24ee77..6fb88301 100644 --- a/streams_test.go +++ b/test/streams_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 The NATS Authors +// Copyright 2020-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jsm_test +package test import ( "context" diff --git a/testdata/bytes_required.cfg b/test/testdata/bytes_required.cfg similarity index 100% rename from testdata/bytes_required.cfg rename to test/testdata/bytes_required.cfg