Skip to content

Commit

Permalink
ALS-6511: Fix avro writer configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 committed Jul 19, 2024
1 parent c6bdaf7 commit 0175c75
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -168,16 +171,11 @@ public HpdsProcessor getProcessor() {
return processor;
}

public AsyncResult setProcessor(HpdsProcessor processor) {
this.processor = processor;
return this;
}

public AsyncResult(Query query, String[] headerRow) {
public AsyncResult(Query query, HpdsProcessor processor, ResultWriter writer) {
this.query = query;
this.headerRow = headerRow;
this.headerRow = processor.getHeaderRow(query);
try {
stream = new ResultStoreStream(headerRow);
stream = new ResultStoreStream(headerRow, writer);
} catch (IOException e) {
log.error("Exception creating result stream", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ public class ResultStoreStream extends InputStream {
private int numRows;
private String[] originalHeader;

public ResultStoreStream(String[] header) throws IOException {
tempFile = File.createTempFile("result-"+ System.nanoTime(), ".sstmp");
writer = new CsvWriter(tempFile);
public ResultStoreStream(String[] header, ResultWriter writer) throws IOException {
this.writer = writer;
this.originalHeader = header;
writeHeader(this.originalHeader);
numRows = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package edu.harvard.hms.dbmi.avillach.hpds.service;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -111,30 +115,36 @@ private AsyncResult initializeResult(Query query) throws ClassNotFoundException,

HpdsProcessor p;
switch(query.getExpectedResultType()) {
case DATAFRAME :
case SECRET_ADMIN_DATAFRAME:
p = queryProcessor;
break;
case DATAFRAME_TIMESERIES :
p = timeseriesProcessor;
break;
case COUNT :
case CATEGORICAL_CROSS_COUNT :
case CONTINUOUS_CROSS_COUNT :
p = countProcessor;
break;
case DATAFRAME_PFB:
p = pfbProcessor;
break;
default :
throw new RuntimeException("UNSUPPORTED RESULT TYPE");
case DATAFRAME :
case SECRET_ADMIN_DATAFRAME:
p = queryProcessor;
break;
case DATAFRAME_TIMESERIES :
p = timeseriesProcessor;
break;
case COUNT :
case CATEGORICAL_CROSS_COUNT :
case CONTINUOUS_CROSS_COUNT :
p = countProcessor;
break;
case DATAFRAME_PFB:
p = pfbProcessor;
break;
default :
throw new RuntimeException("UNSUPPORTED RESULT TYPE");
}

AsyncResult result = new AsyncResult(query, p.getHeaderRow(query))

ResultWriter writer;
if (ResultType.DATAFRAME_PFB.equals(query.getExpectedResultType())) {
writer = new CsvWriter(File.createTempFile("result-" + query.getId(), ".avro"));
} else {
writer = new CsvWriter(File.createTempFile("result-" + System.nanoTime(), ".sstmp"));
}

AsyncResult result = new AsyncResult(query, p, writer)
.setStatus(AsyncResult.Status.PENDING)
.setQueuedTime(System.currentTimeMillis())
.setId(UUIDv5.UUIDFromString(query.toString()).toString())
.setProcessor(p);
.setId(UUIDv5.UUIDFromString(query.toString()).toString());
query.setId(result.getId());
results.put(result.getId(), result);
return result;
Expand Down

0 comments on commit 0175c75

Please sign in to comment.