diff --git a/go.mod b/go.mod index b08f79e2..afbe5598 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/aws/aws-sdk-go v1.37.23 github.com/go-resty/resty/v2 v2.5.0 - github.com/golang/mock v1.4.1 + github.com/golang/mock v1.5.0 github.com/google/uuid v1.1.2 github.com/jarcoal/httpmock v1.0.8 github.com/sirupsen/logrus v1.7.0 diff --git a/go.sum b/go.sum index d4b56ad0..1f0d962f 100644 --- a/go.sum +++ b/go.sum @@ -137,6 +137,8 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.1 h1:ocYkMQY5RrXTYgXl7ICpV0IXwlEQGwKIsery4gyXa1U= github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= +github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/internal/castai/castai.go b/internal/castai/castai.go index c4dbd6b1..2f6818c8 100644 --- a/internal/castai/castai.go +++ b/internal/castai/castai.go @@ -28,6 +28,14 @@ var ( hdrAPIKey = http.CanonicalHeaderKey(headerAPIKey) ) +var DoNotSendLogs = struct{}{} + +func DoNotSendLogsCtx() context.Context { + ctx := context.Background() + ctx = context.WithValue(ctx, DoNotSendLogs, "true") + return ctx +} + // Client responsible for communication between the agent and CAST AI API. type Client interface { // RegisterCluster sends a request to CAST AI containing discovered cluster properties used to authenticate the @@ -38,12 +46,14 @@ type Client interface { ExchangeAgentTelemetry(ctx context.Context, clusterID string, req *AgentTelemetryRequest) (*AgentTelemetryResponse, error) // SendDelta sends the kubernetes state change to CAST AI. Function is noop when items are empty. SendDelta(ctx context.Context, delta *Delta) error + // SendLogEvent sends agent's log event to CAST AI. + SendLogEvent(ctx context.Context, clusterID string, req *IngestAgentLogsRequest) *IngestAgentLogsResponse } // NewClient creates and configures the CAST AI client. -func NewClient(log logrus.FieldLogger, rest *resty.Client) Client { +func NewClient(log *logrus.Logger, rest *resty.Client) Client { return &client{ - log: log.WithField("client", "castai"), + log: log, rest: rest, } } @@ -62,7 +72,7 @@ func NewDefaultClient() *resty.Client { } type client struct { - log logrus.FieldLogger + log *logrus.Logger rest *resty.Client } @@ -138,6 +148,25 @@ func (c *client) RegisterCluster(ctx context.Context, req *RegisterClusterReques return body, nil } +func (c *client) SendLogEvent(ctx context.Context, clusterID string, req *IngestAgentLogsRequest) *IngestAgentLogsResponse { + body := &IngestAgentLogsResponse{} + resp, err := c.rest.R(). + SetBody(req). + SetContext(ctx). + Post(fmt.Sprintf("/v1/kubernetes/clusters/%s/agent-logs", clusterID)) + log := c.log.WithContext(DoNotSendLogsCtx()) + if err != nil { + log.Errorf("failed to send logs: %v", err) + return nil + } + if resp.IsError() { + log.Errorf("send log event: request error status_code=%d body=%s", resp.StatusCode(), resp.Body()) + return nil + } + + return body +} + func (c *client) ExchangeAgentTelemetry(ctx context.Context, clusterID string, req *AgentTelemetryRequest) (*AgentTelemetryResponse, error) { body := &AgentTelemetryResponse{} r := c.rest.R(). diff --git a/internal/castai/mock/client.go b/internal/castai/mock/client.go index 79376461..84d085c9 100644 --- a/internal/castai/mock/client.go +++ b/internal/castai/mock/client.go @@ -78,3 +78,17 @@ func (mr *MockClientMockRecorder) SendDelta(arg0, arg1 interface{}) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendDelta", reflect.TypeOf((*MockClient)(nil).SendDelta), arg0, arg1) } + +// SendLogEvent mocks base method. +func (m *MockClient) SendLogEvent(arg0 context.Context, arg1 string, arg2 *castai.IngestAgentLogsRequest) *castai.IngestAgentLogsResponse { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendLogEvent", arg0, arg1, arg2) + ret0, _ := ret[0].(*castai.IngestAgentLogsResponse) + return ret0 +} + +// SendLogEvent indicates an expected call of SendLogEvent. +func (mr *MockClientMockRecorder) SendLogEvent(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendLogEvent", reflect.TypeOf((*MockClient)(nil).SendLogEvent), arg0, arg1, arg2) +} diff --git a/internal/castai/types.go b/internal/castai/types.go index 46dc94c4..c8361fbc 100644 --- a/internal/castai/types.go +++ b/internal/castai/types.go @@ -4,6 +4,7 @@ import ( "time" "github.com/google/uuid" + "github.com/sirupsen/logrus" ) type EKSParams struct { @@ -42,6 +43,19 @@ type RegisterClusterResponse struct { Cluster } +type IngestAgentLogsRequest struct { + LogEvent LogEvent `json:"logEvent"` +} + +type IngestAgentLogsResponse struct{} + +type LogEvent struct { + Level string `json:"level"` + Time time.Time `json:"time"` + Message string `json:"message"` + Fields logrus.Fields `json:"fields"` +} + type AgentTelemetryRequest struct { AgentVersion string `json:"agentVersion"` GitCommit string `json:"gitCommit"` diff --git a/main.go b/main.go index 579c1817..9724e4db 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,8 @@ import ( "io/ioutil" "time" + castailog "castai-agent/pkg/log" + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -29,24 +31,28 @@ var ( Version = "local" ) +const LogExporterSendTimeoutSeconds = 15 + func main() { cfg := config.Get() + logger := logrus.New() logger.SetLevel(logrus.Level(cfg.Log.Level)) - var log logrus.FieldLogger = logger - log.Info("starting the agent") + castaiclient := castai.NewClient(logger, castai.NewDefaultClient()) - if err := run(signals.SetupSignalHandler(), log); err != nil { + log := logrus.WithFields(logrus.Fields{}) + if err := run(signals.SetupSignalHandler(), castaiclient, logger); err != nil { logErr := &logContextErr{} if errors.As(err, &logErr) { - log = log.WithFields(logErr.fields) + log = logger.WithFields(logErr.fields) } log.Fatalf("agent failed: %v", err) } } -func run(ctx context.Context, log logrus.FieldLogger) (reterr error) { +func run(ctx context.Context, castaiclient castai.Client, logger *logrus.Logger) (reterr error) { + fields := logrus.Fields{} defer func() { @@ -66,8 +72,9 @@ func run(ctx context.Context, log logrus.FieldLogger) (reterr error) { Version: Version, } + fields["version"] = agentVersion.Version - log = log.WithFields(fields) + log := logger.WithFields(fields) log.Infof("running agent version: %v", agentVersion) restconfig, err := retrieveKubeConfig(log) @@ -86,16 +93,18 @@ func run(ctx context.Context, log logrus.FieldLogger) (reterr error) { } fields["provider"] = provider.Name() - log = log.WithFields(fields) log.Infof("using provider %q", provider.Name()) - castaiclient := castai.NewClient(log, castai.NewDefaultClient()) - reg, err := provider.RegisterCluster(ctx, castaiclient) if err != nil { return fmt.Errorf("registering cluster: %w", err) } + castailog.SetupLogExporter(logger, castaiclient, castailog.Config{ + ClusterID: reg.ClusterID, + MsgSendTimeoutSecs: LogExporterSendTimeoutSeconds, + }) + fields["cluster_id"] = reg.ClusterID log = log.WithFields(fields) log.Infof("cluster registered: %v", reg) diff --git a/pkg/log/exporter.go b/pkg/log/exporter.go new file mode 100644 index 00000000..4b3221fc --- /dev/null +++ b/pkg/log/exporter.go @@ -0,0 +1,90 @@ +package log + +import ( + "context" + "sync" + "time" + + "castai-agent/internal/castai" + + "github.com/sirupsen/logrus" +) + +type Exporter interface { + logrus.Hook + Wait() +} + +func SetupLogExporter(logger *logrus.Logger, castaiclient castai.Client, cfg Config) { + logExporter := newExporter(cfg, castaiclient) + logger.AddHook(logExporter) + logrus.RegisterExitHandler(logExporter.Wait) +} + +func newExporter(cfg Config, client castai.Client) Exporter { + return &exporter{ + cfg: cfg, + client: client, + wg: sync.WaitGroup{}, + } +} + +type exporter struct { + cfg Config + client castai.Client + wg sync.WaitGroup +} + +type Config struct { + ClusterID string + MsgSendTimeoutSecs time.Duration +} + +func (ex *exporter) Levels() []logrus.Level { + return []logrus.Level{ + logrus.ErrorLevel, + logrus.FatalLevel, + logrus.PanicLevel, + logrus.InfoLevel, + logrus.WarnLevel, + } +} + +func (ex *exporter) Fire(entry *logrus.Entry) error { + if entry.Context != nil { + if v, _ := entry.Context.Value(castai.DoNotSendLogs).(string); v == "true" { + // Don't fire the hook + return nil + } + } + + ex.wg.Add(1) + + go func(entry *logrus.Entry) { + defer ex.wg.Done() + ex.sendLogEvent(ex.cfg.ClusterID, entry) + }(entry) + + return nil +} + +func (ex *exporter) Wait() { + ex.wg.Wait() +} + +func (ex *exporter) sendLogEvent(clusterID string, e *logrus.Entry) { + ctx, cancel := context.WithTimeout(context.Background(), ex.cfg.MsgSendTimeoutSecs * time.Second) + defer cancel() + + ex.client.SendLogEvent( + ctx, + clusterID, + &castai.IngestAgentLogsRequest{ + LogEvent: castai.LogEvent{ + Level: e.Level.String(), + Time: e.Time, + Message: e.Message, + Fields: e.Data, + }, + }) +} diff --git a/pkg/log/exporter_test.go b/pkg/log/exporter_test.go new file mode 100644 index 00000000..5dfecc33 --- /dev/null +++ b/pkg/log/exporter_test.go @@ -0,0 +1,59 @@ +package log + +import ( + "context" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + + "castai-agent/internal/castai" + mock_castai "castai-agent/internal/castai/mock" +) + +func TestSetupLogExporter(t *testing.T) { + logger, hook := test.NewNullLogger() + defer hook.Reset() + mockClusterID := uuid.New().String() + ctrl := gomock.NewController(t) + mockapi := mock_castai.NewMockClient(ctrl) + SetupLogExporter(logger, mockapi, Config{ClusterID: mockClusterID, MsgSendTimeoutSecs: 1}) + + t.Run("sends the log msg", func(t *testing.T) { + r := require.New(t) + mockapi.EXPECT().SendLogEvent(gomock.Any(), gomock.Any(), gomock.Any()). + Do(assertClusterFields(r, mockClusterID, "eks")).Return(&castai.IngestAgentLogsResponse{}).Times(1) + log := logger.WithFields(logrus.Fields{ + "cluster_id": mockClusterID, + "provider": "eks", + }) + log.Log(logrus.ErrorLevel, "failed to discover account id") + time.Sleep(1 * time.Second) + }) + + t.Run("skips sending log msg", func(t *testing.T) { + mockapi.EXPECT().SendLogEvent(gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + log := logrus.WithContext(castai.DoNotSendLogsCtx()) + log = log.WithFields(logrus.Fields{ + "cluster_id": mockClusterID, + "provider": "eks", + }) + log.Log(logrus.ErrorLevel, "failed to discover account id") + time.Sleep(1 * time.Second) + }) +} + +func assertClusterFields( + r *require.Assertions, mockClusterID, provider string, +) func(_ context.Context, clusterID string, req *castai.IngestAgentLogsRequest) *castai.IngestAgentLogsResponse { + return func(_ context.Context, clusterID string, req *castai.IngestAgentLogsRequest) *castai.IngestAgentLogsResponse { + fields := req.LogEvent.Fields + r.Equal(mockClusterID, fields["cluster_id"]) + r.Equal(provider, fields["provider"]) + return &castai.IngestAgentLogsResponse{} + } +}