Skip to content

Commit

Permalink
Importer lambda now updates job status
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Nemere committed Dec 4, 2023
1 parent 12de08e commit ce1e5f0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
16 changes: 15 additions & 1 deletion api/dataimport/for-trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ package dataimport

import (
"fmt"
"os"

"github.com/pixlise/core/v3/api/dataimport/internal/datasetArchive"
"github.com/pixlise/core/v3/api/job"
"github.com/pixlise/core/v3/core/fileaccess"
"github.com/pixlise/core/v3/core/logger"
"github.com/pixlise/core/v3/core/timestamper"
protos "github.com/pixlise/core/v3/generated-protos"
"go.mongodb.org/mongo-driver/mongo"
)
Expand Down Expand Up @@ -53,7 +56,13 @@ func ImportForTrigger(
db *mongo.Database,
log logger.ILogger,
remoteFS fileaccess.FileAccess) (ImportResult, error) {
sourceBucket, sourceFilePath, datasetID, _, err := decodeImportTrigger(triggerMessage)
sourceBucket, sourceFilePath, datasetID, jobId, err := decodeImportTrigger(triggerMessage)

// Report a status so API/users can track what's going on already
logId := os.Getenv("AWS_LAMBDA_LOG_GROUP_NAME") + "/" + os.Getenv("AWS_LAMBDA_LOG_STREAM_NAME ")

ts := timestamper.UnixTimeNowStamper{}
job.UpdateJob(jobId, protos.JobStatus_STARTING, "Starting importer", logId, db, &ts, log)

result := ImportResult{
WorkingDir: "",
Expand Down Expand Up @@ -114,13 +123,18 @@ func ImportForTrigger(
return result, err
}

job.UpdateJob(jobId, protos.JobStatus_RUNNING, "Importing Files", logId, db, &ts, log)

importedSummary := &protos.ScanItem{}
result.WorkingDir, importedSummary, result.WhatChanged, result.IsUpdate, err = ImportDataset(localFS, remoteFS, configBucket, manualBucket, datasetBucket, db, datasetID, log, archived)
result.DatasetID = importedSummary.Id
result.DatasetTitle = importedSummary.Title

if err != nil {
job.UpdateJob(jobId, protos.JobStatus_ERROR, err.Error(), logId, db, &ts, log)
log.Errorf("%v", err)
} else {
job.UpdateJob(jobId, protos.JobStatus_COMPLETE, "Imported successfully", logId, db, &ts, log)
}

// NOTE: We are now passing this responsibility to the caller, because we're very trusting... And they may want
Expand Down
2 changes: 1 addition & 1 deletion api/dataimport/reprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type datasetReprocessSNSRequest struct {
}

// Decoding trigger message
// Returns: sourceBucket (optional), sourceFilePath (optional), datasetID, logID
// Returns: sourceBucket (optional), sourceFilePath (optional), datasetID, jobId
func decodeImportTrigger(triggerMessageBody []byte) (string, string, string, string, error) {
datasetID := ""

Expand Down
12 changes: 10 additions & 2 deletions api/job/jobWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func AddJob(jobTimeoutSec uint32, db *mongo.Database, idgen idgen.IDGenerator, t
activeJobs[jobId] = true

// Start a thread to watch this job
go watchJob(jobId, now, watchUntilUnixSec, db, logger, sendUpdate)
go watchJob(jobId, now, watchUntilUnixSec, db, logger, ts, sendUpdate)

return jobId, nil
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func CompleteJob(jobId string, success bool, message string, outputFilePath stri
return nil
}

func watchJob(jobId string, nowUnixSec uint32, watchUntilUnixSec uint32, db *mongo.Database, logger logger.ILogger, sendUpdate func(*protos.JobStatus)) {
func watchJob(jobId string, nowUnixSec uint32, watchUntilUnixSec uint32, db *mongo.Database, logger logger.ILogger, ts timestamper.ITimeStamper, sendUpdate func(*protos.JobStatus)) {
logger.Infof(">> Start watching job: %v...", jobId)

// Check the DB for updates periodically until watchUntilUnixSec at which point if the job isn't
Expand Down Expand Up @@ -182,4 +182,12 @@ func watchJob(jobId string, nowUnixSec uint32, watchUntilUnixSec uint32, db *mon
}

logger.Errorf(">> Stop watching TIMED-OUT job: %v", jobId)
sendUpdate(&protos.JobStatus{
JobId: jobId,
Status: protos.JobStatus_ERROR,
Message: "Timed out while waiting for status",
EndUnixTimeSec: uint32(ts.GetTimeNowSec()),
OutputFilePath: "",
OtherLogFiles: []string{},
})
}

0 comments on commit ce1e5f0

Please sign in to comment.