Skip to content

Commit

Permalink
Refactor CSVLoaderService into separate class
Browse files Browse the repository at this point in the history
Split CSVLoaderService into its own file for better modularity
and maintainability. Removed the duplicate implementation
from CSVLoaderNewSearch.
  • Loading branch information
TDeSain committed Mar 3, 2025
1 parent 5c18091 commit 805d2e0
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,82 +37,3 @@ ApplicationRunner runCSVLoader(CSVLoaderService csvLoaderService) {
return args -> csvLoaderService.runEtlProcess();
}
}

@Service
class CSVLoaderService {

    private static final Logger log = LoggerFactory.getLogger(CSVLoaderService.class);

    /** Backing store that caches PhenoCubes and owns the observation file handle. */
    private final LoadingStore store = new LoadingStore();

    /** Directory that holds allConcepts.csv and receives the generated javabin stores. */
    @Value("${etl.hpds.directory:/opt/local/hpds/}")
    private String hpdsDirectory;

    /** Passed through to CSVParserUtil.parseConceptPath to control concept-path rollup. */
    @Value("${etl.rollup.enabled:true}")
    private boolean rollupEnabled;

    /**
     * Runs the full ETL pass: opens the observation store file, streams every record of
     * allConcepts.csv into PhenoCubes, then persists the store under {@code hpdsDirectory}.
     *
     * @throws IOException if the store file or the CSV input cannot be read or written
     */
    public void runEtlProcess() throws IOException {
        log.info("Starting ETL process... Rollup Enabled: {}", rollupEnabled);

        store.allObservationsStore = new RandomAccessFile(hpdsDirectory + "allObservationsStore.javabin", "rw");
        initialLoad();
        store.saveStore(hpdsDirectory);

        log.info("ETL process completed.");
    }

    /**
     * Parses allConcepts.csv (first record is the header and is skipped) and feeds each
     * record to {@link #processRecord}. The reader is closed even if parsing fails.
     */
    private void initialLoad() throws IOException {
        Crypto.loadDefaultKey();
        // try-with-resources: previously the reader leaked on any parse failure.
        // NOTE(review): FileReader uses the platform default charset — confirm the input
        // is always read on a UTF-8 platform, or pass an explicit charset.
        try (Reader in = new FileReader(hpdsDirectory + "allConcepts.csv");
             BufferedReader buffered = new BufferedReader(in, 256 * 1024)) {
            Iterable<CSVRecord> records = CSVFormat.DEFAULT
                    .withFirstRecordAsHeader()
                    .withSkipHeaderRecord(true)
                    .parse(buffered);

            // Single-element array lets processRecord carry the "current" cube across records.
            final PhenoCube[] currentConcept = new PhenoCube[1];
            for (CSVRecord record : records) {
                processRecord(currentConcept, record);
            }
        }
    }

