From 4dd99bd9f3f6a64cdb46531baa8448401b7b6948 Mon Sep 17 00:00:00 2001 From: Peter Nemere Date: Mon, 4 Dec 2023 15:06:48 +1000 Subject: [PATCH] Implemented log fetching and fixed bug in log id generation for import jobs --- api/dataimport/for-trigger.go | 2 +- api/ws/handlers/log.go | 79 +++++++++++++++++++++++++---------- 2 files changed, 59 insertions(+), 22 deletions(-) diff --git a/api/dataimport/for-trigger.go b/api/dataimport/for-trigger.go index 878e5b2e..8db3209e 100644 --- a/api/dataimport/for-trigger.go +++ b/api/dataimport/for-trigger.go @@ -59,7 +59,7 @@ func ImportForTrigger( sourceBucket, sourceFilePath, datasetID, jobId, err := decodeImportTrigger(triggerMessage) // Report a status so API/users can track what's going on already - logId := os.Getenv("AWS_LAMBDA_LOG_GROUP_NAME") + "/" + os.Getenv("AWS_LAMBDA_LOG_STREAM_NAME ") + logId := os.Getenv("AWS_LAMBDA_LOG_GROUP_NAME") + "/|/" + os.Getenv("AWS_LAMBDA_LOG_STREAM_NAME") ts := timestamper.UnixTimeNowStamper{} job.UpdateJob(jobId, protos.JobStatus_STARTING, "Starting importer", logId, db, &ts, log) diff --git a/api/ws/handlers/log.go b/api/ws/handlers/log.go index d527c0d5..0d91ac60 100644 --- a/api/ws/handlers/log.go +++ b/api/ws/handlers/log.go @@ -2,39 +2,42 @@ package wsHandler import ( "errors" + "fmt" + "strings" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/pixlise/core/v3/api/services" "github.com/pixlise/core/v3/api/ws/wsHelpers" + "github.com/pixlise/core/v3/core/awsutil" "github.com/pixlise/core/v3/core/errorwithstatus" "github.com/pixlise/core/v3/core/logger" protos "github.com/pixlise/core/v3/generated-protos" ) func HandleLogReadReq(req *protos.LogReadReq, hctx wsHelpers.HandlerContext) (*protos.LogReadResp, error) { - return nil, errors.New("HandleLogReadReq not implemented yet") - /* - if err := wsHelpers.CheckStringField(&req.LogStreamId, "LogStreamId", 1, 512); err != nil { - return nil, err - } - - logGroup := "/dataset-importer/" + hctx.Svcs.Config.EnvironmentName - logs, err := cloudwatch.FetchLogs(hctx.Svcs, logGroup, req.LogStreamId) - - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException { - return nil, errorwithstatus.MakeNotFoundError(req.LogStreamId) - } - } + if err := wsHelpers.CheckStringField(&req.LogStreamId, "LogStreamId", 1, 512); err != nil { + return nil, err + } - for _, logEntry := range logs { + // We now just send the AWS cloudwatch log group+stream id out, so expect this to be directly accessible + bits := strings.Split(req.LogStreamId, "/|/") + if len(bits) != 2 { + return nil, errors.New("Failed to get log group and stream from: " + req.LogStreamId) + } - } + logs, err := fetchLogs(hctx.Svcs, bits[0], bits[1]) - // Got it, return it + if aerr, ok := err.(awserr.Error); ok { + if aerr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException { + return nil, errorwithstatus.MakeNotFoundError(req.LogStreamId) + } + } - return &protos.LogReadResp{ - Entries: entries, - }, nil - */ + return &protos.LogReadResp{ + Entries: logs, + }, nil } func HandleLogGetLevelReq(req *protos.LogGetLevelReq, hctx wsHelpers.HandlerContext) (*protos.LogGetLevelResp, error) { @@ -66,3 +69,37 @@ func HandleLogSetLevelReq(req *protos.LogSetLevelReq, hctx wsHelpers.HandlerCont return &protos.LogSetLevelResp{LogLevelId: req.LogLevelId}, nil } + +func fetchLogs(services *services.APIServices, logGroupName string, logStreamName string) ([]*protos.LogLine, error) { + var limit int64 = 10000 + + result := []*protos.LogLine{} + + sess, err := awsutil.GetSession() + if err != nil { + return []*protos.LogLine{}, fmt.Errorf("Failed to create AWS session. Error: %v", err) + } + + // NOTE: previously here we used a session: AWSSessionCW which could be configured to a different region... don't know why + // this was required but seemed redundant, it was in the same region lately... + svc := cloudwatchlogs.New(sess) + resp, err := svc.GetLogEvents(&cloudwatchlogs.GetLogEventsInput{ + Limit: &limit, + LogGroupName: aws.String(logGroupName), + LogStreamName: aws.String(logStreamName), + }) + + if err != nil { + //log.Errorf("Got error getting log events: %v", err) + return result, err + } + + for _, event := range resp.Events { + result = append(result, &protos.LogLine{ + TimeStampUnixMs: uint32(*event.IngestionTime), + Message: *event.Message, + }) + } + + return result, nil +}