diff --git a/pkg/barrier/barrier.go b/pkg/barrier/barrier.go new file mode 100644 index 0000000..5fc6e02 --- /dev/null +++ b/pkg/barrier/barrier.go @@ -0,0 +1,64 @@ +package barrier + +import ( + "context" + "github.com/juju/errors" + "sync" + "time" +) + +type ( + Barrier struct { + Capacity int + Value int + RefillPeriod time.Duration + mutex sync.RWMutex + } +) + +func New(cap int, rate time.Duration) *Barrier { + return &Barrier{ + Capacity: cap, + Value: cap, + RefillPeriod: rate, + } +} + +func (b *Barrier) RecordBadEvent() { + b.mutex.Lock() + defer b.mutex.Unlock() + if b.Value <= 0 { + return + } + b.Value = b.Value - 1 +} + +func (b *Barrier) IsBadState() bool { + b.mutex.Lock() + defer b.mutex.Unlock() + return b.Value == 0 +} + +func (b *Barrier) refill() { + b.mutex.Lock() + defer b.mutex.Unlock() + if b.Value == b.Capacity { + return + } + b.Value = b.Value + 1 +} + +func (b *Barrier) Start(ctx context.Context) error { + if b.Value < 0 { + return errors.New("The instance cannot be initialised with a negative number") + } + + for { + select { + case <-ctx.Done(): + return nil + case <-time.NewTicker(b.RefillPeriod).C: + b.refill() + } + } +} diff --git a/pkg/barrier/barrier_test.go b/pkg/barrier/barrier_test.go new file mode 100644 index 0000000..93e70e9 --- /dev/null +++ b/pkg/barrier/barrier_test.go @@ -0,0 +1,46 @@ +package barrier + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBarrier(t *testing.T) { + t.Parallel() + t.Run("successfully record bad event", func(t *testing.T) { + b := New(10, 1*time.Second) + b.RecordBadEvent() + assert.Equal(t, 9, b.Value) + }) + t.Run("errors if the instance capacity is set to a negative number", func(t *testing.T) { + ctx := context.Background() + b := New(-1, 1*time.Second) + assert.Error(t, b.Start(ctx)) + }) + t.Run("record bad event doesn't go to a negative number", func(t *testing.T) { + b := New(0, 1*time.Second) + b.RecordBadEvent() + assert.Equal(t, 0, b.Value) + }) + t.Run("isBadState returns true when instance reaches 0", func(t *testing.T) { + b := New(0, 1*time.Second) + assert.True(t, true, b.IsBadState()) + }) + t.Run("successfully refill barrier at the refill rate", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + b := New(10, 1*time.Second) + go func() { + require.NoError(t, b.Start(ctx)) + }() + b.RecordBadEvent() + b.RecordBadEvent() + assert.Equal(t, 8, b.Value) + time.Sleep(3 * time.Second) + cancel() + assert.Equal(t, 10, b.Value) + }) +} diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index cde185e..9522cb4 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -10,6 +10,7 @@ import ( "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/tfadeyi/auth0-simple-exporter/pkg/barrier" "github.com/tfadeyi/auth0-simple-exporter/pkg/client" "github.com/tfadeyi/auth0-simple-exporter/pkg/client/logs" "github.com/tfadeyi/auth0-simple-exporter/pkg/exporter/metrics" @@ -28,6 +29,8 @@ type ( // exporter namespace string subsystem string + // detects whether the exporter is in a bad state + state *barrier.Barrier // checkpoint from where to start fetching logs startTime time.Time userMetricDisabled bool @@ -63,6 +66,7 @@ func New(ctx context.Context, opts ...Option) *exporter { namespace: "auth0", subsystem: "", ctx: ctx, + state: barrier.New(10, 5*time.Minute), startTime: time.Now(), targetScrapeRequestErrors: prometheus.NewCounter( prometheus.CounterOpts{ @@ -149,6 +153,7 @@ func (e *exporter) collect(ctx context.Context, m *metrics.Metrics) error { eventLogs := list.([]*management.Log) e.logger.V(0).Error(err, "Request was terminated by Prometheus. The exporter could not finish polling the Auth0 log client to fetch the tenant logs."+ "Please try increase the prometheus scrape period", "logs_events_found", len(eventLogs), "from", e.startTime) + e.state.RecordBadEvent() case errors.Is(err, context.DeadlineExceeded): eventLogs := list.([]*management.Log) e.logger.V(0).Error(err, "Request could not be completed in the current request timeout. The exporter could not finish polling the Auth0 log client to fetch the tenant logs."+ @@ -182,6 +187,7 @@ func (e *exporter) collect(ctx context.Context, m *metrics.Metrics) error { eventUsers := list.([]*management.User) e.logger.V(0).Error(err, "Request was terminated by Prometheus. The exporter could not finish polling the Auth0 user client to fetch the tenant users."+ "Please increase the prometheus scrape period ", "users_found", len(eventUsers)) + e.state.RecordBadEvent() case errors.Is(err, context.DeadlineExceeded): eventUsers := list.([]*management.User) e.logger.V(0).Error(err, "Request could not be completed in the current request timeout. The exporter could not finish polling the Auth0 user client to fetch the tenant users."+ diff --git a/pkg/exporter/server.go b/pkg/exporter/server.go index d2c49cd..aa112b7 100644 --- a/pkg/exporter/server.go +++ b/pkg/exporter/server.go @@ -75,6 +75,10 @@ func (e *exporter) Export() error { `, e.metricsAddr)) }) server.GET("/healthz", func(ctx echo.Context) error { + // check if the exporter might be stuck + if e.state.IsBadState() { + return echo.ErrInternalServerError + } return ctx.JSON(http.StatusOK, "ok") }) @@ -112,6 +116,9 @@ func (e *exporter) Export() error { return server.Start(fmt.Sprintf(":%d", e.hostPort)) }) } + grp.Go(func() error { + return e.state.Start(ctx) + }) grp.Go(func() error { <-ctx.Done() return server.Shutdown(context.Background())