Skip to content

Commit

Permalink
feat: Add possibility to fetch only cluster scoped logs (#47)
Browse files Browse the repository at this point in the history
* feat: Add possibility to fetch only cluster scoped logs
  • Loading branch information
apasyniuk authored Dec 20, 2023
1 parent 5d46afa commit 6fc69a5
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 94 deletions.
11 changes: 10 additions & 1 deletion auditlogsreceiver/audit_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ const (
timestampLayout = "2006-01-02T15:04:05.999999999Z"
)

type filters struct {
clusterID *string
}

type auditLogsReceiver struct {
logger *zap.Logger
pollInterval time.Duration
pageLimit int

pageLimit int
filter filters

wg *sync.WaitGroup
stopPolling context.CancelFunc
Expand Down Expand Up @@ -117,6 +123,9 @@ func (a *auditLogsReceiver) poll(ctx context.Context, stopFunc func()) error {
"toDate": pollData.ToDate.UTC().Format(timestampLayout),
"fromDate": pollData.CheckPoint.UTC().Format(timestampLayout),
}
if a.filter.clusterID != nil && *a.filter.clusterID != "" {
queryParams["clusterId"] = *a.filter.clusterID
}
}

resp, err := a.rest.R().
Expand Down
24 changes: 17 additions & 7 deletions auditlogsreceiver/audit_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,22 @@ func TestPoll(t *testing.T) {
httpmock.ActivateNonDefault(rest.GetClient())
defer httpmock.Reset()

expectedClusterID := uuid.NewString()

// Polling parameters are not known at the moment of registering a responder, so asserting params in the responder vs using an exact query.
httpmock.RegisterResponder(
http.MethodGet,
`=~^https:\/\/api\.cast\.ai/v1/audit.?`,
defaultResponderWithAssertions(t, &data, restConfig.PageLimit, `{}`))
defaultResponderWithAssertions(t, &data, restConfig.PageLimit, `{}`, expectedClusterID))

receiver := auditLogsReceiver{
logger: logger,
pageLimit: restConfig.PageLimit,
storage: storageMock,
rest: rest,
filter: filters{
clusterID: &expectedClusterID,
},
storage: storageMock,
rest: rest,
}
err := receiver.poll(ctx, nil)
r.NoError(err)
Expand Down Expand Up @@ -157,18 +162,23 @@ func TestPoll(t *testing.T) {
httpmock.ActivateNonDefault(rest.GetClient())
defer httpmock.Reset()

expectedClusterID := uuid.NewString()

// Polling parameters are not known at the moment of registering a responder, so asserting params in the responder vs using an exact query.
httpmock.RegisterResponder(
http.MethodGet,
`=~^https:\/\/api\.cast\.ai/v1/audit.?`,
defaultResponderWithAssertions(t, &data, restConfig.PageLimit, newResponseWithOneItem(lastLogTimestamp)))
defaultResponderWithAssertions(t, &data, restConfig.PageLimit, newResponseWithOneItem(lastLogTimestamp), expectedClusterID))

receiver := auditLogsReceiver{
logger: logger,
pageLimit: restConfig.PageLimit,
storage: storageMock,
rest: rest,
consumer: consumerMock,
filter: filters{
clusterID: &expectedClusterID,
},
storage: storageMock,
rest: rest,
consumer: consumerMock,
}
err := receiver.poll(ctx, nil)
r.NoError(err)
Expand Down
13 changes: 13 additions & 0 deletions auditlogsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/url"

"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"go.opentelemetry.io/collector/component"
)
Expand All @@ -20,6 +21,11 @@ type Config struct {
PollIntervalSec int `mapstructure:"poll_interval_sec"`
PageLimit int `mapstructure:"page_limit"`
Storage map[string]interface{} `mapstructure:"storage"`
Filters FilterConfig `mapstructure:"filters"`
}

type FilterConfig struct {
ClusterID *string `mapstructure:"cluster_id,omitempty"`
}

type InMemoryStorageConfig struct {
Expand Down Expand Up @@ -64,6 +70,13 @@ func (c Config) Validate() error {
return errors.New("page limit must be within 10...1000 interval")
}

if c.Filters.ClusterID != nil && *c.Filters.ClusterID != "" {
_, err := uuid.Parse(*c.Filters.ClusterID)
if err != nil {
return errors.New("cluster id must be a valid UUID")
}
}

// Validating storage configuration based on its type.
t, ok := c.Storage["type"]
if !ok {
Expand Down
13 changes: 8 additions & 5 deletions auditlogsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ func NewAuditLogsReceiver(
logger: logger,
pollInterval: time.Second * time.Duration(cfg.PollIntervalSec),
pageLimit: cfg.PageLimit,
wg: &sync.WaitGroup{},
stopPolling: func() {},
storage: st,
rest: newRestyClient(cfg),
consumer: consumer,
filter: filters{
clusterID: cfg.Filters.ClusterID,
},
wg: &sync.WaitGroup{},
stopPolling: func() {},
storage: st,
rest: newRestyClient(cfg),
consumer: consumer,
}, nil
}

Expand Down
44 changes: 23 additions & 21 deletions auditlogsreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,45 @@ go 1.20
require (
github.com/go-resty/resty/v2 v2.7.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/mitchellh/mapstructure v1.5.0
github.com/google/uuid v1.3.1
github.com/jarcoal/httpmock v1.3.0
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
github.com/samber/lo v1.38.1
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.80.0
go.opentelemetry.io/collector/consumer v0.80.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0013
go.opentelemetry.io/collector/receiver v0.80.0
go.uber.org/zap v1.24.0
go.opentelemetry.io/collector/component v0.91.0
go.opentelemetry.io/collector/consumer v0.91.0
go.opentelemetry.io/collector/pdata v1.0.0
go.opentelemetry.io/collector/receiver v0.91.0
go.uber.org/zap v1.26.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/jarcoal/httpmock v1.3.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/knadh/koanf/v2 v2.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.80.0 // indirect
go.opentelemetry.io/collector/confmap v0.80.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0013 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect
go.opentelemetry.io/collector/confmap v0.91.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/goleak v1.2.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/grpc v1.56.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 6fc69a5

Please sign in to comment.