Skip to content

Commit

Permalink
feat(logging): export logs (#30)
Browse files Browse the repository at this point in the history
Co-authored-by: jmazionis <[email protected]>
  • Loading branch information
zilvinasu and jmazionis authored Jun 30, 2021
1 parent ff7035b commit 39059aa
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 13 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.16
require (
github.com/aws/aws-sdk-go v1.37.23
github.com/go-resty/resty/v2 v2.5.0
github.com/golang/mock v1.4.1
github.com/golang/mock v1.5.0
github.com/google/uuid v1.1.2
github.com/jarcoal/httpmock v1.0.8
github.com/sirupsen/logrus v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.1 h1:ocYkMQY5RrXTYgXl7ICpV0IXwlEQGwKIsery4gyXa1U=
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
35 changes: 32 additions & 3 deletions internal/castai/castai.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ var (
hdrAPIKey = http.CanonicalHeaderKey(headerAPIKey)
)

var DoNotSendLogs = struct{}{}

func DoNotSendLogsCtx() context.Context {
ctx := context.Background()
ctx = context.WithValue(ctx, DoNotSendLogs, "true")
return ctx
}

// Client responsible for communication between the agent and CAST AI API.
type Client interface {
// RegisterCluster sends a request to CAST AI containing discovered cluster properties used to authenticate the
Expand All @@ -38,12 +46,14 @@ type Client interface {
ExchangeAgentTelemetry(ctx context.Context, clusterID string, req *AgentTelemetryRequest) (*AgentTelemetryResponse, error)
// SendDelta sends the kubernetes state change to CAST AI. Function is noop when items are empty.
SendDelta(ctx context.Context, delta *Delta) error
// SendLogEvent sends agent's log event to CAST AI.
SendLogEvent(ctx context.Context, clusterID string, req *IngestAgentLogsRequest) *IngestAgentLogsResponse
}

// NewClient creates and configures the CAST AI client.
func NewClient(log logrus.FieldLogger, rest *resty.Client) Client {
func NewClient(log *logrus.Logger, rest *resty.Client) Client {
return &client{
log: log.WithField("client", "castai"),
log: log,
rest: rest,
}
}
Expand All @@ -62,7 +72,7 @@ func NewDefaultClient() *resty.Client {
}

type client struct {
log logrus.FieldLogger
log *logrus.Logger
rest *resty.Client
}

Expand Down Expand Up @@ -138,6 +148,25 @@ func (c *client) RegisterCluster(ctx context.Context, req *RegisterClusterReques
return body, nil
}

func (c *client) SendLogEvent(ctx context.Context, clusterID string, req *IngestAgentLogsRequest) *IngestAgentLogsResponse {
body := &IngestAgentLogsResponse{}
resp, err := c.rest.R().
SetBody(req).
SetContext(ctx).
Post(fmt.Sprintf("/v1/kubernetes/clusters/%s/agent-logs", clusterID))
log := c.log.WithContext(DoNotSendLogsCtx())
if err != nil {
log.Errorf("failed to send logs: %v", err)
return nil
}
if resp.IsError() {
log.Errorf("send log event: request error status_code=%d body=%s", resp.StatusCode(), resp.Body())
return nil
}

return body
}

func (c *client) ExchangeAgentTelemetry(ctx context.Context, clusterID string, req *AgentTelemetryRequest) (*AgentTelemetryResponse, error) {
body := &AgentTelemetryResponse{}
r := c.rest.R().
Expand Down
14 changes: 14 additions & 0 deletions internal/castai/mock/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions internal/castai/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
)

type EKSParams struct {
Expand Down Expand Up @@ -42,6 +43,19 @@ type RegisterClusterResponse struct {
Cluster
}

type IngestAgentLogsRequest struct {
LogEvent LogEvent `json:"logEvent"`
}

type IngestAgentLogsResponse struct{}

type LogEvent struct {
Level string `json:"level"`
Time time.Time `json:"time"`
Message string `json:"message"`
Fields logrus.Fields `json:"fields"`
}

type AgentTelemetryRequest struct {
AgentVersion string `json:"agentVersion"`
GitCommit string `json:"gitCommit"`
Expand Down
27 changes: 18 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io/ioutil"
"time"

castailog "castai-agent/pkg/log"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
Expand All @@ -29,24 +31,28 @@ var (
Version = "local"
)

const LogExporterSendTimeoutSeconds = 15

func main() {
cfg := config.Get()

logger := logrus.New()
logger.SetLevel(logrus.Level(cfg.Log.Level))

var log logrus.FieldLogger = logger
log.Info("starting the agent")
castaiclient := castai.NewClient(logger, castai.NewDefaultClient())

if err := run(signals.SetupSignalHandler(), log); err != nil {
log := logrus.WithFields(logrus.Fields{})
if err := run(signals.SetupSignalHandler(), castaiclient, logger); err != nil {
logErr := &logContextErr{}
if errors.As(err, &logErr) {
log = log.WithFields(logErr.fields)
log = logger.WithFields(logErr.fields)
}
log.Fatalf("agent failed: %v", err)
}
}

func run(ctx context.Context, log logrus.FieldLogger) (reterr error) {
func run(ctx context.Context, castaiclient castai.Client, logger *logrus.Logger) (reterr error) {

fields := logrus.Fields{}

defer func() {
Expand All @@ -66,8 +72,9 @@ func run(ctx context.Context, log logrus.FieldLogger) (reterr error) {
Version: Version,
}


fields["version"] = agentVersion.Version
log = log.WithFields(fields)
log := logger.WithFields(fields)
log.Infof("running agent version: %v", agentVersion)

restconfig, err := retrieveKubeConfig(log)
Expand All @@ -86,16 +93,18 @@ func run(ctx context.Context, log logrus.FieldLogger) (reterr error) {
}

fields["provider"] = provider.Name()
log = log.WithFields(fields)
log.Infof("using provider %q", provider.Name())

castaiclient := castai.NewClient(log, castai.NewDefaultClient())

reg, err := provider.RegisterCluster(ctx, castaiclient)
if err != nil {
return fmt.Errorf("registering cluster: %w", err)
}

castailog.SetupLogExporter(logger, castaiclient, castailog.Config{
ClusterID: reg.ClusterID,
MsgSendTimeoutSecs: LogExporterSendTimeoutSeconds,
})

fields["cluster_id"] = reg.ClusterID
log = log.WithFields(fields)
log.Infof("cluster registered: %v", reg)
Expand Down
90 changes: 90 additions & 0 deletions pkg/log/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package log

import (
"context"
"sync"
"time"

"castai-agent/internal/castai"

"github.com/sirupsen/logrus"
)

type Exporter interface {
logrus.Hook
Wait()
}

func SetupLogExporter(logger *logrus.Logger, castaiclient castai.Client, cfg Config) {
logExporter := newExporter(cfg, castaiclient)
logger.AddHook(logExporter)
logrus.RegisterExitHandler(logExporter.Wait)
}

func newExporter(cfg Config, client castai.Client) Exporter {
return &exporter{
cfg: cfg,
client: client,
wg: sync.WaitGroup{},
}
}

type exporter struct {
cfg Config
client castai.Client
wg sync.WaitGroup
}

type Config struct {
ClusterID string
MsgSendTimeoutSecs time.Duration
}

func (ex *exporter) Levels() []logrus.Level {
return []logrus.Level{
logrus.ErrorLevel,
logrus.FatalLevel,
logrus.PanicLevel,
logrus.InfoLevel,
logrus.WarnLevel,
}
}

func (ex *exporter) Fire(entry *logrus.Entry) error {
if entry.Context != nil {
if v, _ := entry.Context.Value(castai.DoNotSendLogs).(string); v == "true" {
// Don't fire the hook
return nil
}
}

ex.wg.Add(1)

go func(entry *logrus.Entry) {
defer ex.wg.Done()
ex.sendLogEvent(ex.cfg.ClusterID, entry)
}(entry)

return nil
}

func (ex *exporter) Wait() {
ex.wg.Wait()
}

func (ex *exporter) sendLogEvent(clusterID string, e *logrus.Entry) {
ctx, cancel := context.WithTimeout(context.Background(), ex.cfg.MsgSendTimeoutSecs * time.Second)
defer cancel()

ex.client.SendLogEvent(
ctx,
clusterID,
&castai.IngestAgentLogsRequest{
LogEvent: castai.LogEvent{
Level: e.Level.String(),
Time: e.Time,
Message: e.Message,
Fields: e.Data,
},
})
}
59 changes: 59 additions & 0 deletions pkg/log/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package log

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"

"castai-agent/internal/castai"
mock_castai "castai-agent/internal/castai/mock"
)

func TestSetupLogExporter(t *testing.T) {
logger, hook := test.NewNullLogger()
defer hook.Reset()
mockClusterID := uuid.New().String()
ctrl := gomock.NewController(t)
mockapi := mock_castai.NewMockClient(ctrl)
SetupLogExporter(logger, mockapi, Config{ClusterID: mockClusterID, MsgSendTimeoutSecs: 1})

t.Run("sends the log msg", func(t *testing.T) {
r := require.New(t)
mockapi.EXPECT().SendLogEvent(gomock.Any(), gomock.Any(), gomock.Any()).
Do(assertClusterFields(r, mockClusterID, "eks")).Return(&castai.IngestAgentLogsResponse{}).Times(1)
log := logger.WithFields(logrus.Fields{
"cluster_id": mockClusterID,
"provider": "eks",
})
log.Log(logrus.ErrorLevel, "failed to discover account id")
time.Sleep(1 * time.Second)
})

t.Run("skips sending log msg", func(t *testing.T) {
mockapi.EXPECT().SendLogEvent(gomock.Any(), gomock.Any(), gomock.Any()).Times(0)
log := logrus.WithContext(castai.DoNotSendLogsCtx())
log = log.WithFields(logrus.Fields{
"cluster_id": mockClusterID,
"provider": "eks",
})
log.Log(logrus.ErrorLevel, "failed to discover account id")
time.Sleep(1 * time.Second)
})
}

func assertClusterFields(
r *require.Assertions, mockClusterID, provider string,
) func(_ context.Context, clusterID string, req *castai.IngestAgentLogsRequest) *castai.IngestAgentLogsResponse {
return func(_ context.Context, clusterID string, req *castai.IngestAgentLogsRequest) *castai.IngestAgentLogsResponse {
fields := req.LogEvent.Fields
r.Equal(mockClusterID, fields["cluster_id"])
r.Equal(provider, fields["provider"])
return &castai.IngestAgentLogsResponse{}
}
}

0 comments on commit 39059aa

Please sign in to comment.