diff --git a/api/dataimport/for-trigger.go b/api/dataimport/for-trigger.go
index af1d0419..878e5b2e 100644
--- a/api/dataimport/for-trigger.go
+++ b/api/dataimport/for-trigger.go
@@ -21,10 +21,13 @@ package dataimport
 
 import (
 	"fmt"
+	"os"
 
 	"github.com/pixlise/core/v3/api/dataimport/internal/datasetArchive"
+	"github.com/pixlise/core/v3/api/job"
 	"github.com/pixlise/core/v3/core/fileaccess"
 	"github.com/pixlise/core/v3/core/logger"
+	"github.com/pixlise/core/v3/core/timestamper"
 	protos "github.com/pixlise/core/v3/generated-protos"
 	"go.mongodb.org/mongo-driver/mongo"
 )
@@ -53,7 +56,13 @@ func ImportForTrigger(
 	db *mongo.Database,
 	log logger.ILogger,
 	remoteFS fileaccess.FileAccess) (ImportResult, error) {
-	sourceBucket, sourceFilePath, datasetID, _, err := decodeImportTrigger(triggerMessage)
+	sourceBucket, sourceFilePath, datasetID, jobId, err := decodeImportTrigger(triggerMessage)
+
+	// Report a status so API/users can track what's going on already
+	logId := os.Getenv("AWS_LAMBDA_LOG_GROUP_NAME") + "/" + os.Getenv("AWS_LAMBDA_LOG_STREAM_NAME")
+
+	ts := timestamper.UnixTimeNowStamper{}
+	job.UpdateJob(jobId, protos.JobStatus_STARTING, "Starting importer", logId, db, &ts, log)
 
 	result := ImportResult{
 		WorkingDir: "",
@@ -114,13 +123,18 @@ func ImportForTrigger(
 		return result, err
 	}
 
+	job.UpdateJob(jobId, protos.JobStatus_RUNNING, "Importing Files", logId, db, &ts, log)
+
 	importedSummary := &protos.ScanItem{}
 	result.WorkingDir, importedSummary, result.WhatChanged, result.IsUpdate, err = ImportDataset(localFS, remoteFS, configBucket, manualBucket, datasetBucket, db, datasetID, log, archived)
 	result.DatasetID = importedSummary.Id
 	result.DatasetTitle = importedSummary.Title
 
 	if err != nil {
+		job.UpdateJob(jobId, protos.JobStatus_ERROR, err.Error(), logId, db, &ts, log)
 		log.Errorf("%v", err)
+	} else {
+		job.UpdateJob(jobId, protos.JobStatus_COMPLETE, "Imported successfully", logId, db, &ts, log)
 	}
 
 	// NOTE: We are now passing this responsibility to the caller, because we're very trusting... And they may want
diff --git a/api/dataimport/reprocess.go b/api/dataimport/reprocess.go
index 37e2d74f..77fc7349 100644
--- a/api/dataimport/reprocess.go
+++ b/api/dataimport/reprocess.go
@@ -40,7 +40,7 @@ type datasetReprocessSNSRequest struct {
 }
 
 // Decoding trigger message
-// Returns: sourceBucket (optional), sourceFilePath (optional), datasetID, logID
+// Returns: sourceBucket (optional), sourceFilePath (optional), datasetID, jobId
 func decodeImportTrigger(triggerMessageBody []byte) (string, string, string, string, error) {
 	datasetID := ""
 
diff --git a/api/job/jobWatcher.go b/api/job/jobWatcher.go
index b75305ec..d2abd0b4 100644
--- a/api/job/jobWatcher.go
+++ b/api/job/jobWatcher.go
@@ -60,7 +60,7 @@ func AddJob(jobTimeoutSec uint32, db *mongo.Database, idgen idgen.IDGenerator, t
 	activeJobs[jobId] = true
 
 	// Start a thread to watch this job
-	go watchJob(jobId, now, watchUntilUnixSec, db, logger, sendUpdate)
+	go watchJob(jobId, now, watchUntilUnixSec, db, logger, ts, sendUpdate)
 
 	return jobId, nil
 }
@@ -133,7 +133,7 @@ func CompleteJob(jobId string, success bool, message string, outputFilePath stri
 	return nil
 }
 
-func watchJob(jobId string, nowUnixSec uint32, watchUntilUnixSec uint32, db *mongo.Database, logger logger.ILogger, sendUpdate func(*protos.JobStatus)) {
+func watchJob(jobId string, nowUnixSec uint32, watchUntilUnixSec uint32, db *mongo.Database, logger logger.ILogger, ts timestamper.ITimeStamper, sendUpdate func(*protos.JobStatus)) {
 	logger.Infof(">> Start watching job: %v...", jobId)
 
 	// Check the DB for updates periodically until watchUntilUnixSec at which point if the job isn't
@@ -182,4 +182,12 @@ func watchJob(jobId string, nowUnixSec uint32, watchUntilUnixSec uint32, db *mon
 	}
 
 	logger.Errorf(">> Stop watching TIMED-OUT job: %v", jobId)
+	sendUpdate(&protos.JobStatus{
+		JobId:          jobId,
+		Status:         protos.JobStatus_ERROR,
+		Message:        "Timed out while waiting for status",
+		EndUnixTimeSec: uint32(ts.GetTimeNowSec()),
+		OutputFilePath: "",
+		OtherLogFiles:  []string{},
+	})
 }