Skip to content

Commit

Permalink
fix(exporter): Fixed monotonicity, performance and behaviour in gener…
Browse files Browse the repository at this point in the history
…al (#137)
  • Loading branch information
dmotterlini authored Dec 5, 2023
1 parent 43884b0 commit 54f9e77
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 244 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Flags:
--auth0.client-id string Auth0 management api client-id.
--auth0.client-secret string Auth0 management api client-secret.
--auth0.domain string Auth0 tenant's domain. (i.e: <tenant_name>.eu.auth0.com).
--auth0.from string Point in time from were to start fetching auth0 logs. (format: YYYY-MM-DD) (default "2023-04-02")
--auth0.from string Point in time from were to start fetching auth0 logs. (format: RFC3339) (default Now)
--auth0.token string Auth0 management api static token. (the token can be used instead of client credentials).
-h, --help help for export
--log.level string Exporter log level (debug, info, warn, error). (default "warn")
Expand Down
2 changes: 1 addition & 1 deletion cmd/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func serveExporterCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
log := logging.NewPromLoggerWithOpts(opts.LogLevel)
from, err := time.Parse("2006-01-02", opts.FromFetchTime)
from, err := time.Parse(time.RFC3339, opts.FromFetchTime)
if err != nil {
return errors.Annotate(err, "failed to parse value in --auth0.from flag")
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/options/exporter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (o *Options) addAppFlags(fs *pflag.FlagSet) {
fs.StringVar(
&o.FromFetchTime,
"auth0.from",
(time.Now()).Format("2006-01-02"),
"Point in time from were to start fetching auth0 logs. (format: YYYY-MM-DD)",
(time.Now()).Format(time.RFC3339),
"Point in time from were to start fetching auth0 logs. (format: RFC3339)",
)
fs.StringVar(
&o.cfg.Domain,
Expand Down
5 changes: 4 additions & 1 deletion pkg/client/applications/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package applications

import (
"context"

"github.com/auth0/go-auth0/management"
"github.com/juju/errors"
"go.uber.org/multierr"
)

//go:generate moq -out apps_mock.go . Client

const ItemCountPerPage = 100

var ErrAPIRateLimitReached = errors.New("client reached api rate limit")

type (
Expand All @@ -29,7 +32,7 @@ func (l *applicationClient) List(ctx context.Context, args ...interface{}) (inte
apps, err := l.mgmt.List(
ctx,
management.IncludeFields("name"),
management.PerPage(100),
management.PerPage(ItemCountPerPage),
management.Page(page),
)
switch {
Expand Down
91 changes: 26 additions & 65 deletions pkg/client/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ type (
}
)

// max number of items returned by Auth0 for each API call (see
// https://auth0.com/docs/troubleshoot/product-lifecycle/past-migrations/migrate-to-paginated-queries)
const ItemCountPerPage = 50

var ErrAPIRateLimitReached = errors.New("client reached api rate limit")
var errLastCheckpointMaxAttemptsReached = errors.New("max number of attempts was reached")

// New returns a new instance of the log fetching client, plus possible errors
func New(domain, clientID, clientSecret, token string) (*logClient, error) {
Expand Down Expand Up @@ -67,80 +70,38 @@ func (l *logClient) List(ctx context.Context, args ...interface{}) (interface{},
}
}

// Get the last log from the list of logs for the previous day.
// This is used as the starting point for the fetching of the logs.
// This allows us to use the checkpoint pagination style
var checkpoint *management.Log
var err error
checkpoint, err = l.findLatestCheckpoint(ctx, from, 0, 30)
switch {
case errors.Is(err, errLastCheckpointMaxAttemptsReached):
// do nothing
case errors.Is(err, errors.QuotaLimitExceeded):
return allLogs, ErrAPIRateLimitReached
case err != nil:
return allLogs, err
}

for {
logs, err := l.fetchLogs(ctx, checkpoint)
query := fmt.Sprintf("date:{%s TO *]", from.UTC().Format(time.RFC3339))

logs, err := l.mgmt.List(
ctx,
management.IncludeFields("type", "log_id", "date", "client_name"),
management.Query(query),
management.Sort("date:1"),
management.Take(ItemCountPerPage),
)

switch {
case errors.Is(err, errors.QuotaLimitExceeded):
return allLogs, ErrAPIRateLimitReached
case err != nil:
return allLogs, err
}
allLogs = append(allLogs, logs...)

if len(logs) == 0 {
return allLogs, nil
}
checkpoint = logs[len(logs)-1]
}
}
break
} else if len(logs) == ItemCountPerPage {
// the last item is used as checkpoint (it will be the first
// of the next response)
from = *logs[len(logs)-1].Date
logs = logs[:len(logs)-1]

// fetchLogs returns the list of logs given a starting checkpoint. If no checkpoint is passed it returns the list of the
// latest logs. (Default: 100 items)
func (l *logClient) fetchLogs(ctx context.Context, checkpoint *management.Log) ([]*management.Log, error) {
if checkpoint != nil {
return l.mgmt.List(
ctx,
management.IncludeFields("type", "log_id", "date", "client_name"),
management.From(checkpoint.GetLogID()),
management.Take(100),
)
}
return l.mgmt.List(
ctx,
management.IncludeFields("type", "log_id", "date", "client_name"),
management.Take(100),
)
}

// findLatestCheckpoint recursively polls the logs api to find the latest available log to be use for the checkpoint pagination.
// Keeps polling the auth0 api until max attempts are reached or a checkpoint log is found.
func (l *logClient) findLatestCheckpoint(ctx context.Context, from time.Time, attempt, maxAttempts int) (*management.Log, error) {
var checkpoint *management.Log
if attempt > maxAttempts {
return nil, errLastCheckpointMaxAttemptsReached
}

if !from.IsZero() {
previousDay := from.Add(-24 * time.Hour)
logs, err := l.mgmt.List(
ctx,
management.IncludeFields("type", "log_id", "date", "client_name"),
management.PerPage(1),
management.Page(0),
management.Query(fmt.Sprintf("date:[%s TO %s]", previousDay.Format("2006-01-02"), previousDay.Format("2006-01-02"))),
)
if err != nil {
return nil, err
}
if len(logs) > 0 {
return logs[0], nil
allLogs = append(allLogs, logs...)
} else {
allLogs = append(allLogs, logs...)
break
}
return l.findLatestCheckpoint(ctx, previousDay, attempt+1, maxAttempts)
}
return checkpoint, nil

return allLogs, nil
}
73 changes: 8 additions & 65 deletions pkg/client/logs/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,87 +55,30 @@ func TestClient(t *testing.T) {

t.Run("successfully fetch all logs across multiple pages with client.List", func(t *testing.T) {
totalLogNumber := 220
take := 1
checkpoint := 0
firstCall := true
taken := 0

storedLogs := make([]*management.Log, totalLogNumber)
for i := 0; i < totalLogNumber; i++ {
var code = "f"
var logID = fmt.Sprintf("log-%d", i)
storedLogs[i] = &management.Log{LogID: &logID, Type: &code}
storedLogs[i] = &management.Log{LogID: &logID, Type: &code, Date: &time.Time{}}
}

c := logClient{mgmt: &logManagementMock{
ListFunc: func(ctx context.Context, opts ...management.RequestOption) ([]*management.Log, error) {
var result []*management.Log
if !firstCall {
take = 100
}
take := min(totalLogNumber-taken, ItemCountPerPage)
result := storedLogs[taken : taken+take]

if checkpoint >= totalLogNumber {
return result, nil
}

if (checkpoint + take) >= totalLogNumber {
result = storedLogs[checkpoint:totalLogNumber]
} else {
result = storedLogs[checkpoint:(checkpoint + take)]
}

checkpoint += take
firstCall = false
// the -1 here is to compensate the fact that the c.List
// function sacrifices the last item (except on the last call)
taken += take - 1

return result, nil
},
}}

totalActualLogs, err := c.List(context.Background(), time.Now())
require.NoError(t, err)
assert.Len(t, totalActualLogs, totalLogNumber-1)
})
}

func TestFindLatestCheckpoint(t *testing.T) {
var checkpointID = "foo"
t.Run("successfully find latest checkpoint, 2 days before auth0.from", func(t *testing.T) {
from := time.Now()
maxAttempts := 30
expected := &management.Log{LogID: &checkpointID}
var globalCounter = 2

client := logClient{mgmt: &logManagementMock{
ListFunc: func(ctx context.Context, opts ...management.RequestOption) ([]*management.Log, error) {
var result []*management.Log
if globalCounter == 2 {
return append(result, &management.Log{LogID: &checkpointID}), nil
}
return result, nil
},
}}

checkpoint, err := client.findLatestCheckpoint(context.TODO(), from, globalCounter, maxAttempts)
require.NoError(t, err)
assert.EqualValues(t, expected.LogID, checkpoint.LogID)
})

t.Run("fails to find latest checkpoint, max attempt are reached", func(t *testing.T) {
from := time.Now()
maxAttempts := 10
var globalCounter = 12

client := logClient{mgmt: &logManagementMock{
ListFunc: func(ctx context.Context, opts ...management.RequestOption) ([]*management.Log, error) {
var result []*management.Log
if globalCounter == 2 {
return append(result, &management.Log{LogID: &checkpointID}), nil
}
return result, nil
},
}}

_, err := client.findLatestCheckpoint(context.TODO(), from, globalCounter, maxAttempts)
require.Error(t, err)
assert.ErrorIs(t, err, errLastCheckpointMaxAttemptsReached)
assert.Len(t, totalActualLogs, totalLogNumber)
})
}
5 changes: 4 additions & 1 deletion pkg/client/users/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package users

import (
"context"

"github.com/auth0/go-auth0/management"
"github.com/juju/errors"
"go.uber.org/multierr"
Expand All @@ -11,6 +12,8 @@ import (

var ErrAPIRateLimitReached = errors.New("client reached api rate limit")

const ItemCountPerPage = 100

type (
Client interface {
List(ctx context.Context, args ...interface{}) (interface{}, error)
Expand All @@ -29,7 +32,7 @@ func (l *usersClient) List(ctx context.Context, args ...interface{}) (interface{
users, err := l.mgmt.List(
ctx,
management.IncludeFields("user_id", "blocked", "last_login"),
management.PerPage(100),
management.PerPage(ItemCountPerPage),
management.Page(page),
)
switch {
Expand Down
19 changes: 13 additions & 6 deletions pkg/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
totalScrapes prometheus.Counter
targetScrapeRequestErrors prometheus.Counter
probeRegistry *prometheus.Registry
metricsObject *metrics.Metrics
metricsRegistry *prometheus.Registry
}
Option func(e *exporter)
)
Expand All @@ -72,7 +74,8 @@ func New(ctx context.Context, opts ...Option) *exporter {
Name: "target_scrape_request_total",
Help: "Total requests to the exporter",
}),
probeRegistry: prometheus.NewRegistry(),
probeRegistry: prometheus.NewRegistry(),
metricsRegistry: prometheus.NewRegistry(),
}
for _, opt := range opts {
// apply options
Expand All @@ -99,13 +102,10 @@ func (e *exporter) metrics() echo.HandlerFunc {
return func(ctx echo.Context) error {
log := logging.LoggerFromEchoContext(ctx)
log.Info("handling request for the auth0 tenant metrics")
metrics := ctx.Get(metrics.ListCtxKey).(*metrics.Metrics)
registry := prometheus.NewRegistry()
registry.MustRegister(metrics.List()...)

e.totalScrapes.Inc()
log.Info("handling request for the auth0 tenant metrics")
err := e.collect(ctx.Request().Context(), metrics)
err := e.collect(ctx.Request().Context(), e.metricsObject)
switch {
case errors.Is(err, logs.ErrAPIRateLimitReached):
log.Error(err, "reached the Auth0 rate limit, fetching should resume shortly")
Expand All @@ -117,7 +117,7 @@ func (e *exporter) metrics() echo.HandlerFunc {
}

log.Info("successfully collected metrics from the Auth0 tenant")
promhttp.HandlerFor(registry, promhttp.HandlerOpts{}).ServeHTTP(ctx.Response(), ctx.Request())
promhttp.HandlerFor(e.metricsRegistry, promhttp.HandlerOpts{}).ServeHTTP(ctx.Response(), ctx.Request())
return nil
}
}
Expand Down Expand Up @@ -157,6 +157,11 @@ func (e *exporter) collect(ctx context.Context, m *metrics.Metrics) error {
return errors.New("Auth0 log client did not return the expected list of Log type")
}

// updating the exporter start time with latest
if len(tenantLogEvents) > 0 {
e.startTime = *tenantLogEvents[len(tenantLogEvents)-1].Date
}

for _, event := range tenantLogEvents {
if err := m.Update(event); err != nil {
e.logger.V(0).Error(err, err.Error())
Expand All @@ -175,10 +180,12 @@ func (e *exporter) collect(ctx context.Context, m *metrics.Metrics) error {
case err != nil:
return errors.Annotate(err, "error fetching the users from Auth0")
}

tenantUsers, ok := list.([]*management.User)
if !ok {
return errors.New("auth0 client users fetch didn't return the expected list of User type")
}

if err := m.ProcessUsers(tenantUsers); err != nil {
e.logger.V(0).Error(err, err.Error())
}
Expand Down
Loading

0 comments on commit 54f9e77

Please sign in to comment.