diff --git a/internal/cmd-line-tools/v3-importer/quantifications.go b/internal/cmd-line-tools/v3-importer/quantifications.go index ce7b4b41..4f00ba04 100644 --- a/internal/cmd-line-tools/v3-importer/quantifications.go +++ b/internal/cmd-line-tools/v3-importer/quantifications.go @@ -117,7 +117,6 @@ func migrateQuants( } } - destQuants := []interface{}{} userItems := map[string]SrcJobSummaryItem{} roiSetCount := 0 @@ -183,99 +182,115 @@ func migrateQuants( } if jobSummary.Params.Creator.UserID != userIdFromPath { - fmt.Printf("Unexpected quant user: %v, path had id: %v. Path was: %v\n", jobSummary.Params.Creator.UserID, userIdFromPath, p) + fmt.Printf("Unexpected quant user: %v, path had id: %v. Path was: %v. Quant was likely copied to another user, skipping...\n" /*. Using quant user in output paths...\n"*/, jobSummary.Params.Creator.UserID, userIdFromPath, p) + //userIdFromPath = jobSummary.Params.Creator.UserID + continue } userItems[quantId] = jobSummary - var jobStatus protos.JobStatus_Status - - switch jobSummary.Status { - case SrcJobStarting: - jobStatus = protos.JobStatus_STARTING - case SrcJobPreparingNodes: - jobStatus = protos.JobStatus_PREPARING_NODES - case SrcJobNodesRunning: - jobStatus = protos.JobStatus_NODES_RUNNING - case SrcJobGatheringResults: - jobStatus = protos.JobStatus_GATHERING_RESULTS - case SrcJobComplete: - jobStatus = protos.JobStatus_COMPLETE - case SrcJobError: - jobStatus = protos.JobStatus_ERROR - } - - // Write to DB - destQuant := &protos.QuantificationSummary{ - Id: jobSummary.JobID, - ScanId: jobSummary.Params.DatasetID, - Params: &protos.QuantStartingParameters{ - Name: jobSummary.Params.Name, - DataBucket: jobSummary.Params.DataBucket, - DatasetPath: jobSummary.Params.DatasetPath, - DatasetID: jobSummary.Params.DatasetID, - PiquantJobsBucket: jobSummary.Params.PiquantJobsBucket, - DetectorConfig: jobSummary.Params.DetectorConfig, - Elements: jobSummary.Params.Elements, - Parameters: jobSummary.Params.Parameters, - RunTimeSec: uint32(jobSummary.Params.RunTimeSec), - CoresPerNode: uint32(jobSummary.Params.CoresPerNode), - StartUnixTimeSec: uint32(jobSummary.Params.StartUnixTime), - RequestorUserId: utils.FixUserId(jobSummary.Params.Creator.UserID), - RoiID: jobSummary.Params.RoiID, - ElementSetID: jobSummary.Params.ElementSetID, - PIQUANTVersion: jobSummary.Params.PIQUANTVersion, - QuantMode: jobSummary.Params.QuantMode, - Comments: jobSummary.Params.Comments, - RoiIDs: jobSummary.Params.RoiIDs, - IncludeDwells: jobSummary.Params.IncludeDwells, - Command: jobSummary.Params.Command, - PmcCount: uint32(jobSummary.Params.PMCCount), - }, - Elements: jobSummary.Elements, - Status: &protos.JobStatus{ - JobID: jobSummary.JobID, - Status: jobStatus, - Message: jobSummary.Message, - EndUnixTimeSec: uint32(jobSummary.EndUnixTime), - OutputFilePath: path.Join("Quantifications", scanId, utils.FixUserId(jobSummary.Params.Creator.UserID)), //jobSummary.OutputFilePath, - PiquantLogs: jobSummary.PiquantLogList, - }, - } - viewerGroupId := "" if removeIfSharedQuant(jobSummary, sharedItems) { viewerGroupId = userGroups["PIXL-FM"] } - err = saveOwnershipItem(quantId, protos.ObjectType_OT_QUANTIFICATION, jobSummary.Params.Creator.UserID, "", viewerGroupId, uint32(jobSummary.EndUnixTime), dest) - if err != nil { - return err + if err := migrateQuant(jobSummary, "", coll, userContentBucket, destUserContentBucket, viewerGroupId, fs, dest); err != nil { + log.Fatalln(err) } - - // Save the relevant quantification files to their destination in S3. - // NOTE: if they are not found, this is an error! - saveQuantFiles(scanId, userIdFromPath, quantId, userContentBucket, destUserContentBucket, fs) - - destQuants = append(destQuants, destQuant) } } fmt.Printf("Quantification import results: roiID field count: %v, roiIDs field count: %v, elementSetID field count: %v, multiQuants: %v\n", roiSetCount, roisSetCount, elementSetSetCount, multiQuantCount) - result, err := coll.InsertMany(context.TODO(), destQuants) + fmt.Printf("Quants inserted: %v\n", len(userItems)) + fmt.Println("Adding the following orphaned Quants (shared but original not found):") + for _, shared := range sharedItems { + fmt.Printf(" - %v\n", shared.JobID) + // At this point, it's been shared, so no longer in the OutputFilePath dir that's in the summary. We override it with the real + // path to the shared item + srcPath := SrcGetUserQuantPath("shared", shared.Params.DatasetID, "") + if err := migrateQuant(shared, srcPath, coll, userContentBucket, destUserContentBucket, userGroups["PIXL-FM"], fs, dest); err != nil { + log.Fatalf("migrateQuant failed for: %v. Error: %v", shared.JobID, err) + } + } + + return nil +} + +func migrateQuant(jobSummary SrcJobSummaryItem, overrideSrcPath string, coll *mongo.Collection, userContentBucket string, destUserContentBucket string, viewerGroupId string, fs fileaccess.FileAccess, dest *mongo.Database) error { + var jobStatus protos.JobStatus_Status + + switch jobSummary.Status { + case SrcJobStarting: + jobStatus = protos.JobStatus_STARTING + case SrcJobPreparingNodes: + jobStatus = protos.JobStatus_PREPARING_NODES + case SrcJobNodesRunning: + jobStatus = protos.JobStatus_NODES_RUNNING + case SrcJobGatheringResults: + jobStatus = protos.JobStatus_GATHERING_RESULTS + case SrcJobComplete: + jobStatus = protos.JobStatus_COMPLETE + case SrcJobError: + jobStatus = protos.JobStatus_ERROR + } + + // Write to DB + destQuant := &protos.QuantificationSummary{ + Id: jobSummary.JobID, + ScanId: jobSummary.Params.DatasetID, + Params: &protos.QuantStartingParameters{ + Name: jobSummary.Params.Name, + DataBucket: jobSummary.Params.DataBucket, + DatasetPath: jobSummary.Params.DatasetPath, + DatasetID: jobSummary.Params.DatasetID, + PiquantJobsBucket: jobSummary.Params.PiquantJobsBucket, + DetectorConfig: jobSummary.Params.DetectorConfig, + Elements: jobSummary.Params.Elements, + Parameters: jobSummary.Params.Parameters, + RunTimeSec: uint32(jobSummary.Params.RunTimeSec), + CoresPerNode: uint32(jobSummary.Params.CoresPerNode), + StartUnixTimeSec: uint32(jobSummary.Params.StartUnixTime), + RequestorUserId: utils.FixUserId(jobSummary.Params.Creator.UserID), + RoiID: jobSummary.Params.RoiID, + ElementSetID: jobSummary.Params.ElementSetID, + PIQUANTVersion: jobSummary.Params.PIQUANTVersion, + QuantMode: jobSummary.Params.QuantMode, + Comments: jobSummary.Params.Comments, + RoiIDs: jobSummary.Params.RoiIDs, + IncludeDwells: jobSummary.Params.IncludeDwells, + Command: jobSummary.Params.Command, + PmcCount: uint32(jobSummary.Params.PMCCount), + }, + Elements: jobSummary.Elements, + Status: &protos.JobStatus{ + JobID: jobSummary.JobID, + Status: jobStatus, + Message: jobSummary.Message, + EndUnixTimeSec: uint32(jobSummary.EndUnixTime), + OutputFilePath: path.Join("Quantifications", jobSummary.Params.DatasetID, utils.FixUserId(jobSummary.Params.Creator.UserID)), //jobSummary.OutputFilePath, + PiquantLogs: jobSummary.PiquantLogList, + }, + } + + _, err := coll.InsertOne(context.TODO(), destQuant) if err != nil { return err } - fmt.Printf("Quants inserted: %v\n", len(result.InsertedIDs)) - fmt.Println("Quants orphaned (shared but original not found):") - for _, shared := range sharedItems { - fmt.Printf("%v\n", shared.JobID) + err = saveOwnershipItem(jobSummary.JobID, protos.ObjectType_OT_QUANTIFICATION, jobSummary.Params.Creator.UserID, "", viewerGroupId, uint32(jobSummary.EndUnixTime), dest) + if err != nil { + return err } - return nil + // Save the relevant quantification files to their destination in S3. + // NOTE: if they are not found, this is an error! + srcPath := jobSummary.OutputFilePath + if len(overrideSrcPath) > 0 { + srcPath = overrideSrcPath + } + return saveQuantFiles(jobSummary.Params.DatasetID, jobSummary.JobID, userContentBucket, srcPath, destUserContentBucket, destQuant.Status.OutputFilePath, fs) } func removeIfSharedQuant(jobSummary SrcJobSummaryItem, sharedQuantSummaries map[string]SrcJobSummaryItem) bool { @@ -313,15 +328,15 @@ func SrcGetUserQuantPath(userID string, datasetID string, fileName string) strin return path.Join(filepaths.RootUserContent, userID, datasetID, quantificationSubPath) } -func saveQuantFiles(datasetId string, userId string, quantId string, userContentBucket string, destUserContentBucket string, fs fileaccess.FileAccess) error { +func saveQuantFiles(datasetId string, quantId string, userContentBucket string, srcPath string, destUserContentBucket string, destPath string, fs fileaccess.FileAccess) error { srcPaths := []string{ - SrcGetUserQuantPath(userId, datasetId, quantId+".bin"), - SrcGetUserQuantPath(userId, datasetId, quantId+".csv"), + path.Join(srcPath, quantId+".bin"), + path.Join(srcPath, quantId+".csv"), } dstPaths := []string{ - filepaths.GetQuantPath(utils.FixUserId(userId), datasetId, quantId+".bin"), - filepaths.GetQuantPath(utils.FixUserId(userId), datasetId, quantId+".csv"), + path.Join(destPath, quantId+".bin"), + path.Join(destPath, quantId+".csv"), } failOnError := []bool{ @@ -330,7 +345,7 @@ func saveQuantFiles(datasetId string, userId string, quantId string, userContent } // Quant log files, we need a listing of these - s3Path := SrcGetUserQuantPath(userId, datasetId, filepaths.MakeQuantLogDirName(quantId)) + s3Path := path.Join(srcPath, filepaths.MakeQuantLogDirName(quantId)) logFiles, err := fs.ListObjects(userContentBucket, s3Path) if err != nil { // We just warn here @@ -343,7 +358,7 @@ func saveQuantFiles(datasetId string, userId string, quantId string, userContent } srcPaths = append(srcPaths, logPath) - dstPaths = append(dstPaths, filepaths.GetQuantPath(utils.FixUserId(userId), datasetId, path.Join(quantId+"-logs", path.Base(logPath)))) + dstPaths = append(dstPaths, path.Join(destPath, path.Join(quantId+"-logs", path.Base(logPath)))) failOnError = append(failOnError, false) // optional } } diff --git a/internal/cmd-line-tools/v3-importer/roi.go b/internal/cmd-line-tools/v3-importer/roi.go index 4bb887ae..5e430d83 100644 --- a/internal/cmd-line-tools/v3-importer/roi.go +++ b/internal/cmd-line-tools/v3-importer/roi.go @@ -50,6 +50,7 @@ func migrateROIs( } sharedItems := SrcROILookup{} + sharedItemScanIds := map[string]string{} for _, p := range userContentFiles { if strings.HasSuffix(p, "ROI.json") && strings.HasPrefix(p, "UserContent/shared/") { scanId := filepath.Base(filepath.Dir(p)) @@ -69,12 +70,12 @@ func migrateROIs( // Store these till we're finished here for id, item := range items { sharedItems[ /*scanId+"_"+*/ id] = item + sharedItemScanIds[id] = scanId } sharedItems = items } } - destROIs := []interface{}{} allItems := SrcROILookup{} for _, p := range userContentFiles { @@ -101,70 +102,33 @@ func migrateROIs( // Write these to DB and also remember them for later... for id, item := range items { - saveId := /*scanId + "_" +*/ id - - if ex, ok := allItems[saveId]; ok { - fmt.Printf("Duplicate: %v - %v vs %v\n", saveId, item.Name, ex.Name) + if ex, ok := allItems[id]; ok { + fmt.Printf("Duplicate: %v - %v vs %v\n", id, item.Name, ex.Name) continue } if item.SrcAPIObjectItem.Creator.UserID != userIdFromPath { - fmt.Printf("Unexpected ROI user: %v, path had id: %v\n", item.SrcAPIObjectItem.Creator.UserID, userIdFromPath) - } - - allItems[saveId] = item - - tags := item.Tags - if tags == nil { - tags = []string{} - } - - scanIdxs, err := indexcompression.EncodeIndexList(item.LocationIndexes) - if err != nil { - return fmt.Errorf("ROI %v: location list error: %v", saveId, err) - } - pixIdxs, err := indexcompression.EncodeIndexList(item.PixelIndexes) - if err != nil { - return fmt.Errorf("ROI %v: pixel list error: %v", saveId, err) + fmt.Printf("Unexpected ROI user: %v, path had id: %v. ROI was likely copied to another user, skipping...\n", item.SrcAPIObjectItem.Creator.UserID, userIdFromPath) + continue } - destROI := protos.ROIItem{ - Id: saveId, - ScanId: scanId, - Name: item.Name, - Description: item.Description, - Tags: tags, - ScanEntryIndexesEncoded: scanIdxs, - ImageName: item.ImageName, - PixelIndexesEncoded: pixIdxs, - ModifiedUnixSec: uint32(item.CreatedUnixTimeSec), - // MistROIItem - } + allItems[id] = item viewerGroupId := "" if removeIfSharedROI(item, sharedItems) { viewerGroupId = userGroups["PIXL-FM"] } - err = saveOwnershipItem(destROI.Id, protos.ObjectType_OT_ROI, item.Creator.UserID, "", viewerGroupId, uint32(item.CreatedUnixTimeSec), dest) - if err != nil { - return err - } - - destROIs = append(destROIs, destROI) + migrateROI(id, scanId, item, coll, dest, viewerGroupId) } } } - result, err := coll.InsertMany(context.TODO(), destROIs) - if err != nil { - return err - } - - fmt.Printf("ROIs inserted: %v\n", len(result.InsertedIDs)) - fmt.Println("ROIs orphaned (shared but original not found):") - for id := range sharedItems { + fmt.Printf("ROIs inserted: %v\n", len(allItems)) + fmt.Println("Adding the following orphaned ROIs (shared but original not found):") + for id, shared := range sharedItems { fmt.Printf("%v\n", id) + migrateROI(id, sharedItemScanIds[id], shared, coll, dest, userGroups["PIXL-FM"]) } return err @@ -182,3 +146,44 @@ func removeIfSharedROI(roi SrcROISavedItem, sharedROIs SrcROILookup) bool { return false } + +func migrateROI(roiId string, scanId string, item SrcROISavedItem, coll *mongo.Collection, dest *mongo.Database, viewerGroupId string) error { + tags := item.Tags + if tags == nil { + tags = []string{} + } + + scanIdxs, err := indexcompression.EncodeIndexList(item.LocationIndexes) + if err != nil { + return fmt.Errorf("ROI %v: location list error: %v", roiId, err) + } + pixIdxs, err := indexcompression.EncodeIndexList(item.PixelIndexes) + if err != nil { + return fmt.Errorf("ROI %v: pixel list error: %v", roiId, err) + } + + destROI := protos.ROIItem{ + Id: roiId, + ScanId: scanId, + Name: item.Name, + Description: item.Description, + Tags: tags, + ScanEntryIndexesEncoded: scanIdxs, + ImageName: item.ImageName, + PixelIndexesEncoded: pixIdxs, + ModifiedUnixSec: uint32(item.CreatedUnixTimeSec), + // MistROIItem + } + + _, err = coll.InsertOne(context.TODO(), destROI) + if err != nil { + return err + } + + err = saveOwnershipItem(destROI.Id, protos.ObjectType_OT_ROI, item.Creator.UserID, "", viewerGroupId, uint32(item.CreatedUnixTimeSec), dest) + if err != nil { + return err + } + + return nil +}