Skip to content

Commit

Permalink
Merge pull request #246 from pixlise/feature/quant-tracker
Browse files Browse the repository at this point in the history
Feature/quant tracker
  • Loading branch information
pnemere authored Jun 25, 2024
2 parents b5e6cd1 + 3983e65 commit 3a393ec
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 20 deletions.
2 changes: 1 addition & 1 deletion api/coreg/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func StartCoregImport(triggerUrl string, hctx wsHelpers.HandlerContext) (string,

// Start an image coreg import job (this is a Lambda function)
// Once it completes, we have the data we need, so we can treat it as a "normal" image importing task
jobStatus, err := job.AddJob("coreg", protos.JobStatus_JT_IMPORT_IMAGE, triggerUrl, uint32(hctx.Svcs.Config.ImportJobMaxTimeSec), hctx.Svcs.MongoDB, hctx.Svcs.IDGen, hctx.Svcs.TimeStamper, hctx.Svcs.Log, i.sendUpdate)
jobStatus, err := job.AddJob("coreg", hctx.SessUser.User.Id, protos.JobStatus_JT_IMPORT_IMAGE, triggerUrl, fmt.Sprintf("Import: %v", path.Base(triggerUrl)), []string{}, uint32(hctx.Svcs.Config.ImportJobMaxTimeSec), hctx.Svcs.MongoDB, hctx.Svcs.IDGen, hctx.Svcs.TimeStamper, hctx.Svcs.Log, i.sendUpdate)
jobId := ""
if jobStatus != nil {
jobId = jobStatus.JobId
Expand Down
2 changes: 2 additions & 0 deletions api/dataimport/for-trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pixlise/core/v4/api/dataimport/internal/datasetArchive"
"github.com/pixlise/core/v4/api/dbCollections"
"github.com/pixlise/core/v4/api/job"
"github.com/pixlise/core/v4/api/specialUserIds"
"github.com/pixlise/core/v4/core/fileaccess"
"github.com/pixlise/core/v4/core/logger"
"github.com/pixlise/core/v4/core/timestamper"
Expand Down Expand Up @@ -183,6 +184,7 @@ func completeJobState(jobId string, success bool, scanId string, message string,
EndUnixTimeSec: now,
OutputFilePath: outputFilePath,
OtherLogFiles: otherLogFiles,
RequestorUserId: specialUserIds.PIXLISESystemUserId, // We don't have a requestor ID to write, we're an auto import
}

insertResult, err := coll.InsertOne(ctx, jobStatus, opt)
Expand Down
17 changes: 16 additions & 1 deletion api/job/jobWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,20 @@ var activeJobLock = sync.Mutex{}
// Expected to be called by API to create the initial record of a job. It can then trigger it however it needs to
// (eg AWS lambda or running PIQUANT nodes) and this sticks around monitoring the DB entry for changes, calling
// the sendUpdate callback function on change. Returns the snapshot of the "added" job that was saved
func AddJob(idPrefix string, jobType protos.JobStatus_JobType, jobItemId string, jobTimeoutSec uint32, db *mongo.Database, idgen idgen.IDGenerator, ts timestamper.ITimeStamper, logger logger.ILogger, sendUpdate func(*protos.JobStatus)) (*protos.JobStatus, error) {

func AddJob(
idPrefix string,
requestorUserId string,
jobType protos.JobStatus_JobType,
jobItemId string,
jobName string,
elementList []string, // optional, only set if it's a quant!
jobTimeoutSec uint32,
db *mongo.Database,
idgen idgen.IDGenerator,
ts timestamper.ITimeStamper,
logger logger.ILogger,
sendUpdate func(*protos.JobStatus)) (*protos.JobStatus, error) {
// Generate a new job Id that this job will write to
// which we also return to the caller, so they can track what happens
// with this async task
Expand All @@ -40,6 +53,8 @@ func AddJob(idPrefix string, jobType protos.JobStatus_JobType, jobItemId string,
OtherLogFiles: []string{},
JobType: jobType,
JobItemId: jobItemId,
Name: jobName,
Elements: elementList,
}

if _, ok := activeJobs[jobId]; ok {
Expand Down
10 changes: 10 additions & 0 deletions api/job/statusChanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func UpdateJob(jobId string, status protos.JobStatus_Status, message string, log
logger.Errorf("Failed to read existing job status when writing UpdateJob %v: %v", jobId, err)
} else {
jobStatus.StartUnixTimeSec = existingStatus.StartUnixTimeSec
jobStatus.JobType = existingStatus.JobType
jobStatus.JobItemId = existingStatus.JobItemId
jobStatus.RequestorUserId = existingStatus.RequestorUserId
jobStatus.Name = existingStatus.Name
jobStatus.Elements = existingStatus.Elements
}

replaceResult, err := coll.ReplaceOne(ctx, filter, jobStatus, opt)
Expand Down Expand Up @@ -86,6 +91,11 @@ func CompleteJob(jobId string, success bool, message string, outputFilePath stri
} else {
jobStatus.LogId = existingStatus.LogId
jobStatus.StartUnixTimeSec = existingStatus.StartUnixTimeSec
jobStatus.JobType = existingStatus.JobType
jobStatus.JobItemId = existingStatus.JobItemId
jobStatus.RequestorUserId = existingStatus.RequestorUserId
jobStatus.Name = existingStatus.Name
jobStatus.Elements = existingStatus.Elements
}

replaceResult, err := coll.ReplaceOne(ctx, filter, jobStatus, opt)
Expand Down
19 changes: 12 additions & 7 deletions api/quantification/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func CreateJob(createParams *protos.QuantCreateParams, requestorUserId string, s
// Make the name and ID the same, and start with something that stands out
jobId = fmt.Sprintf("cmd-%v-%s", createParams.Command, svcs.IDGen.GenObjectID())
} else {
jobStatus, err = job.AddJob("quant", protos.JobStatus_JT_RUN_QUANT, "", uint32(svcs.Config.ImportJobMaxTimeSec), svcs.MongoDB, svcs.IDGen, svcs.TimeStamper, svcs.Log, sendUpdate)
jobStatus, err = job.AddJob("quant", requestorUserId, protos.JobStatus_JT_RUN_QUANT, "", createParams.Name, createParams.Elements, uint32(svcs.Config.ImportJobMaxTimeSec), svcs.MongoDB, svcs.IDGen, svcs.TimeStamper, svcs.Log, sendUpdate)
if jobStatus != nil {
jobId = jobStatus.JobId
}
Expand Down Expand Up @@ -378,12 +378,17 @@ func (r *quantNodeRunner) triggerPiquantNodes(wg *sync.WaitGroup) {
Params: r.quantStartSettings,
Elements: elements,
Status: &protos.JobStatus{
JobId: r.jobId,
Status: protos.JobStatus_COMPLETE,
Message: completeMsg,
EndUnixTimeSec: uint32(now),
OutputFilePath: quantOutPath,
OtherLogFiles: piquantLogList,
JobId: r.jobId,
JobItemId: r.jobId,
Status: protos.JobStatus_COMPLETE,
Message: completeMsg,
StartUnixTimeSec: r.quantStartSettings.StartUnixTimeSec,
EndUnixTimeSec: uint32(now),
OutputFilePath: quantOutPath,
OtherLogFiles: piquantLogList,
Name: r.quantStartSettings.UserParams.Name,
Elements: r.quantStartSettings.UserParams.Elements,
RequestorUserId: r.quantStartSettings.RequestorUserId,
},
}

Expand Down
16 changes: 10 additions & 6 deletions api/quantification/importCSV.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,16 @@ func ImportQuantCSV(
},
Elements: elements,
Status: &protos.JobStatus{
JobId: quantId,
Status: protos.JobStatus_COMPLETE,
Message: csvOrigin + " quantification processed",
OtherLogFiles: []string{},
EndUnixTimeSec: uint32(ownerItem.CreatedUnixSec),
OutputFilePath: quantOutPath,
JobId: quantId,
JobItemId: quantId,
Status: protos.JobStatus_COMPLETE,
Message: csvOrigin + " quantification processed",
OtherLogFiles: []string{},
EndUnixTimeSec: uint32(ownerItem.CreatedUnixSec),
OutputFilePath: quantOutPath,
StartUnixTimeSec: uint32(ownerItem.CreatedUnixSec),
Name: quantName,
Elements: elements,
},
}

Expand Down
48 changes: 45 additions & 3 deletions api/ws/handlers/job.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,53 @@
package wsHandler

import (
"errors"
protos "github.com/pixlise/core/v4/generated-protos"
"context"

"github.com/pixlise/core/v4/api/dbCollections"
"github.com/pixlise/core/v4/api/ws/wsHelpers"
protos "github.com/pixlise/core/v4/generated-protos"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)

func HandleJobListReq(req *protos.JobListReq, hctx wsHelpers.HandlerContext) (*protos.JobListResp, error) {
return nil, errors.New("HandleJobListReq not implemented yet")
// Work out if requestor is an admin or a normal user
isAdmin := hctx.SessUser.Permissions["PIXLISE_ADMIN"]

/*filter, _, err := wsHelpers.MakeFilter(req.SearchParams, false, protos.ObjectType_OT_QUANTIFICATION, hctx)
if err != nil {
return nil, err
}*/
filter := bson.M{}

ctx := context.TODO()
coll := hctx.Svcs.MongoDB.Collection(dbCollections.JobStatusName)
opts := options.Find()

cursor, err := coll.Find(ctx, filter, opts)
if err != nil {
return nil, err
}

items := []*protos.JobStatus{}
err = cursor.All(ctx, &items)
if err != nil {
return nil, err
}

itemsToSend := []*protos.JobStatus{}
if isAdmin {
itemsToSend = items
} else {
// Find only the jobs that were requested by this user
for _, item := range items {
if item.RequestorUserId == hctx.SessUser.User.Id {
itemsToSend = append(itemsToSend, item)
}
}
}

return &protos.JobListResp{
Jobs: itemsToSend,
}, nil
}
6 changes: 6 additions & 0 deletions api/ws/handlers/quantification-retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func HandleQuantGetReq(req *protos.QuantGetReq, hctx wsHelpers.HandlerContext) (
}
}

// We seem to have some old quants where the status struct says start time was 0, but there is another start time in quant params, so
// substitute a non-zero value in this case
if dbItem.Status.StartUnixTimeSec == 0 && dbItem.Params.StartUnixTimeSec > 0 {
dbItem.Status.StartUnixTimeSec = dbItem.Params.StartUnixTimeSec
}

dbItem.Owner = wsHelpers.MakeOwnerSummary(ownerItem, hctx.SessUser, hctx.Svcs.MongoDB, hctx.Svcs.TimeStamper)

return &protos.QuantGetResp{
Expand Down
4 changes: 2 additions & 2 deletions api/ws/handlers/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func HandleScanTriggerReImportReq(req *protos.ScanTriggerReImportReq, hctx wsHel
hctx.Svcs.MongoDB,
}

jobStatus, err := job.AddJob("reimport", protos.JobStatus_JT_REIMPORT_SCAN, req.ScanId, uint32(hctx.Svcs.Config.ImportJobMaxTimeSec), hctx.Svcs.MongoDB, hctx.Svcs.IDGen, hctx.Svcs.TimeStamper, hctx.Svcs.Log, i.sendReimportUpdate)
jobStatus, err := job.AddJob("reimport", hctx.SessUser.User.Id, protos.JobStatus_JT_REIMPORT_SCAN, req.ScanId, fmt.Sprintf("Reimport: %v", req.ScanId), []string{}, uint32(hctx.Svcs.Config.ImportJobMaxTimeSec), hctx.Svcs.MongoDB, hctx.Svcs.IDGen, hctx.Svcs.TimeStamper, hctx.Svcs.Log, i.sendReimportUpdate)
jobId := ""
if jobStatus != nil {
jobId = jobStatus.JobId
Expand Down Expand Up @@ -549,7 +549,7 @@ func HandleScanUploadReq(req *protos.ScanUploadReq, hctx wsHelpers.HandlerContex
}

// Add a job watcher for this
jobStatus, err := job.AddJob("import", protos.JobStatus_JT_IMPORT_SCAN, datasetID, uint32(hctx.Svcs.Config.ImportJobMaxTimeSec), hctx.Svcs.MongoDB, hctx.Svcs.IDGen, hctx.Svcs.TimeStamper, hctx.Svcs.Log, i.sendImportUpdate)
jobStatus, err := job.AddJob("import", hctx.SessUser.User.Id, protos.JobStatus_JT_IMPORT_SCAN, datasetID, fmt.Sprintf("Import: %v", datasetID), []string{}, uint32(hctx.Svcs.Config.ImportJobMaxTimeSec), hctx.Svcs.MongoDB, hctx.Svcs.IDGen, hctx.Svcs.TimeStamper, hctx.Svcs.Log, i.sendImportUpdate)
jobId := ""
if jobStatus != nil {
jobId = jobStatus.JobId
Expand Down

0 comments on commit 3a393ec

Please sign in to comment.