Add streaming support for Excel source
psainics committed Aug 13, 2024
1 parent 4f3fc8b commit 506d840
Showing 3 changed files with 79 additions and 13 deletions.
24 changes: 22 additions & 2 deletions core-plugins/pom.xml
@@ -173,15 +173,35 @@
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
- <version>3.12</version>
<version>5.2.5</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
- <version>3.11</version>
<version>5.2.5</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>excel-streaming-reader</artifactId>
<version>4.2.1</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>poi-shared-strings</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<scope>test</scope>
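The POI upgrade to 5.2.5 brings in matching commons-io and commons-compress versions, while the new pjfanning excel-streaming-reader and poi-shared-strings artifacts supply a streaming XLSX reader that keeps only a small window of rows in memory. A minimal sketch of how that reader is typically opened (the file name and cache sizes here are illustrative, not taken from this commit):

```java
import com.github.pjfanning.xlsx.StreamingReader;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;

import java.io.FileInputStream;
import java.io.InputStream;

public class StreamingReadSketch {
  public static void main(String[] args) throws Exception {
    // "large-report.xlsx" is a placeholder; the plugin reads from a Hadoop input stream instead.
    try (InputStream in = new FileInputStream("large-report.xlsx");
         Workbook workbook = StreamingReader.builder()
             .rowCacheSize(10)    // rows kept in memory at a time (the commit uses 10)
             .bufferSize(4096)    // bytes buffered from the underlying stream
             .open(in)) {
      Sheet sheet = workbook.getSheetAt(0);
      for (Row row : sheet) {
        for (Cell cell : row) {
          System.out.print(cell.getStringCellValue() + "\t");
        }
        System.out.println();
      }
    }
  }
}
```

The builder returns a regular POI Workbook, which is why the record reader changes below can keep working against the Sheet, Row, and Cell interfaces for both the streaming (.xlsx) and non-streaming (.xls) paths.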
@@ -16,6 +16,7 @@

package io.cdap.plugin.batch.source;

import com.github.pjfanning.xlsx.StreamingReader;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
@@ -26,19 +27,24 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.poi.hssf.usermodel.HSSFDateUtil;
import org.apache.poi.EmptyFileException;
import org.apache.poi.poifs.filesystem.FileMagic;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DateUtil;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.ss.util.CellReference;
import org.apache.poi.util.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;


@@ -61,16 +67,23 @@ public class ExcelInputFormat extends TextInputFormat {
public static final String FILE_PATTERN = "filePattern";
public static final String SHEET = "sheet";
public static final String SHEET_VALUE = "sheetValue";
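// Key and default used to raise POI's maximum byte array allocation when reading large OLE2 (.xls) files.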
public static final String EXCEL_BYTE_ARRAY_MAX_OVERRIDE = "excel.byteArrayMaxOverride";
public static final int EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT = Integer.MAX_VALUE / 2;

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new ExcelRecordReader();
}

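// Excel workbooks (OLE2 or OOXML containers) cannot be split at arbitrary byte offsets,
// so each file is read in full by a single record reader.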
@Override
public boolean isSplitable(JobContext context, Path file) {
return false;
}

public static void setConfigurations(Job job, String filePattern, String sheet, boolean reprocess,
String sheetValue, String columnList, boolean skipFirstRow,
String terminateIfEmptyRow, String rowLimit, String ifErrorRecord,
- String processedFiles) {
String processedFiles, int byteArrayMaxOverride) {

Configuration configuration = job.getConfiguration();
configuration.set(FILE_PATTERN, filePattern);
@@ -90,6 +103,7 @@ public static void setConfigurations(Job job, String filePattern, String sheet,

configuration.set(IF_ERROR_RECORD, ifErrorRecord);
configuration.set(PROCESSED_FILES, processedFiles);
configuration.set(EXCEL_BYTE_ARRAY_MAX_OVERRIDE, String.valueOf(byteArrayMaxOverride));
}


@@ -145,9 +159,34 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
String sheet = job.get(SHEET);
String sheetValue = job.get(SHEET_VALUE);

- Sheet workSheet; // sheet can be used as common for XSSF and HSSF workbook
Sheet workSheet;
Workbook workbook;
boolean isStreaming = false;
try {
- Workbook workbook = WorkbookFactory.create(fileIn);
// Use Magic Bytes to detect the file type
InputStream is = FileMagic.prepareToCheckMagic(fileIn);
byte[] emptyFileCheck = new byte[1];
is.mark(emptyFileCheck.length);
if (is.read(emptyFileCheck) < emptyFileCheck.length) {
throw new EmptyFileException();
}
is.reset();

final FileMagic fm = FileMagic.valueOf(is);
switch (fm) {
case OOXML:
workbook = StreamingReader.builder().rowCacheSize(10).open(is);
isStreaming = true;
break;
case OLE2:
// workaround for large files
IOUtils.setByteArrayMaxOverride(job.getInt(EXCEL_BYTE_ARRAY_MAX_OVERRIDE,
ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT));
workbook = WorkbookFactory.create(is);
break;
default:
throw new IOException("Can't open workbook - unsupported file type: " + fm);
}
if (sheet.equalsIgnoreCase(SHEET_NAME)) {
workSheet = workbook.getSheet(sheetValue);
} else {
@@ -157,7 +196,9 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e);
}

- rowCount = job.getInt(ROWS_LIMIT, workSheet.getPhysicalNumberOfRows());
// We cannot get the number of rows in a sheet while streaming,
// so -1 is used as rowCount to indicate that all rows should be read.
rowCount = job.getInt(ROWS_LIMIT, isStreaming ? -1 : workSheet.getPhysicalNumberOfRows());
rows = workSheet.iterator();
lastRowNum = workSheet.getLastRowNum();
rowIdx = 0;
@@ -171,7 +212,7 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
}

@Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
public boolean nextKeyValue() {
if (!rows.hasNext() || rowCount == 0) {
return false;
}
@@ -200,18 +241,18 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
Cell cell = cellIterator.next();
String colName = CellReference.convertNumToColString(cell.getColumnIndex());
switch (cell.getCellType()) {
- case Cell.CELL_TYPE_STRING:
case STRING:
sb.append(colName)
.append(COLUMN_SEPERATOR).append(cell.getStringCellValue()).append(CELL_SEPERATOR);
break;

- case Cell.CELL_TYPE_BOOLEAN:
case BOOLEAN:
sb.append(colName)
.append(COLUMN_SEPERATOR).append(cell.getBooleanCellValue()).append(CELL_SEPERATOR);
break;

- case Cell.CELL_TYPE_NUMERIC:
- if (HSSFDateUtil.isCellDateFormatted(cell)) {
case NUMERIC:
if (DateUtil.isCellDateFormatted(cell)) {
sb.append(colName).append(COLUMN_SEPERATOR).append(cell.getDateCellValue()).append(CELL_SEPERATOR);
} else {
sb.append(colName)
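The cell-type switch above now uses the CellType enum values and the DateUtil helper from POI 5.x in place of the removed Cell.CELL_TYPE_* int constants and HSSFDateUtil. A small self-contained sketch of the same pattern (the helper class and method are made up for illustration):

```java
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DateUtil;

public class CellValueSketch {

  // Hypothetical helper: render a cell as text, mirroring the branches in nextKeyValue().
  static String cellToString(Cell cell) {
    switch (cell.getCellType()) {
      case STRING:
        return cell.getStringCellValue();
      case BOOLEAN:
        return String.valueOf(cell.getBooleanCellValue());
      case NUMERIC:
        // Dates are stored as numeric values; the cell's format decides how to read them.
        return DateUtil.isCellDateFormatted(cell)
            ? cell.getDateCellValue().toString()
            : String.valueOf(cell.getNumericCellValue());
      default:
        return "";
    }
  }
}
```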
@@ -318,11 +318,16 @@ public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
processFiles = GSON.toJson(getAllProcessedFiles(batchSourceContext), ARRAYLIST_PREPROCESSED_FILES);
}

Map<String, String> arguments = new HashMap<>(batchSourceContext.getArguments().asMap());
int byteArrayMaxOverride = arguments.containsKey(ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE) ?
Integer.parseInt(arguments.get(ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE)) :
ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT;

ExcelInputFormat.setConfigurations(job, excelInputreaderConfig.filePattern, excelInputreaderConfig.sheet,
excelInputreaderConfig.reprocess, excelInputreaderConfig.sheetValue,
excelInputreaderConfig.columnList, excelInputreaderConfig.skipFirstRow,
excelInputreaderConfig.terminateIfEmptyRow, excelInputreaderConfig.rowsLimit,
- excelInputreaderConfig.ifErrorRecord, processFiles);
excelInputreaderConfig.ifErrorRecord, processFiles, byteArrayMaxOverride);

// Sets the input path(s).
ExcelInputFormat.addInputPaths(job, excelInputreaderConfig.filePath);
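With this change, a pipeline runtime argument keyed by ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE ("excel.byteArrayMaxOverride") can raise POI's byte-array allocation limit for very large .xls files, falling back to Integer.MAX_VALUE / 2 when the argument is absent. A rough sketch of that lookup against a plain map (class and method names here are illustrative):

```java
import org.apache.poi.util.IOUtils;

import java.util.HashMap;
import java.util.Map;

public class ByteArrayOverrideSketch {

  static final String KEY = "excel.byteArrayMaxOverride";   // ExcelInputFormat.EXCEL_BYTE_ARRAY_MAX_OVERRIDE
  static final int DEFAULT = Integer.MAX_VALUE / 2;         // EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT

  // Mirrors the prepareRun logic: prefer the runtime argument, otherwise use the default.
  static int resolveOverride(Map<String, String> runtimeArguments) {
    return runtimeArguments.containsKey(KEY)
        ? Integer.parseInt(runtimeArguments.get(KEY))
        : DEFAULT;
  }

  public static void main(String[] args) {
    // In a real run this map comes from BatchSourceContext.getArguments().asMap().
    Map<String, String> arguments = new HashMap<>();
    arguments.put(KEY, "1073741824");
    // Lift POI's allocation cap before the record reader opens a large .xls workbook.
    IOUtils.setByteArrayMaxOverride(resolveOverride(arguments));
  }
}
```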
