Skip to content

Commit

Permalink
Implemented log fetching and fixed bug in log id generation for impor…
Browse files Browse the repository at this point in the history
…t jobs
  • Loading branch information
Peter Nemere committed Dec 4, 2023
1 parent ce1e5f0 commit 4dd99bd
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 22 deletions.
2 changes: 1 addition & 1 deletion api/dataimport/for-trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func ImportForTrigger(
sourceBucket, sourceFilePath, datasetID, jobId, err := decodeImportTrigger(triggerMessage)

// Report a status so API/users can track what's going on already
logId := os.Getenv("AWS_LAMBDA_LOG_GROUP_NAME") + "/" + os.Getenv("AWS_LAMBDA_LOG_STREAM_NAME ")
logId := os.Getenv("AWS_LAMBDA_LOG_GROUP_NAME") + "/|/" + os.Getenv("AWS_LAMBDA_LOG_STREAM_NAME")

ts := timestamper.UnixTimeNowStamper{}
job.UpdateJob(jobId, protos.JobStatus_STARTING, "Starting importer", logId, db, &ts, log)
Expand Down
79 changes: 58 additions & 21 deletions api/ws/handlers/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,42 @@ package wsHandler

import (
"errors"
"fmt"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/pixlise/core/v3/api/services"
"github.com/pixlise/core/v3/api/ws/wsHelpers"
"github.com/pixlise/core/v3/core/awsutil"
"github.com/pixlise/core/v3/core/errorwithstatus"
"github.com/pixlise/core/v3/core/logger"
protos "github.com/pixlise/core/v3/generated-protos"
)

func HandleLogReadReq(req *protos.LogReadReq, hctx wsHelpers.HandlerContext) (*protos.LogReadResp, error) {
return nil, errors.New("HandleLogReadReq not implemented yet")
/*
if err := wsHelpers.CheckStringField(&req.LogStreamId, "LogStreamId", 1, 512); err != nil {
return nil, err
}
logGroup := "/dataset-importer/" + hctx.Svcs.Config.EnvironmentName
logs, err := cloudwatch.FetchLogs(hctx.Svcs, logGroup, req.LogStreamId)
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException {
return nil, errorwithstatus.MakeNotFoundError(req.LogStreamId)
}
}
if err := wsHelpers.CheckStringField(&req.LogStreamId, "LogStreamId", 1, 512); err != nil {
return nil, err
}

for _, logEntry := range logs {
// We now just send the AWS cloudwatch log group+stream id out, so expect this to be directly accessible
bits := strings.Split(req.LogStreamId, "/|/")
if len(bits) != 2 {
return nil, errors.New("Failed to get log group and stream from: " + req.LogStreamId)
}

}
logs, err := fetchLogs(hctx.Svcs, bits[0], bits[1])

// Got it, return it
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException {
return nil, errorwithstatus.MakeNotFoundError(req.LogStreamId)
}
}

return &protos.LogReadResp{
Entries: entries,
}, nil
*/
return &protos.LogReadResp{
Entries: logs,
}, nil
}

func HandleLogGetLevelReq(req *protos.LogGetLevelReq, hctx wsHelpers.HandlerContext) (*protos.LogGetLevelResp, error) {
Expand Down Expand Up @@ -66,3 +69,37 @@ func HandleLogSetLevelReq(req *protos.LogSetLevelReq, hctx wsHelpers.HandlerCont

return &protos.LogSetLevelResp{LogLevelId: req.LogLevelId}, nil
}

func fetchLogs(services *services.APIServices, logGroupName string, logStreamName string) ([]*protos.LogLine, error) {
var limit int64 = 10000

result := []*protos.LogLine{}

sess, err := awsutil.GetSession()
if err != nil {
return []*protos.LogLine{}, fmt.Errorf("Failed to create AWS session. Error: %v", err)
}

// NOTE: previously here we used a session: AWSSessionCW which could be configured to a different region... don't know why
// this was required but seemed redundant, it was in the same region lately...
svc := cloudwatchlogs.New(sess)
resp, err := svc.GetLogEvents(&cloudwatchlogs.GetLogEventsInput{
Limit: &limit,
LogGroupName: aws.String(logGroupName),
LogStreamName: aws.String(logStreamName),
})

if err != nil {
//log.Errorf("Got error getting log events: %v", err)
return result, err
}

for _, event := range resp.Events {
result = append(result, &protos.LogLine{
TimeStampUnixMs: uint32(*event.IngestionTime),
Message: *event.Message,
})
}

return result, nil
}

0 comments on commit 4dd99bd

Please sign in to comment.