ALS-6511: Add PFB result type #116
Merged

Commits (23, all by ramari16):
d530a3d  ALS-6511: Initial commit for PFB response type
c6bdaf7  ALS-6511: Fix spring config annotations
0175c75  ALS-6511: Fix avro writer configuration
79fee23  ALS-6511: Fix refactoring bug
1a3fd09  ALS-6511: Fix refactoring bug
bc1cda5  ALS-6511: Fix avro formatting issue
d149826  ALS-6511: Fix null value handling
bd3a945  ALS-6511: Add log message for avro file location
1360901  ALS-6511: Attempt to fix snappy class not found exception
7bc1109  ALS-6511: Attempt to fix snappy codec bug
e0f9fad  ALS-6511: Attempt to fix snappy codec bug
741aed8  ALS-6511: Fix pfb file writing issues
75e2448  ALS-6511: Update pfb result to handle multiple values per variable
068e987  ALS-6511: Fix infinite loop
93630c3  ALS-6511: Handle single value variables
37c739f  ALS-6511: Minor fixes and comments
9e2fd0c  Merge branch 'genomic-v2' into ALS-6511
7dac201  ALS-6511: Added test case, removed unused code
091eab0  ALS-6551: Attempt to fix character encoding issue
1be9246  ALS-6511: Attempt to fix encoding issue
02f166d  ALS-6511: Attempt to fix encoding issue
3321f37  ALS-6511: Revert unnecessary changes
f564636  ALS-6511: Revert unnecessary testing changes

Several of these commits concern Avro writer configuration and the snappy codec; a generic sketch of that pattern follows.
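PFB is an Avro-based format, which is why Avro writer configuration and the snappy codec show up repeatedly in the commit log. As context only, here is a generic sketch of writing snappy-compressed Avro with the Apache Avro Java API; this is not code from this PR, and the schema, class name, and file name are made up for illustration.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class AvroWriterSketch {
    public static void main(String[] args) throws IOException {
        // A toy schema; the real PFB schema is considerably more involved.
        Schema schema = SchemaBuilder.record("Row")
                .fields()
                .requiredString("patient_id")
                .endRecord();

        GenericRecord row = new GenericData.Record(schema);
        row.put("patient_id", "1");

        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<>(schema));
        // The snappy codec requires snappy-java on the runtime classpath,
        // one plausible source of the "snappy class not found" exception
        // mentioned in the commit messages.
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(schema, new File("example.avro"));
        writer.append(row);
        writer.close();
    }
}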
processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java
118 changes: 118 additions & 0 deletions

@@ -0,0 +1,118 @@
Review comment (on the class declaration):
This is somewhat similar to the QueryProcessor, which generates the CSVs, but since PFB needs to write row by row, we batch the rows and then transform the column-based data to row-based data before writing. (A standalone sketch of this transposition follows the diff.)

package edu.harvard.hms.dbmi.avillach.hpds.processing;

import com.google.common.collect.Lists;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class PfbProcessor implements HpdsProcessor {

    public static final String PATIENT_ID_FIELD_NAME = "patient_id";
    private final int ID_BATCH_SIZE;
    private final AbstractProcessor abstractProcessor;

    private Logger log = LoggerFactory.getLogger(PfbProcessor.class);

    @Autowired
    public PfbProcessor(AbstractProcessor abstractProcessor) {
        this.abstractProcessor = abstractProcessor;
        // Batch size comes from a system property; it must be set to a positive
        // value at runtime, since Lists.partition rejects a size of 0.
        ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
    }

    @Override
    public String[] getHeaderRow(Query query) {
        // The header is the patient id column followed by every requested field.
        String[] header = new String[query.getFields().size() + 1];
        header[0] = PATIENT_ID_FIELD_NAME;
        System.arraycopy(query.getFields().toArray(), 0, header, 1, query.getFields().size());
        return header;
    }

    @Override
    public void runQuery(Query query, AsyncResult result) {
        Set<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
        log.info("Processing " + idList.size() + " rows for result " + result.getId());
        // Process patients in batches: build a column-oriented map for the batch,
        // then transpose it into one row of multi-value cells per patient.
        Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream()
            .forEach(patientIds -> {
                Map<String, Map<Integer, List<String>>> pathToPatientToValueMap =
                    buildResult(result, query, new TreeSet<>(patientIds));
                List<List<List<String>>> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
                    List<List<String>> objectStream = Arrays.stream(getHeaderRow(query)).map(field -> {
                        if (PATIENT_ID_FIELD_NAME.equals(field)) {
                            return List.of(patientId.toString());
                        } else {
                            // Note: fields filtered out of the dictionary in
                            // buildResult have no entry here.
                            return pathToPatientToValueMap.get(field).get(patientId);
                        }
                    }).collect(Collectors.toList());
                    return objectStream;
                }).collect(Collectors.toList());
                result.appendMultiValueResults(fieldValuesPerPatient);
            });
        result.closeWriter();
    }

    private Map<String, Map<Integer, List<String>>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
        ConcurrentHashMap<String, Map<Integer, List<String>>> pathToPatientToValueMap = new ConcurrentHashMap<>();
        List<ColumnMeta> columns = query.getFields().stream()
            .map(abstractProcessor.getDictionary()::get)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        List<String> paths = columns.stream()
            .map(ColumnMeta::getName)
            .collect(Collectors.toList());
        int columnCount = paths.size() + 1;

        ArrayList<Integer> columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount);
        ResultStore results = new ResultStore(result.getId(), columns, ids);

        // todo: investigate if the parallel stream will thrash the cache if the number of executors is > number of resident cubes
        columnIndex.parallelStream().forEach((columnId) -> {
            String columnPath = paths.get(columnId - 1);
            Map<Integer, List<String>> patientIdToValueMap = processColumn(ids, columnPath);
            pathToPatientToValueMap.put(columnPath, patientIdToValueMap);
        });

        return pathToPatientToValueMap;
    }

    private Map<Integer, List<String>> processColumn(TreeSet<Integer> patientIds, String path) {
        Map<Integer, List<String>> patientIdToValueMap = new HashMap<>();
        PhenoCube<?> cube = abstractProcessor.getCube(path);

        KeyAndValue<?>[] cubeValues = cube.sortedByKey();

        // Both patientIds and cubeValues are sorted by patient id, so a single
        // forward pass collects every value for every requested patient.
        int idPointer = 0;
        for (int patientId : patientIds) {
            while (idPointer < cubeValues.length) {
                int key = cubeValues[idPointer].getKey();
                if (key < patientId) {
                    idPointer++;
                } else if (key == patientId) {
                    String value = getResultField(cube, cubeValues, idPointer);
                    patientIdToValueMap.computeIfAbsent(patientId, k -> new ArrayList<>()).add(value);
                    idPointer++;
                } else {
                    break;
                }
            }
        }
        return patientIdToValueMap;
    }

    private String getResultField(PhenoCube<?> cube, KeyAndValue<?>[] cubeValues, int idPointer) {
        Comparable<?> value = cubeValues[idPointer].getValue();
        return value.toString();
    }
}
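To make the batch-and-transpose step easier to follow, here is a minimal, self-contained sketch of the same idea outside the HPDS codebase. TransposeSketch and toRows are hypothetical names, and plain maps and lists stand in for PhenoCube, AsyncResult, and the dictionary; only the shape of the transformation matches runQuery above.

import java.util.*;

public class TransposeSketch {

    // Column-oriented input: concept path -> (patient id -> values).
    // This mirrors the pathToPatientToValueMap built by buildResult.
    static List<List<List<String>>> toRows(List<Integer> patientIds,
                                           List<String> fields,
                                           Map<String, Map<Integer, List<String>>> columns) {
        List<List<List<String>>> rows = new ArrayList<>();
        for (Integer patientId : patientIds) {
            List<List<String>> row = new ArrayList<>();
            row.add(List.of(patientId.toString())); // patient_id column first
            for (String field : fields) {
                // A patient may have zero, one, or many values for a field.
                List<String> values = columns
                        .getOrDefault(field, Map.of())
                        .getOrDefault(patientId, List.of());
                row.add(values);
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, List<String>>> columns = Map.of(
                "\\demographics\\race\\", Map.of(1, List.of("White"), 2, List.of("Asian", "White")));
        // Each row is a list of cells; each cell is a list of values,
        // which is what makes multi-valued variables representable.
        System.out.println(toRows(List.of(1, 2), List.of("\\demographics\\race\\"), columns));
    }
}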
Review comment:
This class was a huge pain. I tried to improve it, but I'm not super happy with how it turned out. At least it supports multi-value exporting, to some degree (see the sketch below).
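The multi-value collection mentioned above reduces to the two-pointer scan in processColumn: both the requested patient ids and the cube's entries are sorted by patient id, so one forward pass gathers every value for every patient. Below is a standalone sketch under assumed, simplified types; Entry stands in for KeyAndValue, and MultiValueScanSketch and collect are hypothetical names.

import java.util.*;

public class MultiValueScanSketch {

    record Entry(int patientId, String value) {}

    static Map<Integer, List<String>> collect(SortedSet<Integer> patientIds, Entry[] sortedEntries) {
        Map<Integer, List<String>> valuesByPatient = new HashMap<>();
        int i = 0;
        for (int patientId : patientIds) {
            while (i < sortedEntries.length) {
                int key = sortedEntries[i].patientId();
                if (key < patientId) {
                    i++;                      // entry belongs to a patient we skipped
                } else if (key == patientId) {
                    valuesByPatient.computeIfAbsent(patientId, k -> new ArrayList<>())
                            .add(sortedEntries[i].value());
                    i++;                      // keep going: there may be more values
                } else {
                    break;                    // entry belongs to a later patient
                }
            }
        }
        return valuesByPatient;
    }

    public static void main(String[] args) {
        Entry[] entries = {
                new Entry(1, "A"), new Entry(2, "B"), new Entry(2, "C"), new Entry(5, "D")
        };
        // Patient 2 keeps both values; patient 3 simply gets no entry.
        System.out.println(collect(new TreeSet<>(Set.of(1, 2, 3)), entries));
    }
}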