Modifying migration tool to import shared (orphaned) items where the user has already deleted their copy of what was shared. These were not coming through the import before. Also fixed some path-reading issues related to this.
Peter Nemere committed Sep 19, 2023
1 parent a594794 commit 0500f3b
Showing 2 changed files with 147 additions and 127 deletions.
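The gist of the change, before the per-file diffs: the importer previously migrated only items found under each user's own directory and used the shared copies just to decide visibility, so shared items whose owner had deleted their original were silently dropped. Below is a minimal sketch of the two-pass pattern the commit applies to both quantifications and ROIs; SrcItem, removeIfShared and the print statements are illustrative stand-ins, not the importer's real types or calls.

// Sketch of the two-pass import: consume shared entries that still have an
// owner copy, then import whatever remains in the shared map as orphans.
package main

import "fmt"

type SrcItem struct {
	ID   string
	Name string
}

// removeIfShared reports whether item also exists in the shared lookup and,
// if so, deletes it from the map so only unmatched (orphaned) entries remain.
func removeIfShared(item SrcItem, shared map[string]SrcItem) bool {
	if _, ok := shared[item.ID]; ok {
		delete(shared, item.ID)
		return true
	}
	return false
}

func main() {
	shared := map[string]SrcItem{
		"q1": {ID: "q1", Name: "still owned by its creator"},
		"q2": {ID: "q2", Name: "creator deleted their copy"},
	}
	userOwned := []SrcItem{{ID: "q1", Name: "still owned by its creator"}}

	// Pass 1: migrate every user-owned item; matched shared copies are
	// consumed so they are not imported twice.
	for _, item := range userOwned {
		wasShared := removeIfShared(item, shared)
		fmt.Printf("migrating user item %v (was shared: %v)\n", item.ID, wasShared)
	}

	// Pass 2: whatever is left in the shared map has no surviving original.
	// Previously these were only reported; now they are imported from the
	// shared path with a viewer group attached.
	for id := range shared {
		fmt.Printf("migrating orphaned shared item %v\n", id)
	}
}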
173 changes: 94 additions & 79 deletions internal/cmd-line-tools/v3-importer/quantifications.go
@@ -117,7 +117,6 @@ func migrateQuants(
}
}

destQuants := []interface{}{}
userItems := map[string]SrcJobSummaryItem{}

roiSetCount := 0
@@ -183,99 +182,115 @@ func migrateQuants(
}

if jobSummary.Params.Creator.UserID != userIdFromPath {
fmt.Printf("Unexpected quant user: %v, path had id: %v. Path was: %v\n", jobSummary.Params.Creator.UserID, userIdFromPath, p)
fmt.Printf("Unexpected quant user: %v, path had id: %v. Path was: %v. Quant was likely copied to another user, skipping...\n" /*. Using quant user in output paths...\n"*/, jobSummary.Params.Creator.UserID, userIdFromPath, p)
//userIdFromPath = jobSummary.Params.Creator.UserID
continue
}

userItems[quantId] = jobSummary

var jobStatus protos.JobStatus_Status

switch jobSummary.Status {
case SrcJobStarting:
jobStatus = protos.JobStatus_STARTING
case SrcJobPreparingNodes:
jobStatus = protos.JobStatus_PREPARING_NODES
case SrcJobNodesRunning:
jobStatus = protos.JobStatus_NODES_RUNNING
case SrcJobGatheringResults:
jobStatus = protos.JobStatus_GATHERING_RESULTS
case SrcJobComplete:
jobStatus = protos.JobStatus_COMPLETE
case SrcJobError:
jobStatus = protos.JobStatus_ERROR
}

// Write to DB
destQuant := &protos.QuantificationSummary{
Id: jobSummary.JobID,
ScanId: jobSummary.Params.DatasetID,
Params: &protos.QuantStartingParameters{
Name: jobSummary.Params.Name,
DataBucket: jobSummary.Params.DataBucket,
DatasetPath: jobSummary.Params.DatasetPath,
DatasetID: jobSummary.Params.DatasetID,
PiquantJobsBucket: jobSummary.Params.PiquantJobsBucket,
DetectorConfig: jobSummary.Params.DetectorConfig,
Elements: jobSummary.Params.Elements,
Parameters: jobSummary.Params.Parameters,
RunTimeSec: uint32(jobSummary.Params.RunTimeSec),
CoresPerNode: uint32(jobSummary.Params.CoresPerNode),
StartUnixTimeSec: uint32(jobSummary.Params.StartUnixTime),
RequestorUserId: utils.FixUserId(jobSummary.Params.Creator.UserID),
RoiID: jobSummary.Params.RoiID,
ElementSetID: jobSummary.Params.ElementSetID,
PIQUANTVersion: jobSummary.Params.PIQUANTVersion,
QuantMode: jobSummary.Params.QuantMode,
Comments: jobSummary.Params.Comments,
RoiIDs: jobSummary.Params.RoiIDs,
IncludeDwells: jobSummary.Params.IncludeDwells,
Command: jobSummary.Params.Command,
PmcCount: uint32(jobSummary.Params.PMCCount),
},
Elements: jobSummary.Elements,
Status: &protos.JobStatus{
JobID: jobSummary.JobID,
Status: jobStatus,
Message: jobSummary.Message,
EndUnixTimeSec: uint32(jobSummary.EndUnixTime),
OutputFilePath: path.Join("Quantifications", scanId, utils.FixUserId(jobSummary.Params.Creator.UserID)), //jobSummary.OutputFilePath,
PiquantLogs: jobSummary.PiquantLogList,
},
}

viewerGroupId := ""
if removeIfSharedQuant(jobSummary, sharedItems) {
viewerGroupId = userGroups["PIXL-FM"]
}

err = saveOwnershipItem(quantId, protos.ObjectType_OT_QUANTIFICATION, jobSummary.Params.Creator.UserID, "", viewerGroupId, uint32(jobSummary.EndUnixTime), dest)
if err != nil {
return err
if err := migrateQuant(jobSummary, "", coll, userContentBucket, destUserContentBucket, viewerGroupId, fs, dest); err != nil {
log.Fatalln(err)
}

// Save the relevant quantification files to their destination in S3.
// NOTE: if they are not found, this is an error!
saveQuantFiles(scanId, userIdFromPath, quantId, userContentBucket, destUserContentBucket, fs)

destQuants = append(destQuants, destQuant)
}
}

fmt.Printf("Quantification import results: roiID field count: %v, roiIDs field count: %v, elementSetID field count: %v, multiQuants: %v\n",
roiSetCount, roisSetCount, elementSetSetCount, multiQuantCount)

result, err := coll.InsertMany(context.TODO(), destQuants)
fmt.Printf("Quants inserted: %v\n", len(userItems))
fmt.Println("Adding the following orphaned Quants (shared but original not found):")
for _, shared := range sharedItems {
fmt.Printf(" - %v\n", shared.JobID)
// At this point, it's been shared, so no longer in the OutputFilePath dir that's in the summary. We override it with the real
// path to the shared item
srcPath := SrcGetUserQuantPath("shared", shared.Params.DatasetID, "")
if err := migrateQuant(shared, srcPath, coll, userContentBucket, destUserContentBucket, userGroups["PIXL-FM"], fs, dest); err != nil {
log.Fatalf("migrateQuant failed for: %v. Error: %v", shared.JobID, err)
}
}

return nil
}

func migrateQuant(jobSummary SrcJobSummaryItem, overrideSrcPath string, coll *mongo.Collection, userContentBucket string, destUserContentBucket string, viewerGroupId string, fs fileaccess.FileAccess, dest *mongo.Database) error {
var jobStatus protos.JobStatus_Status

switch jobSummary.Status {
case SrcJobStarting:
jobStatus = protos.JobStatus_STARTING
case SrcJobPreparingNodes:
jobStatus = protos.JobStatus_PREPARING_NODES
case SrcJobNodesRunning:
jobStatus = protos.JobStatus_NODES_RUNNING
case SrcJobGatheringResults:
jobStatus = protos.JobStatus_GATHERING_RESULTS
case SrcJobComplete:
jobStatus = protos.JobStatus_COMPLETE
case SrcJobError:
jobStatus = protos.JobStatus_ERROR
}

// Write to DB
destQuant := &protos.QuantificationSummary{
Id: jobSummary.JobID,
ScanId: jobSummary.Params.DatasetID,
Params: &protos.QuantStartingParameters{
Name: jobSummary.Params.Name,
DataBucket: jobSummary.Params.DataBucket,
DatasetPath: jobSummary.Params.DatasetPath,
DatasetID: jobSummary.Params.DatasetID,
PiquantJobsBucket: jobSummary.Params.PiquantJobsBucket,
DetectorConfig: jobSummary.Params.DetectorConfig,
Elements: jobSummary.Params.Elements,
Parameters: jobSummary.Params.Parameters,
RunTimeSec: uint32(jobSummary.Params.RunTimeSec),
CoresPerNode: uint32(jobSummary.Params.CoresPerNode),
StartUnixTimeSec: uint32(jobSummary.Params.StartUnixTime),
RequestorUserId: utils.FixUserId(jobSummary.Params.Creator.UserID),
RoiID: jobSummary.Params.RoiID,
ElementSetID: jobSummary.Params.ElementSetID,
PIQUANTVersion: jobSummary.Params.PIQUANTVersion,
QuantMode: jobSummary.Params.QuantMode,
Comments: jobSummary.Params.Comments,
RoiIDs: jobSummary.Params.RoiIDs,
IncludeDwells: jobSummary.Params.IncludeDwells,
Command: jobSummary.Params.Command,
PmcCount: uint32(jobSummary.Params.PMCCount),
},
Elements: jobSummary.Elements,
Status: &protos.JobStatus{
JobID: jobSummary.JobID,
Status: jobStatus,
Message: jobSummary.Message,
EndUnixTimeSec: uint32(jobSummary.EndUnixTime),
OutputFilePath: path.Join("Quantifications", jobSummary.Params.DatasetID, utils.FixUserId(jobSummary.Params.Creator.UserID)), //jobSummary.OutputFilePath,
PiquantLogs: jobSummary.PiquantLogList,
},
}

_, err := coll.InsertOne(context.TODO(), destQuant)
if err != nil {
return err
}

fmt.Printf("Quants inserted: %v\n", len(result.InsertedIDs))
fmt.Println("Quants orphaned (shared but original not found):")
for _, shared := range sharedItems {
fmt.Printf("%v\n", shared.JobID)
err = saveOwnershipItem(jobSummary.JobID, protos.ObjectType_OT_QUANTIFICATION, jobSummary.Params.Creator.UserID, "", viewerGroupId, uint32(jobSummary.EndUnixTime), dest)
if err != nil {
return err
}

return nil
// Save the relevant quantification files to their destination in S3.
// NOTE: if they are not found, this is an error!
srcPath := jobSummary.OutputFilePath
if len(overrideSrcPath) > 0 {
srcPath = overrideSrcPath
}
return saveQuantFiles(jobSummary.Params.DatasetID, jobSummary.JobID, userContentBucket, srcPath, destUserContentBucket, destQuant.Status.OutputFilePath, fs)
}

func removeIfSharedQuant(jobSummary SrcJobSummaryItem, sharedQuantSummaries map[string]SrcJobSummaryItem) bool {
@@ -313,15 +328,15 @@ func SrcGetUserQuantPath(userID string, datasetID string, fileName string) string {
return path.Join(filepaths.RootUserContent, userID, datasetID, quantificationSubPath)
}

func saveQuantFiles(datasetId string, userId string, quantId string, userContentBucket string, destUserContentBucket string, fs fileaccess.FileAccess) error {
func saveQuantFiles(datasetId string, quantId string, userContentBucket string, srcPath string, destUserContentBucket string, destPath string, fs fileaccess.FileAccess) error {
srcPaths := []string{
SrcGetUserQuantPath(userId, datasetId, quantId+".bin"),
SrcGetUserQuantPath(userId, datasetId, quantId+".csv"),
path.Join(srcPath, quantId+".bin"),
path.Join(srcPath, quantId+".csv"),
}

dstPaths := []string{
filepaths.GetQuantPath(utils.FixUserId(userId), datasetId, quantId+".bin"),
filepaths.GetQuantPath(utils.FixUserId(userId), datasetId, quantId+".csv"),
path.Join(destPath, quantId+".bin"),
path.Join(destPath, quantId+".csv"),
}

failOnError := []bool{
@@ -330,7 +345,7 @@ func saveQuantFiles(datasetId string, userId string, quantId string, userContent
}

// Quant log files, we need a listing of these
s3Path := SrcGetUserQuantPath(userId, datasetId, filepaths.MakeQuantLogDirName(quantId))
s3Path := path.Join(srcPath, filepaths.MakeQuantLogDirName(quantId))
logFiles, err := fs.ListObjects(userContentBucket, s3Path)
if err != nil {
// We just warn here
Expand All @@ -343,7 +358,7 @@ func saveQuantFiles(datasetId string, userId string, quantId string, userContent
}

srcPaths = append(srcPaths, logPath)
dstPaths = append(dstPaths, filepaths.GetQuantPath(utils.FixUserId(userId), datasetId, path.Join(quantId+"-logs", path.Base(logPath))))
dstPaths = append(dstPaths, path.Join(destPath, path.Join(quantId+"-logs", path.Base(logPath))))
failOnError = append(failOnError, false) // optional
}
}
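The path handling changed alongside this: saveQuantFiles used to derive both source and destination from a user id, which breaks for orphaned shared quants because their only surviving files sit under the shared area rather than the creator's directory. The new signature takes srcPath and destPath explicitly, and the caller overrides the source for orphans. The sketch below illustrates how those paths differ; it is assembled from SrcGetUserQuantPath and the OutputFilePath value visible in the diff, with made-up ids, and the literal "UserContent"/"Quantifications" segments are assumptions standing in for filepaths.RootUserContent and quantificationSubPath.

// Sketch only: shows how the source path differs for an orphaned shared quant.
package main

import (
	"fmt"
	"path"
)

// srcQuantDir mirrors SrcGetUserQuantPath(userID, datasetID, "") from the
// importer; "UserContent" and "Quantifications" are assumed constants here.
func srcQuantDir(userID, datasetID string) string {
	return path.Join("UserContent", userID, datasetID, "Quantifications")
}

func main() {
	const datasetID, quantID, creator = "scan123", "quant789", "user-abc"

	// Normal case: files still sit under the creator's directory.
	userSrc := srcQuantDir(creator, datasetID)

	// Orphan case: the creator deleted their copy, so the only files left
	// are under the "shared" pseudo-user; the caller overrides the source.
	orphanSrc := srcQuantDir("shared", datasetID)

	// Destination is the same either way: keyed by the (fixed-up) creator id,
	// matching the OutputFilePath built in migrateQuant.
	dst := path.Join("Quantifications", datasetID, creator)

	for _, src := range []string{userSrc, orphanSrc} {
		fmt.Printf("copy %s -> %s\n", path.Join(src, quantID+".bin"), path.Join(dst, quantID+".bin"))
	}
}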
101 changes: 53 additions & 48 deletions internal/cmd-line-tools/v3-importer/roi.go
@@ -50,6 +50,7 @@ func migrateROIs(
}

sharedItems := SrcROILookup{}
sharedItemScanIds := map[string]string{}
for _, p := range userContentFiles {
if strings.HasSuffix(p, "ROI.json") && strings.HasPrefix(p, "UserContent/shared/") {
scanId := filepath.Base(filepath.Dir(p))
@@ -69,12 +70,12 @@
// Store these till we're finished here
for id, item := range items {
sharedItems[ /*scanId+"_"+*/ id] = item
sharedItemScanIds[id] = scanId
}
sharedItems = items
}
}

destROIs := []interface{}{}
allItems := SrcROILookup{}

for _, p := range userContentFiles {
@@ -101,70 +102,33 @@

// Write these to DB and also remember them for later...
for id, item := range items {
saveId := /*scanId + "_" +*/ id

if ex, ok := allItems[saveId]; ok {
fmt.Printf("Duplicate: %v - %v vs %v\n", saveId, item.Name, ex.Name)
if ex, ok := allItems[id]; ok {
fmt.Printf("Duplicate: %v - %v vs %v\n", id, item.Name, ex.Name)
continue
}

if item.SrcAPIObjectItem.Creator.UserID != userIdFromPath {
fmt.Printf("Unexpected ROI user: %v, path had id: %v\n", item.SrcAPIObjectItem.Creator.UserID, userIdFromPath)
}

allItems[saveId] = item

tags := item.Tags
if tags == nil {
tags = []string{}
}

scanIdxs, err := indexcompression.EncodeIndexList(item.LocationIndexes)
if err != nil {
return fmt.Errorf("ROI %v: location list error: %v", saveId, err)
}
pixIdxs, err := indexcompression.EncodeIndexList(item.PixelIndexes)
if err != nil {
return fmt.Errorf("ROI %v: pixel list error: %v", saveId, err)
fmt.Printf("Unexpected ROI user: %v, path had id: %v. ROI was likely copied to another user, skipping...\n", item.SrcAPIObjectItem.Creator.UserID, userIdFromPath)
continue
}

destROI := protos.ROIItem{
Id: saveId,
ScanId: scanId,
Name: item.Name,
Description: item.Description,
Tags: tags,
ScanEntryIndexesEncoded: scanIdxs,
ImageName: item.ImageName,
PixelIndexesEncoded: pixIdxs,
ModifiedUnixSec: uint32(item.CreatedUnixTimeSec),
// MistROIItem
}
allItems[id] = item

viewerGroupId := ""
if removeIfSharedROI(item, sharedItems) {
viewerGroupId = userGroups["PIXL-FM"]
}

err = saveOwnershipItem(destROI.Id, protos.ObjectType_OT_ROI, item.Creator.UserID, "", viewerGroupId, uint32(item.CreatedUnixTimeSec), dest)
if err != nil {
return err
}

destROIs = append(destROIs, destROI)
migrateROI(id, scanId, item, coll, dest, viewerGroupId)
}
}
}

result, err := coll.InsertMany(context.TODO(), destROIs)
if err != nil {
return err
}

fmt.Printf("ROIs inserted: %v\n", len(result.InsertedIDs))
fmt.Println("ROIs orphaned (shared but original not found):")
for id := range sharedItems {
fmt.Printf("ROIs inserted: %v\n", len(allItems))
fmt.Println("Adding the following orphaned ROIs (shared but original not found):")
for id, shared := range sharedItems {
fmt.Printf("%v\n", id)
migrateROI(id, sharedItemScanIds[id], shared, coll, dest, userGroups["PIXL-FM"])
}

return err
Expand All @@ -182,3 +146,44 @@ func removeIfSharedROI(roi SrcROISavedItem, sharedROIs SrcROILookup) bool {

return false
}

func migrateROI(roiId string, scanId string, item SrcROISavedItem, coll *mongo.Collection, dest *mongo.Database, viewerGroupId string) error {
tags := item.Tags
if tags == nil {
tags = []string{}
}

scanIdxs, err := indexcompression.EncodeIndexList(item.LocationIndexes)
if err != nil {
return fmt.Errorf("ROI %v: location list error: %v", roiId, err)
}
pixIdxs, err := indexcompression.EncodeIndexList(item.PixelIndexes)
if err != nil {
return fmt.Errorf("ROI %v: pixel list error: %v", roiId, err)
}

destROI := protos.ROIItem{
Id: roiId,
ScanId: scanId,
Name: item.Name,
Description: item.Description,
Tags: tags,
ScanEntryIndexesEncoded: scanIdxs,
ImageName: item.ImageName,
PixelIndexesEncoded: pixIdxs,
ModifiedUnixSec: uint32(item.CreatedUnixTimeSec),
// MistROIItem
}

_, err = coll.InsertOne(context.TODO(), destROI)
if err != nil {
return err
}

err = saveOwnershipItem(destROI.Id, protos.ObjectType_OT_ROI, item.Creator.UserID, "", viewerGroupId, uint32(item.CreatedUnixTimeSec), dest)
if err != nil {
return err
}

return nil
}
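One extra piece of bookkeeping is needed on the ROI side: a shared ROI's scan id appears to be recoverable only from the path of the shared ROI.json it was read from, so migrateROIs records it in sharedItemScanIds while scanning the shared files and passes it to migrateROI during the orphan pass. A small self-contained sketch of that lookup, with made-up paths and ids:

// Sketch of the scan-id lookup the ROI orphan pass relies on (names and
// paths illustrative): the scan id is taken from the shared ROI.json path
// and remembered per ROI id for the second pass.
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	sharedFiles := map[string][]string{
		// path of the shared ROI.json -> ids of ROIs found inside it
		"UserContent/shared/scan123/ROI.json": {"roiA", "roiB"},
		"UserContent/shared/scan456/ROI.json": {"roiC"},
	}

	sharedItemScanIds := map[string]string{}
	for p, ids := range sharedFiles {
		scanId := filepath.Base(filepath.Dir(p)) // same trick as the importer
		for _, id := range ids {
			sharedItemScanIds[id] = scanId
		}
	}

	// Orphan pass: the scan id comes from the lookup, not from the ROI itself.
	for id, scanId := range sharedItemScanIds {
		fmt.Printf("migrateROI(%v, %v, ...)\n", id, scanId)
	}
}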
