Skip to content

Commit

Permalink
Compiles again after changing Params field of QuantCreateParams to UserParams just to have less .Params.Params going on
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Nemere committed Dec 7, 2023
1 parent 071aad5 commit 76d2f57
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 47 deletions.
67 changes: 34 additions & 33 deletions api/quantification/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func CreateJob(createParams *protos.QuantCreateParams, requestorUserId string, h

// Set up starting parameters
params := &protos.QuantStartingParameters{
Params: createParams,
UserParams: createParams,
PmcCount: uint32(len(createParams.Pmcs)),
ScanFilePath: scanFilePath,
DataBucket: svcs.Config.DatasetsBucket,
Expand Down Expand Up @@ -112,17 +112,18 @@ func CreateJob(createParams *protos.QuantCreateParams, requestorUserId string, h
}

// This should be triggered as a go routine from quant creation endpoint so we can return a job id there quickly and do the processing offline
func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, hctx wsHelpers.HandlerContext, wg *sync.WaitGroup) {
func triggerPiquantNodes(jobId string, quantStartSettings *protos.QuantStartingParameters, hctx wsHelpers.HandlerContext, wg *sync.WaitGroup) {
defer wg.Done()

svcs := hctx.Svcs
userParams := quantStartSettings.UserParams

// TODO: figure out log id!
logId := jobId
job.UpdateJob(jobId, protos.JobStatus_PREPARING_NODES, "", logId, svcs.MongoDB, svcs.TimeStamper, svcs.Log)

jobRoot := filepaths.GetJobDataPath(params.Params.ScanId, "", "")
jobDataPath := filepaths.GetJobDataPath(params.Params.ScanId, jobId, "")
jobRoot := filepaths.GetJobDataPath(userParams.ScanId, "", "")
jobDataPath := filepaths.GetJobDataPath(userParams.ScanId, jobId, "")

// Get quant runner interface
runner, err := quantRunner.GetQuantRunner(svcs.Config.QuantExecutor)
Expand All @@ -131,10 +132,10 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h
return
}

job.UpdateJob(jobId, protos.JobStatus_PREPARING_NODES, fmt.Sprintf("Cores/Node: %v", params.CoresPerNode), logId, svcs.MongoDB, svcs.TimeStamper, svcs.Log)
job.UpdateJob(jobId, protos.JobStatus_PREPARING_NODES, fmt.Sprintf("Cores/Node: %v", quantStartSettings.CoresPerNode), logId, svcs.MongoDB, svcs.TimeStamper, svcs.Log)

datasetFileName := path.Base(params.ScanFilePath)
datasetPathOnly := path.Dir(params.ScanFilePath)
datasetFileName := path.Base(quantStartSettings.ScanFilePath)
datasetPathOnly := path.Dir(quantStartSettings.ScanFilePath)

// Gather required params (these are static, same data passed to each node)
piquantParams := quantRunner.PiquantParams{
Expand All @@ -144,17 +145,17 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h
DatasetPath: datasetPathOnly,
// NOTE: not using path.Join because we want this as / deliberately, this is being
// saved in a config file that runs in docker/linux
DetectorConfig: filepaths.RootDetectorConfig + "/" + params.Params.DetectorConfig + "/",
Elements: params.Params.Elements,
Parameters: fmt.Sprintf("%v -t,%v", params.Params.Parameters, params.CoresPerNode),
DetectorConfig: filepaths.RootDetectorConfig + "/" + userParams.DetectorConfig + "/",
Elements: userParams.Elements,
Parameters: fmt.Sprintf("%v -t,%v", userParams.Parameters, quantStartSettings.CoresPerNode),
//DatasetsBucket: params.DatasetsBucket,
//ConfigBucket: params.ConfigBucket,
DatasetsBucket: svcs.Config.DatasetsBucket,
ConfigBucket: svcs.Config.ConfigBucket,
PiquantJobsBucket: params.PiquantJobsBucket,
QuantName: params.Params.Name,
PiquantJobsBucket: quantStartSettings.PiquantJobsBucket,
QuantName: userParams.Name,
PMCListName: "", // PMC List Name will be filled in later
Command: params.Params.Command,
Command: userParams.Command,
}

piquantParamsStr, err := json.MarshalIndent(piquantParams, "", utils.PrettyPrintIndentForJSON)
Expand All @@ -164,8 +165,8 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h

// Generate the lists, and then save each, and start the quantification
// NOTE: empty == combined, just to honor the previous mode of operation before quantMode field was added
combined := params.Params.QuantMode == "" || params.Params.QuantMode == quantModeCombinedABBulk || params.Params.QuantMode == quantModeCombinedAB
quantByROI := params.Params.QuantMode == quantModeCombinedABBulk || params.Params.QuantMode == quantModeSeparateABBulk || params.Params.Command != "map"
combined := userParams.QuantMode == "" || userParams.QuantMode == quantModeCombinedABBulk || userParams.QuantMode == quantModeCombinedAB
quantByROI := userParams.QuantMode == quantModeCombinedABBulk || userParams.QuantMode == quantModeSeparateABBulk || userParams.Command != "map"

// If we're quantifying ROIs, do that
pmcFiles := []string{}
Expand All @@ -174,17 +175,17 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h
rois := []roiItemWithPMCs{}

// Download the dataset itself because we'll need it to generate our .pmcs files for each node to run
dataset, err := wsHelpers.ReadDatasetFile(params.Params.ScanId, svcs)
dataset, err := wsHelpers.ReadDatasetFile(userParams.ScanId, svcs)
if err != nil {
job.CompleteJob(jobId, false, fmt.Sprintf("Error: %v", err), "", []string{}, svcs.MongoDB, svcs.TimeStamper, svcs.Log)
return
}
if quantByROI {
pmcFile := ""
pmcFile, spectraPerNode, rois, err = makePMCListFilesForQuantROI(hctx, combined, svcs.Config, datasetFileName, jobDataPath, params, dataset)
pmcFile, spectraPerNode, rois, err = makePMCListFilesForQuantROI(hctx, combined, svcs.Config, datasetFileName, jobDataPath, quantStartSettings, dataset)
pmcFiles = []string{pmcFile}
} else {
pmcFiles, spectraPerNode, err = makePMCListFilesForQuantPMCs(svcs, combined, svcs.Config, datasetFileName, jobDataPath, params, dataset)
pmcFiles, spectraPerNode, err = makePMCListFilesForQuantPMCs(svcs, combined, svcs.Config, datasetFileName, jobDataPath, quantStartSettings, dataset)
}

if err != nil {
Expand All @@ -196,33 +197,33 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h
job.UpdateJob(jobId, protos.JobStatus_RUNNING, fmt.Sprintf("Node count: %v, Spectra/Node: %v", len(pmcFiles), spectraPerNode), logId, svcs.MongoDB, svcs.TimeStamper, svcs.Log)

// Run piquant job(s)
runner.RunPiquant(params.PIQUANTVersion, piquantParams, pmcFiles, svcs.Config, params.RequestorUserId, svcs.Log)
runner.RunPiquant(quantStartSettings.PIQUANTVersion, piquantParams, pmcFiles, svcs.Config, quantStartSettings.RequestorUserId, svcs.Log)

// Generate the output path for all generated data files & logs
quantOutPath := filepaths.GetQuantPath(params.RequestorUserId, params.Params.ScanId, "")
quantOutPath := filepaths.GetQuantPath(quantStartSettings.RequestorUserId, userParams.ScanId, "")

outputCSVName := ""
outputCSVBytes := []byte{}
outputCSV := ""

piquantLogList := []string{}

if params.Params.Command == "map" {
if userParams.Command == "map" {
job.UpdateJob(jobId, protos.JobStatus_GATHERING_RESULTS, fmt.Sprintf("Combining CSVs from %v nodes...", len(pmcFiles)), logId, svcs.MongoDB, svcs.TimeStamper, svcs.Log)

// Gather log files straight away, we want any status updates to include the logs!
piquantLogList, err = copyAllLogs(
svcs.FS,
svcs.Log,
params.PiquantJobsBucket,
quantStartSettings.PiquantJobsBucket,
jobDataPath,
svcs.Config.UsersBucket,
path.Join(quantOutPath, filepaths.MakeQuantLogDirName(jobId)),
jobId,
)

// Now we can combine the outputs from all runners
csvTitleRow := fmt.Sprintf("PIQUANT version: %v DetectorConfig: %v", params.PIQUANTVersion, params.Params.DetectorConfig)
csvTitleRow := fmt.Sprintf("PIQUANT version: %v DetectorConfig: %v", quantStartSettings.PIQUANTVersion, userParams.DetectorConfig)
err = nil

// Again, if we're in ROI mode, we act differently
Expand Down Expand Up @@ -267,21 +268,21 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h
csvOutPath := path.Join(jobRoot, jobId, "output", outputCSVName)
svcs.FS.WriteObject(svcs.Config.PiquantJobsBucket, csvOutPath, outputCSVBytes)

if params.Params.Command != "map" {
if userParams.Command != "map" {
// Map commands are more complicated, where they generate status and summaries, the csv, and the protobuf bin version of the csv, etc
// but all other commands are far simpler.

// Clear the previously written files
csvUserFilePath := filepaths.GetUserLastPiquantOutputPath(params.RequestorUserId, params.Params.ScanId, params.Params.Command, filepaths.QuantLastOutputFileName+".csv")
userLogFilePath := filepaths.GetUserLastPiquantOutputPath(params.RequestorUserId, params.Params.ScanId, params.Params.Command, filepaths.QuantLastOutputLogName)
csvUserFilePath := filepaths.GetUserLastPiquantOutputPath(quantStartSettings.RequestorUserId, userParams.ScanId, userParams.Command, filepaths.QuantLastOutputFileName+".csv")
userLogFilePath := filepaths.GetUserLastPiquantOutputPath(quantStartSettings.RequestorUserId, userParams.ScanId, userParams.Command, filepaths.QuantLastOutputLogName)

err = svcs.FS.DeleteObject(svcs.Config.UsersBucket, csvUserFilePath)
if err != nil {
svcs.Log.Errorf("Failed to delete previous piquant output for command %v from s3://%v/%v. Error: %v", params.Params.Command, svcs.Config.UsersBucket, csvUserFilePath, err)
svcs.Log.Errorf("Failed to delete previous piquant output for command %v from s3://%v/%v. Error: %v", userParams.Command, svcs.Config.UsersBucket, csvUserFilePath, err)
}
err = svcs.FS.DeleteObject(svcs.Config.UsersBucket, userLogFilePath)
if err != nil {
svcs.Log.Errorf("Failed to delete previous piquant log for command %v from s3://%v/%v. Error: %v", params.Params.Command, svcs.Config.UsersBucket, userLogFilePath, err)
svcs.Log.Errorf("Failed to delete previous piquant log for command %v from s3://%v/%v. Error: %v", userParams.Command, svcs.Config.UsersBucket, userLogFilePath, err)
}

// Upload the results file to the user bucket spot
Expand Down Expand Up @@ -327,8 +328,8 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h
}

// Figure out file paths
binFilePath := filepaths.GetQuantPath(params.RequestorUserId, params.Params.ScanId, filepaths.MakeQuantDataFileName(jobId))
csvFilePath := filepaths.GetQuantPath(params.RequestorUserId, params.Params.ScanId, filepaths.MakeQuantCSVFileName(jobId))
binFilePath := filepaths.GetQuantPath(quantStartSettings.RequestorUserId, userParams.ScanId, filepaths.MakeQuantDataFileName(jobId))
csvFilePath := filepaths.GetQuantPath(quantStartSettings.RequestorUserId, userParams.ScanId, filepaths.MakeQuantCSVFileName(jobId))

// Save bin quant to S3
err = svcs.FS.WriteObject(svcs.Config.UsersBucket, binFilePath, binFileBytes)
Expand All @@ -351,7 +352,7 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h
// Save ownership item for this quant
now := svcs.TimeStamper.GetTimeNowSec()
coll = svcs.MongoDB.Collection(dbCollections.OwnershipName)
ownerItem, err := wsHelpers.MakeOwnerForWrite(jobId, protos.ObjectType_OT_QUANTIFICATION, params.RequestorUserId, now)
ownerItem, err := wsHelpers.MakeOwnerForWrite(jobId, protos.ObjectType_OT_QUANTIFICATION, quantStartSettings.RequestorUserId, now)
_, err = coll.InsertOne(ctx, ownerItem, options.InsertOne())
if err != nil {
job.CompleteJob(jobId, false, fmt.Sprintf("Failed to write ownership item to DB: %v", err), quantOutPath, piquantLogList, svcs.MongoDB, svcs.TimeStamper, svcs.Log)
Expand All @@ -365,8 +366,8 @@ func triggerPiquantNodes(jobId string, params *protos.QuantStartingParameters, h
// Finally, write a DB entry summarising our quant and ownership info
summary := &protos.QuantificationSummary{
Id: jobId,
ScanId: params.Params.ScanId,
Params: params,
ScanId: userParams.ScanId,
Params: quantStartSettings,
Elements: elements,
Status: &protos.JobStatus{
JobId: jobId,
Expand Down
2 changes: 1 addition & 1 deletion api/quantification/importCSV.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func ImportQuantCSV(
Id: quantId,
ScanId: scanId,
Params: &protos.QuantStartingParameters{
Params: &protos.QuantCreateParams{
UserParams: &protos.QuantCreateParams{
Command: "map",
Name: quantName,
ScanId: scanId,
Expand Down
15 changes: 8 additions & 7 deletions api/quantification/quantifyPMCs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,26 @@ func makePMCListFilesForQuantPMCs(
cfg config.APIConfig,
datasetFileName string,
jobDataPath string,
params *protos.QuantStartingParameters,
quantStartSettings *protos.QuantStartingParameters,
dataset *protos.Experiment) ([]string, int32, error) {
pmcFiles := []string{}
userParams := quantStartSettings.UserParams

// Work out how many quants we're running, therefore how many nodes we need to generate in a reasonable time frame
spectraCount := int32(len(params.Params.Pmcs))
spectraCount := int32(len(userParams.Pmcs))
if !combinedSpectra {
spectraCount *= 2
}

nodeCount := quantRunner.EstimateNodeCount(spectraCount, int32(len(params.Params.Elements)), int32(params.Params.RunTimeSec), int32(params.CoresPerNode), cfg.MaxQuantNodes)
nodeCount := quantRunner.EstimateNodeCount(spectraCount, int32(len(userParams.Elements)), int32(userParams.RunTimeSec), int32(quantStartSettings.CoresPerNode), cfg.MaxQuantNodes)

if cfg.NodeCountOverride > 0 {
nodeCount = cfg.NodeCountOverride
svcs.Log.Infof("Using node count override: %v", nodeCount)
}

// NOTE: if we're running anything but the map command, the result is pretty quick, so we don't need to farm it out to multiple nodes
if params.Params.Command != "map" {
if userParams.Command != "map" {
nodeCount = 1
}

Expand All @@ -54,7 +55,7 @@ func makePMCListFilesForQuantPMCs(
svcs.Log.Debugf("spectraPerNode: %v, PMCs per node: %v for %v spectra, nodes: %v", spectraPerNode, pmcsPerNode, spectraCount, nodeCount)

// Generate the lists and save to S3
pmcLists := makeQuantJobPMCLists(params.Params.Pmcs, int(pmcsPerNode))
pmcLists := makeQuantJobPMCLists(userParams.Pmcs, int(pmcsPerNode))

pmcHasDwellLookup, err := makePMCHasDwellLookup(dataset)
if err != nil {
Expand All @@ -63,13 +64,13 @@ func makePMCListFilesForQuantPMCs(

for i, pmcList := range pmcLists {
// Serialise the data for the list
contents, err := makeIndividualPMCListFileContents(pmcList, datasetFileName, combinedSpectra, params.Params.IncludeDwells, pmcHasDwellLookup)
contents, err := makeIndividualPMCListFileContents(pmcList, datasetFileName, combinedSpectra, userParams.IncludeDwells, pmcHasDwellLookup)

if err != nil {
return pmcFiles, 0, fmt.Errorf("Error when preparing node PMC list: %v. Error: %v", i, err)
}

pmcListName, err := savePMCList(svcs, params.PiquantJobsBucket, contents, i+1, jobDataPath)
pmcListName, err := savePMCList(svcs, quantStartSettings.PiquantJobsBucket, contents, i+1, jobDataPath)
if err != nil {
return []string{}, 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions api/quantification/quantifyROIs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func makePMCListFilesForQuantROI(
return "", 0, []roiItemWithPMCs{}, err
}

rois, err := getROIs(params.Params.Command, params.Params.ScanId, params.Params.RoiIDs, hctx, locIdxToPMCLookup, dataset)
rois, err := getROIs(params.UserParams.Command, params.UserParams.ScanId, params.UserParams.RoiIDs, hctx, locIdxToPMCLookup, dataset)
if err != nil {
return "", 0, rois, err
}
Expand All @@ -45,7 +45,7 @@ func makePMCListFilesForQuantROI(
return "", 0, rois, err
}

contents, err := makeROIPMCListFileContents(rois, datasetFileName, combinedSpectra, params.Params.IncludeDwells, pmcHasDwellLookup)
contents, err := makeROIPMCListFileContents(rois, datasetFileName, combinedSpectra, params.UserParams.IncludeDwells, pmcHasDwellLookup)
if err != nil {
return "", 0, rois, fmt.Errorf("Error when preparing quant ROI node list. Error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion api/quantification/uniqueName.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func checkQuantificationNameExists(name string, scanId string, hctx wsHelpers.Ha

// Check if it exists
for _, item := range items {
if item.Params.Params.Name == name {
if item.Params.UserParams.Name == name {
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd-line-tools/api-integration-test/testQuant.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func testQuants(apiHost string) {
Id: quantId,
ScanId: scanId,
Params: &protos.QuantStartingParameters{
Params: &protos.QuantCreateParams{
UserParams: &protos.QuantCreateParams{
Command: "",
Name: "Trial quant with Rh",
ScanId: scanId,
Expand Down
4 changes: 2 additions & 2 deletions internal/cmd-line-tools/v3-importer/quantifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func migrateQuant(jobSummary SrcJobSummaryItem, overrideSrcPath string, coll *mo
}

if len(jobSummary.Params.RoiIDs) > 0 && len(jobSummary.Params.RoiID) > 0 && !utils.ItemInSlice(jobSummary.Params.RoiID, jobSummary.Params.RoiIDs) {
return fmt.Errorf("Both Roi (%v) and Roi IDs is set for quant %v, scan %v, and Roi IDs doesn't contain Roi!", jobSummary.JobID, jobSummary.Params.DatasetID)
return fmt.Errorf("Both Roi (%v) and Roi IDs is set for quant %v, scan %v, and Roi IDs doesn't contain Roi!", jobSummary.JobID, jobSummary.JobID, jobSummary.Params.DatasetID)
}

rois := jobSummary.Params.RoiIDs
Expand All @@ -250,7 +250,7 @@ func migrateQuant(jobSummary SrcJobSummaryItem, overrideSrcPath string, coll *mo
Id: jobSummary.JobID,
ScanId: jobSummary.Params.DatasetID,
Params: &protos.QuantStartingParameters{
Params: &protos.QuantCreateParams{
UserParams: &protos.QuantCreateParams{
Command: jobSummary.Params.Command,
Name: jobSummary.Params.Name,
ScanId: jobSummary.Params.DatasetID,
Expand Down

0 comments on commit 76d2f57

Please sign in to comment.