diff --git a/api/dataimport/for-trigger.go b/api/dataimport/for-trigger.go index 8db3209e..f1e861b1 100644 --- a/api/dataimport/for-trigger.go +++ b/api/dataimport/for-trigger.go @@ -75,20 +75,7 @@ func ImportForTrigger( if err != nil { return result, err } - /* - // Initialise stuff - sess, err := awsutil.GetSession() - if err != nil { - return result, err - } - if log == nil { - log, err = logger.InitCloudWatchLogger(sess, "/dataset-importer/"+envName, datasetID+"-"+logID, logger.LogDebug, 30, 3) - if err != nil { - return result, err - } - } - */ // Return the logger... result.Logger = log @@ -131,10 +118,10 @@ func ImportForTrigger( result.DatasetTitle = importedSummary.Title if err != nil { - job.UpdateJob(jobId, protos.JobStatus_ERROR, err.Error(), logId, db, &ts, log) + job.CompleteJob(jobId, false, err.Error(), "", []string{}, db, &ts, log) log.Errorf("%v", err) } else { - job.UpdateJob(jobId, protos.JobStatus_COMPLETE, "Imported successfully", logId, db, &ts, log) + job.CompleteJob(jobId, true, "Imported successfully", "", []string{}, db, &ts, log) } // NOTE: We are now passing this responsibility to the caller, because we're very trusting... And they may want diff --git a/api/ws/handlers/log.go b/api/ws/handlers/log.go index 0d91ac60..df1cee55 100644 --- a/api/ws/handlers/log.go +++ b/api/ws/handlers/log.go @@ -70,20 +70,29 @@ func HandleLogSetLevelReq(req *protos.LogSetLevelReq, hctx wsHelpers.HandlerCont return &protos.LogSetLevelResp{LogLevelId: req.LogLevelId}, nil } +var cloudwatchSvc *cloudwatchlogs.CloudWatchLogs = nil + func fetchLogs(services *services.APIServices, logGroupName string, logStreamName string) ([]*protos.LogLine, error) { var limit int64 = 10000 result := []*protos.LogLine{} - sess, err := awsutil.GetSession() - if err != nil { - return []*protos.LogLine{}, fmt.Errorf("Failed to create AWS session. Error: %v", err) + if cloudwatchSvc == nil { + sess, err := awsutil.GetSession() + if err != nil { + return result, fmt.Errorf("Failed to create AWS session. Error: %v", err) + } + + // NOTE: previously here we used a session: AWSSessionCW which could be configured to a different region... don't know why + // this was required but seemed redundant, it was in the same region lately... + cloudwatchSvc = cloudwatchlogs.New(sess) + } + + if cloudwatchSvc == nil { + return result, fmt.Errorf("No connection to cloudwatch") } - // NOTE: previously here we used a session: AWSSessionCW which could be configured to a different region... don't know why - // this was required but seemed redundant, it was in the same region lately... - svc := cloudwatchlogs.New(sess) - resp, err := svc.GetLogEvents(&cloudwatchlogs.GetLogEventsInput{ + resp, err := cloudwatchSvc.GetLogEvents(&cloudwatchlogs.GetLogEventsInput{ Limit: &limit, LogGroupName: aws.String(logGroupName), LogStreamName: aws.String(logStreamName), @@ -96,7 +105,7 @@ func fetchLogs(services *services.APIServices, logGroupName string, logStreamNam for _, event := range resp.Events { result = append(result, &protos.LogLine{ - TimeStampUnixMs: uint32(*event.IngestionTime), + TimeStampUnixMs: uint64(*event.IngestionTime), Message: *event.Message, }) } diff --git a/api/ws/handlers/scan.go b/api/ws/handlers/scan.go index d1108a46..159ddc4e 100644 --- a/api/ws/handlers/scan.go +++ b/api/ws/handlers/scan.go @@ -23,6 +23,7 @@ import ( protos "github.com/pixlise/core/v3/generated-protos" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" + "google.golang.org/protobuf/proto" ) func HandleScanListReq(req *protos.ScanListReq, hctx wsHelpers.HandlerContext) (*protos.ScanListResp, error) { @@ -213,6 +214,7 @@ func HandleScanTriggerReImportReq(req *protos.ScanTriggerReImportReq, hctx wsHel i := importUpdater{ hctx.Session, + hctx.Melody, } jobId, err := job.AddJob(uint32(hctx.Svcs.Config.ImportJobMaxTimeSec), hctx.Svcs.MongoDB, hctx.Svcs.IDGen, hctx.Svcs.TimeStamper, hctx.Svcs.Log, i.sendReimportUpdate) @@ -414,6 +416,7 @@ func HandleScanUploadReq(req *protos.ScanUploadReq, hctx wsHelpers.HandlerContex i := importUpdater{ hctx.Session, + hctx.Melody, } // Add a job watcher for this @@ -436,6 +439,7 @@ func HandleScanUploadReq(req *protos.ScanUploadReq, hctx wsHelpers.HandlerContex type importUpdater struct { session *melody.Session + melody *melody.Melody } func (i *importUpdater) sendReimportUpdate(status *protos.JobStatus) { @@ -461,4 +465,22 @@ func (i *importUpdater) sendImportUpdate(status *protos.JobStatus) { } wsHelpers.SendForSession(i.session, &wsUpd) + + // If this is the final complete success message of a scan import, fire off a ScanListUpd to trigger + // anyone who is connected to do a listing of scans + // NOTE: IDEALLY this should happen when the scan notification happens. That process is not yet + // implemented in the "new" way - Lambda completes but still needs to notify all instances of API + // of the notification... For now this should work though + if status.Status == protos.JobStatus_COMPLETE && status.EndUnixTimeSec > 0 { + wsScanListUpd := protos.WSMessage{ + Contents: &protos.WSMessage_ScanListUpd{ + ScanListUpd: &protos.ScanListUpd{}, + }, + } + + bytes, err := proto.Marshal(&wsScanListUpd) + if err == nil { + i.melody.BroadcastBinary(bytes) + } + } } diff --git a/core/logger/log-cloudwatch.go b/core/logger/log-cloudwatch.go deleted file mode 100644 index b0111ad3..00000000 --- a/core/logger/log-cloudwatch.go +++ /dev/null @@ -1,270 +0,0 @@ -// Licensed to NASA JPL under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. NASA JPL licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package logger - -import ( - "fmt" - "log" - "sync" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/cloudwatchlogs" -) - -// This is heavily based on: github.com/jcxplorer/cwlogger and github.com/mathisve/golang-cloudwatch-logs-example - -// CloudwatchLogger - Structure holding API logger internals -type CloudwatchLogger struct { - cwClient *cloudwatchlogs.CloudWatchLogs - logGroupName string - logStreamName string - logLevel LogLevel - sequenceToken string - queue []string - lock sync.Mutex - retentionDays int - logIntervalSec time.Duration - running bool -} - -// InitCloudWatchLogger - initialises the logger, given settings and AWS session -func InitCloudWatchLogger(sess *session.Session, logGroupName string, logStreamName string, logLevel LogLevel, retentionDays int, logIntervalSec time.Duration) (ILogger, error) { - result := CloudwatchLogger{ - cwClient: cloudwatchlogs.New(sess), - logGroupName: logGroupName, - logStreamName: logStreamName, - logLevel: logLevel, - sequenceToken: "", - queue: []string{}, - lock: sync.Mutex{}, - retentionDays: retentionDays, - logIntervalSec: logIntervalSec, - running: true, - } - /* - tok := "" - totalDel := 0 - for i := 0; i < 100; i++ { - var err error - var delcount int - tok, delcount, err = result.deleteOldGroups("/aws/lambda/", tok) - if err != nil { - return &result, err - } - totalDel += delcount - } - */ - // Make sure log group exists - err := result.ensureLogGroupExists(logGroupName, int64(retentionDays)) - if err != nil { - return &result, err - } - - // Now open the log stream - go result.processQueue(logIntervalSec) - - return &result, nil -} - -func (l *CloudwatchLogger) Close() { - // Sleep this thread for long enough that the other thread pumps any messages left in its queue to cloudwatch - time.Sleep(time.Second * l.logIntervalSec * 2) -} - -/* -func (l *CloudwatchLogger) deleteOldGroups(prefix string, contToken string) (string, int, error) { - - in := cloudwatchlogs.DescribeLogGroupsInput{ - Limit: aws.Int64(50), - } - if len(contToken) > 0 { - in.NextToken = aws.String(contToken) - } - - resp, err := l.cwClient.DescribeLogGroups(&in) - if err != nil { - return "", 0, err - } - - delcount := 0 - for _, logGroup := range resp.LogGroups { - if strings.HasPrefix(*logGroup.LogGroupName, prefix) { - _, err := l.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ - LogGroupName: logGroup.LogGroupName, - }) - if err != nil { - fmt.Printf("DeleteLogGroup %v failed: %v\n", *logGroup.LogGroupName, err) - } else { - delcount++ - } - } - } - - return *resp.NextToken, delcount, nil - } -*/ -func (l *CloudwatchLogger) ensureLogGroupExists(name string, retentionDays int64) error { - _, err := l.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ - LogGroupName: aws.String(name), - }) - - if err != nil { - // If it already exists, don't fail! - if aerr, ok := err.(awserr.Error); ok { - if aerr.Message() == "The specified log group already exists" { - return nil - } - } - return err - } - - _, err = l.cwClient.PutRetentionPolicy(&cloudwatchlogs.PutRetentionPolicyInput{ - RetentionInDays: aws.Int64(retentionDays), - LogGroupName: aws.String(name), - }) - - return err -} - -func (l *CloudwatchLogger) createLogStream(name string) error { - // Ensure it exists here because it may be deleted at runtime from cloudwatch, if we're creating or re-creating - // our log stream, it's good to know that the group is there - err := l.ensureLogGroupExists(l.logGroupName, int64(l.retentionDays)) - if err != nil { - return err - } - - _, err = l.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ - LogGroupName: aws.String(l.logGroupName), - LogStreamName: aws.String(name), - }) - - return err -} - -// processQueue will process the log queue -func (l *CloudwatchLogger) processQueue(logIntervalSec time.Duration) error { - var logQueue []*cloudwatchlogs.InputLogEvent - - for l.running { - l.lock.Lock() - if len(l.queue) > 0 { - for _, item := range l.queue { - logQueue = append(logQueue, &cloudwatchlogs.InputLogEvent{ - Message: aws.String(item), - Timestamp: aws.Int64(time.Now().UnixNano() / int64(time.Millisecond)), - }) - } - - l.queue = []string{} - } - - l.lock.Unlock() - - if len(logQueue) > 0 { - input := cloudwatchlogs.PutLogEventsInput{ - LogEvents: logQueue, - LogGroupName: aws.String(l.logGroupName), - } - - if l.sequenceToken == "" { - err := l.createLogStream(l.logStreamName) - if err != nil { - // Write to stderr - log.Printf("createLogStream failed: %v", err) - } - } else { - input = *input.SetSequenceToken(l.sequenceToken) - } - - input = *input.SetLogStreamName(l.logStreamName) - - resp, err := l.cwClient.PutLogEvents(&input) - if err != nil { - // Write to stderr - log.Printf("PutLogEvents failed: %v", err) - } - - if resp != nil { - if resp.NextSequenceToken != nil { - l.sequenceToken = *resp.NextSequenceToken - } else { - l.sequenceToken = "" - } - } - - logQueue = []*cloudwatchlogs.InputLogEvent{} - } - - time.Sleep(time.Second * logIntervalSec) - } - - // Might be useful seeing this on shutdown... - fmt.Println("logger processQueue complete") - return nil -} - -// Log enqueues a log message to be written to a log stream. -// -// The log message must be less than 1,048,550 bytes, and the time must not be -// more than 2 hours in the future, 14 days in the past, or older than the -// retention period of the log group. -// -// This method is safe for concurrent access by multiple goroutines. -func (l *CloudwatchLogger) Printf(level LogLevel, format string, a ...interface{}) { - // If we're not on this log level, skip - if l.logLevel > level { - return - } - - txt := logLevelPrefix[level] + ": " + fmt.Sprintf(format, a...) - - defer l.lock.Unlock() - l.lock.Lock() - - // Add to the log queue - l.queue = append(l.queue, txt) - - // Also write to local stdout - log.Println(txt) -} - -// Debugf - Print debug to log, with format string -func (l *CloudwatchLogger) Debugf(format string, a ...interface{}) { - l.Printf(LogDebug, format, a...) -} - -// Infof - Print info to log, with format string -func (l *CloudwatchLogger) Infof(format string, a ...interface{}) { - l.Printf(LogInfo, format, a...) -} - -// Errorf - Print error to log, with format string -func (l *CloudwatchLogger) Errorf(format string, a ...interface{}) { - l.Printf(LogError, format, a...) -} - -func (l *CloudwatchLogger) SetLogLevel(level LogLevel) { - l.logLevel = level -} -func (l *CloudwatchLogger) GetLogLevel() LogLevel { - return l.logLevel -} diff --git a/data-formats b/data-formats index e05324b9..633bcb88 160000 --- a/data-formats +++ b/data-formats @@ -1 +1 @@ -Subproject commit e05324b9ec522241787c4b57e8bb357dd1b6ee55 +Subproject commit 633bcb8862e7479e3ec56a4f01f7c254ea3dfa4f diff --git a/generated-protos/log.pb.go b/generated-protos/log.pb.go index 502d15e6..5dfd9f68 100644 --- a/generated-protos/log.pb.go +++ b/generated-protos/log.pb.go @@ -25,7 +25,7 @@ type LogLine struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TimeStampUnixMs uint32 `protobuf:"varint,1,opt,name=timeStampUnixMs,proto3" json:"timeStampUnixMs,omitempty"` + TimeStampUnixMs uint64 `protobuf:"varint,1,opt,name=timeStampUnixMs,proto3" json:"timeStampUnixMs,omitempty"` Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` } @@ -61,7 +61,7 @@ func (*LogLine) Descriptor() ([]byte, []int) { return file_log_proto_rawDescGZIP(), []int{0} } -func (x *LogLine) GetTimeStampUnixMs() uint32 { +func (x *LogLine) GetTimeStampUnixMs() uint64 { if x != nil { return x.TimeStampUnixMs } @@ -80,7 +80,7 @@ var File_log_proto protoreflect.FileDescriptor var file_log_proto_rawDesc = []byte{ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4d, 0x0a, 0x07, 0x4c, 0x6f, 0x67, 0x4c, 0x69, 0x6e, 0x65, 0x12, 0x28, 0x0a, 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, - 0x61, 0x6d, 0x70, 0x55, 0x6e, 0x69, 0x78, 0x4d, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x61, 0x6d, 0x70, 0x55, 0x6e, 0x69, 0x78, 0x4d, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x6d, 0x70, 0x55, 0x6e, 0x69, 0x78, 0x4d, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x3b,