Skip to content

Commit

Permalink
Tweaks to logging for importer, making sure it calls complete for job…
Browse files Browse the repository at this point in the history
…, modified timestamp fields
  • Loading branch information
Peter Nemere committed Dec 5, 2023
1 parent 4dd99bd commit 1c01992
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 297 deletions.
17 changes: 2 additions & 15 deletions api/dataimport/for-trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,7 @@ func ImportForTrigger(
if err != nil {
return result, err
}
/*
// Initialise stuff
sess, err := awsutil.GetSession()
if err != nil {
return result, err
}

if log == nil {
log, err = logger.InitCloudWatchLogger(sess, "/dataset-importer/"+envName, datasetID+"-"+logID, logger.LogDebug, 30, 3)
if err != nil {
return result, err
}
}
*/
// Return the logger...
result.Logger = log

Expand Down Expand Up @@ -131,10 +118,10 @@ func ImportForTrigger(
result.DatasetTitle = importedSummary.Title

if err != nil {
job.UpdateJob(jobId, protos.JobStatus_ERROR, err.Error(), logId, db, &ts, log)
job.CompleteJob(jobId, false, err.Error(), "", []string{}, db, &ts, log)
log.Errorf("%v", err)
} else {
job.UpdateJob(jobId, protos.JobStatus_COMPLETE, "Imported successfully", logId, db, &ts, log)
job.CompleteJob(jobId, true, "Imported successfully", "", []string{}, db, &ts, log)
}

// NOTE: We are now passing this responsibility to the caller, because we're very trusting... And they may want
Expand Down
25 changes: 17 additions & 8 deletions api/ws/handlers/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,29 @@ func HandleLogSetLevelReq(req *protos.LogSetLevelReq, hctx wsHelpers.HandlerCont
return &protos.LogSetLevelResp{LogLevelId: req.LogLevelId}, nil
}

var cloudwatchSvc *cloudwatchlogs.CloudWatchLogs = nil

func fetchLogs(services *services.APIServices, logGroupName string, logStreamName string) ([]*protos.LogLine, error) {
var limit int64 = 10000

result := []*protos.LogLine{}

sess, err := awsutil.GetSession()
if err != nil {
return []*protos.LogLine{}, fmt.Errorf("Failed to create AWS session. Error: %v", err)
if cloudwatchSvc == nil {
sess, err := awsutil.GetSession()
if err != nil {
return result, fmt.Errorf("Failed to create AWS session. Error: %v", err)
}

// NOTE: previously here we used a session: AWSSessionCW which could be configured to a different region... don't know why
// this was required but seemed redundant, it was in the same region lately...
cloudwatchSvc = cloudwatchlogs.New(sess)
}

if cloudwatchSvc == nil {
return result, fmt.Errorf("No connection to cloudwatch")
}

// NOTE: previously here we used a session: AWSSessionCW which could be configured to a different region... don't know why
// this was required but seemed redundant, it was in the same region lately...
svc := cloudwatchlogs.New(sess)
resp, err := svc.GetLogEvents(&cloudwatchlogs.GetLogEventsInput{
resp, err := cloudwatchSvc.GetLogEvents(&cloudwatchlogs.GetLogEventsInput{
Limit: &limit,
LogGroupName: aws.String(logGroupName),
LogStreamName: aws.String(logStreamName),
Expand All @@ -96,7 +105,7 @@ func fetchLogs(services *services.APIServices, logGroupName string, logStreamNam

for _, event := range resp.Events {
result = append(result, &protos.LogLine{
TimeStampUnixMs: uint32(*event.IngestionTime),
TimeStampUnixMs: uint64(*event.IngestionTime),
Message: *event.Message,
})
}
Expand Down
22 changes: 22 additions & 0 deletions api/ws/handlers/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
protos "github.com/pixlise/core/v3/generated-protos"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"google.golang.org/protobuf/proto"
)

func HandleScanListReq(req *protos.ScanListReq, hctx wsHelpers.HandlerContext) (*protos.ScanListResp, error) {
Expand Down Expand Up @@ -213,6 +214,7 @@ func HandleScanTriggerReImportReq(req *protos.ScanTriggerReImportReq, hctx wsHel

i := importUpdater{
hctx.Session,
hctx.Melody,
}

jobId, err := job.AddJob(uint32(hctx.Svcs.Config.ImportJobMaxTimeSec), hctx.Svcs.MongoDB, hctx.Svcs.IDGen, hctx.Svcs.TimeStamper, hctx.Svcs.Log, i.sendReimportUpdate)
Expand Down Expand Up @@ -414,6 +416,7 @@ func HandleScanUploadReq(req *protos.ScanUploadReq, hctx wsHelpers.HandlerContex

i := importUpdater{
hctx.Session,
hctx.Melody,
}

// Add a job watcher for this
Expand All @@ -436,6 +439,7 @@ func HandleScanUploadReq(req *protos.ScanUploadReq, hctx wsHelpers.HandlerContex

type importUpdater struct {
session *melody.Session
melody *melody.Melody
}

func (i *importUpdater) sendReimportUpdate(status *protos.JobStatus) {
Expand All @@ -461,4 +465,22 @@ func (i *importUpdater) sendImportUpdate(status *protos.JobStatus) {
}

wsHelpers.SendForSession(i.session, &wsUpd)

// If this is the final complete success message of a scan import, fire off a ScanListUpd to trigger
// anyone who is connected to do a listing of scans
// NOTE: IDEALLY this should happen when the scan notification happens. That process is not yet
// implemented in the "new" way - Lambda completes but still needs to notify all instances of API
// of the notification... For now this should work though
if status.Status == protos.JobStatus_COMPLETE && status.EndUnixTimeSec > 0 {
wsScanListUpd := protos.WSMessage{
Contents: &protos.WSMessage_ScanListUpd{
ScanListUpd: &protos.ScanListUpd{},
},
}

bytes, err := proto.Marshal(&wsScanListUpd)
if err == nil {
i.melody.BroadcastBinary(bytes)
}
}
}
Loading

0 comments on commit 1c01992

Please sign in to comment.