    /**
     * Loads one CSV record into the current PhenoCube. Records with fewer than four
     * fields are skipped with a warning. A record is numeric when NUMERIC_VALUE is
     * non-empty, otherwise TEXT_VALUE is used; a value whose kind disagrees with the
     * cube's value type is silently dropped.
     */
    private void processRecord(final PhenoCube[] currentConcept, CSVRecord record) {
        if (record.size() < 4) {
            log.warn("Skipping record #{} due to missing fields.", record.getRecordNumber());
            return;
        }

        String conceptPath = CSVParserUtil.parseConceptPath(record, rollupEnabled);
        String numericValue = record.get(CSVParserUtil.NUMERIC_VALUE);
        boolean isAlpha = (numericValue == null || numericValue.isEmpty());
        String value = isAlpha ? record.get(CSVParserUtil.TEXT_VALUE) : numericValue;
        currentConcept[0] = getPhenoCube(currentConcept[0], conceptPath, isAlpha);

        if (value != null && !value.trim().isEmpty() &&
            ((isAlpha && currentConcept[0].vType == String.class) || (!isAlpha && currentConcept[0].vType == Double.class))) {
            value = value.trim();
            // Column width: widest encoded text value seen so far, or fixed 8 bytes for doubles.
            currentConcept[0].setColumnWidth(isAlpha ? Math.max(currentConcept[0].getColumnWidth(), value.getBytes().length) : Double.BYTES);
            int patientId = Integer.parseInt(record.get(CSVParserUtil.PATIENT_NUM));
            Date date = null;
            if (record.size() > 4 && record.get(CSVParserUtil.DATETIME) != null && !record.get(CSVParserUtil.DATETIME).isEmpty()) {
                date = new Date(Long.parseLong(record.get(CSVParserUtil.DATETIME)));
            }
            currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date);
            store.allIds.add(patientId);
        }
    }

    /**
     * Returns the PhenoCube for {@code conceptPath}, reusing {@code currentConcept} when
     * its name already matches. On a cache miss the whole cache is invalidated (per the
     * inline note, to trigger the removal handler and free memory per concept) before a
     * fresh cube is created and cached. This presumes the CSV is grouped by concept
     * path — TODO confirm; an unordered file would thrash the cache.
     */
    private PhenoCube getPhenoCube(PhenoCube currentConcept, String conceptPath, boolean isAlpha) {
        if (currentConcept == null || !currentConcept.name.equals(conceptPath)) {
            currentConcept = store.store.getIfPresent(conceptPath);
            if (currentConcept == null) {
                log.info("Writing - {}", conceptPath);
                // safe to invalidate and write store?
                store.store.invalidateAll(); // force onremoval to free up cache per concept
                store.store.cleanUp();
                currentConcept = new PhenoCube(conceptPath, isAlpha ? String.class : Double.class);
                store.store.put(conceptPath, currentConcept);
            }
        }
        return currentConcept;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package edu.harvard.hms.dbmi.avillach.hpds.etl.phenotype.csv;


import edu.harvard.hms.dbmi.avillach.hpds.crypto.Crypto;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
import edu.harvard.hms.dbmi.avillach.hpds.etl.LoadingStore;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Date;

@Service
public class CSVLoaderService {

    private static final Logger log = LoggerFactory.getLogger(CSVLoaderService.class);

    /** Backing store that caches PhenoCubes and owns the observation file handle. */
    private final LoadingStore store = new LoadingStore();

    /** Directory that holds allConcepts.csv and receives the generated javabin stores. */
    @Value("${etl.hpds.directory:/opt/local/hpds/}")
    private String hpdsDirectory;

    /** Passed through to CSVParserUtil.parseConceptPath to control concept-path rollup. */
    @Value("${etl.rollup.enabled:true}")
    private boolean rollupEnabled;

    /**
     * Runs the full ETL pass: opens the observation store file, streams every record of
     * allConcepts.csv into PhenoCubes, then persists the store under {@code hpdsDirectory}.
     *
     * @throws IOException if the store file or the CSV input cannot be read or written
     */
    public void runEtlProcess() throws IOException {
        log.info("Starting ETL process... Rollup Enabled: {}", rollupEnabled);

        store.allObservationsStore = new RandomAccessFile(hpdsDirectory + "allObservationsStore.javabin", "rw");
        initialLoad();
        store.saveStore(hpdsDirectory);

        log.info("ETL process completed.");
    }

    /**
     * Parses allConcepts.csv (first record is the header and is skipped) and feeds each
     * record to {@link #processRecord}. The reader is closed even if parsing fails.
     */
    private void initialLoad() throws IOException {
        Crypto.loadDefaultKey();
        // try-with-resources: previously the reader leaked on any parse failure.
        // UTF-8 is explicit so the load does not depend on the platform default charset.
        try (Reader in = new InputStreamReader(
                 new FileInputStream(hpdsDirectory + "allConcepts.csv"), StandardCharsets.UTF_8);
             BufferedReader buffered = new BufferedReader(in, 256 * 1024)) {
            Iterable<CSVRecord> records = CSVFormat.DEFAULT
                    .withFirstRecordAsHeader()
                    .withSkipHeaderRecord(true)
                    .parse(buffered);

            // Single-element array lets processRecord carry the "current" cube across records.
            final PhenoCube[] currentConcept = new PhenoCube[1];
            for (CSVRecord record : records) {
                processRecord(currentConcept, record);
            }
        }
    }

    /**
     * Loads one CSV record into the current PhenoCube. Records with fewer than four
     * fields are skipped with a warning. A record is numeric when NUMERIC_VALUE is
     * non-empty, otherwise TEXT_VALUE is used; a value whose kind disagrees with the
     * cube's value type is silently dropped.
     */
    private void processRecord(final PhenoCube[] currentConcept, CSVRecord record) {
        if (record.size() < 4) {
            log.warn("Skipping record #{} due to missing fields.", record.getRecordNumber());
            return;
        }

        String conceptPath = CSVParserUtil.parseConceptPath(record, rollupEnabled);
        String numericValue = record.get(CSVParserUtil.NUMERIC_VALUE);
        boolean isAlpha = (numericValue == null || numericValue.isEmpty());
        String value = isAlpha ? record.get(CSVParserUtil.TEXT_VALUE) : numericValue;
        currentConcept[0] = getPhenoCube(currentConcept[0], conceptPath, isAlpha);

        if (value != null && !value.trim().isEmpty() &&
            ((isAlpha && currentConcept[0].vType == String.class) || (!isAlpha && currentConcept[0].vType == Double.class))) {
            value = value.trim();
            // Column width: widest UTF-8 encoding seen so far for text, fixed 8 bytes for doubles.
            // Explicit charset keeps the width deterministic across platforms.
            currentConcept[0].setColumnWidth(isAlpha
                    ? Math.max(currentConcept[0].getColumnWidth(), value.getBytes(StandardCharsets.UTF_8).length)
                    : Double.BYTES);
            int patientId = Integer.parseInt(record.get(CSVParserUtil.PATIENT_NUM));
            Date date = null;
            if (record.size() > 4 && record.get(CSVParserUtil.DATETIME) != null && !record.get(CSVParserUtil.DATETIME).isEmpty()) {
                date = new Date(Long.parseLong(record.get(CSVParserUtil.DATETIME)));
            }
            currentConcept[0].add(patientId, isAlpha ? value : Double.parseDouble(value), date);
            store.allIds.add(patientId);
        }
    }

    /**
     * Returns the PhenoCube for {@code conceptPath}, reusing {@code currentConcept} when
     * its name already matches. On a cache miss the whole cache is invalidated (per the
     * inline note, to trigger the removal handler and free memory per concept) before a
     * fresh cube is created and cached. This presumes the CSV is grouped by concept
     * path — TODO confirm; an unordered file would thrash the cache.
     */
    private PhenoCube getPhenoCube(PhenoCube currentConcept, String conceptPath, boolean isAlpha) {
        if (currentConcept == null || !currentConcept.name.equals(conceptPath)) {
            currentConcept = store.store.getIfPresent(conceptPath);
            if (currentConcept == null) {
                log.info("Writing - {}", conceptPath);
                // safe to invalidate and write store?
                store.store.invalidateAll(); // force onremoval to free up cache per concept
                store.store.cleanUp();
                currentConcept = new PhenoCube(conceptPath, isAlpha ? String.class : Double.class);
                store.store.put(conceptPath, currentConcept);
            }
        }
        return currentConcept;
    }
}

0 comments on commit 805d2e0

Please sign in to comment.