Skip to content

Commit

Permalink
Merge pull request #1794 from cloudsufi/CSV_Multiline_support
Browse files Browse the repository at this point in the history
Changes for SFTP multiline support for csv
  • Loading branch information
albertshau authored Jul 11, 2023
2 parents 316ea12 + 7fe87ef commit 92e9cbe
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public static String[] setColumnNames(String rawLine, boolean skipHeader,
if (skipHeader) {
// need to check enableQuotedValues to remove quotes on headers
if (enableQuotedValues) {
Iterator<String> splitsIterator = new SplitQuotesIterator(rawLine, delimiter);
// RecordReader and enable multiline support is set to null and false for header values.
Iterator<String> splitsIterator = new SplitQuotesIterator(rawLine, delimiter, null, false);
List<String> tempHeaders = new ArrayList<String>();
while (splitsIterator.hasNext()) {
tempHeaders.add(splitsIterator.next());
Expand Down Expand Up @@ -155,14 +156,15 @@ public static void validateSchemaFieldNames(String[] fieldNames) {
* Cleans an array of field names to make sure they comply with avro field naming standard.
* It also makes sure each name is unique in the list.
* Field names can start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_].
*
* <p>
* Steps:
* 1) Trim surrounding spaces
* 2) If its empty replace it with BLANK
* 3) If it starts with a number, prepend "col_"
* 4) Replace invalid characters with "_" (multiple invalid characters gets replaced with one symbol)
* 5) Check if the name has been found before (without considering case)
* if so add _# where # is the number of times seen before + 1
* if so add _# where # is the number of times seen before + 1
*
* @param fieldNames an array of field names to be cleaned
*/
private static String[] cleanSchemaFieldNames(String[] fieldNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
import io.cdap.plugin.format.input.PathTrackingConfig;
import io.cdap.plugin.format.input.PathTrackingInputFormatProvider;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -78,6 +79,11 @@ protected void addFormatProperties(Map<String, String> properties) {
properties.put(PathTrackingDelimitedInputFormat.DELIMITER, ",");
properties.put(PathTrackingDelimitedInputFormat.SKIP_HEADER, String.valueOf(conf.getSkipHeader()));
properties.put(PathTrackingDelimitedInputFormat.ENABLE_QUOTES_VALUE, String.valueOf(conf.getEnableQuotedValues()));
properties.put(PathTrackingDelimitedInputFormat.ENABLE_MULTILINE_SUPPORT,
String.valueOf(conf.getEnableMultilineSupport()));
if (conf.getEnableMultilineSupport()) {
properties.put(FileInputFormat.SPLIT_MINSIZE, Long.toString(Long.MAX_VALUE));
}
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,45 @@ public class DelimitedConfig extends PathTrackingConfig {
public static final String NAME_ENABLE_QUOTES_VALUES = "enableQuotedValues";
public static final String NAME_OVERRIDE = "override";
public static final String NAME_SAMPLE_SIZE = "sampleSize";
public static final String NAME_ENABLE_MULTILINE_SUPPORT = "enableMultilineSupport";
public static final Map<String, PluginPropertyField> DELIMITED_FIELDS;

// description
public static final String DESC_ENABLE_QUOTES =
"Whether to treat content between quotes as a value. The default value is false.";
public static final String DESC_SKIP_HEADER =
"Whether to skip the first line of each file. The default value is false.";
public static final String DESC_ENABLE_MULTILINE =
"Whether to support content spread over multiple lines if it is between quotes. The default value is false";

static {
Map<String, PluginPropertyField> fields = new HashMap<>(FIELDS);
fields.put("skipHeader", new PluginPropertyField("skipHeader", DESC_SKIP_HEADER, "boolean", false, true));
fields.put(NAME_ENABLE_QUOTES_VALUES,
new PluginPropertyField(NAME_ENABLE_QUOTES_VALUES, DESC_ENABLE_QUOTES, "boolean", false, true));
new PluginPropertyField(NAME_ENABLE_QUOTES_VALUES, DESC_ENABLE_QUOTES, "boolean", false, true));
fields.put(NAME_ENABLE_MULTILINE_SUPPORT,
new PluginPropertyField(NAME_ENABLE_MULTILINE_SUPPORT, DESC_ENABLE_MULTILINE, "boolean", false, true));
DELIMITED_FIELDS = Collections.unmodifiableMap(fields);
}

@Macro
@Nullable
@Description(DESC_SKIP_HEADER)
private Boolean skipHeader;
@Description(DESC_ENABLE_QUOTES)
protected Boolean enableQuotedValues;

@Macro
@Nullable
@Description(DESC_ENABLE_QUOTES)
protected Boolean enableQuotedValues;
@Description(DESC_ENABLE_MULTILINE)
protected Boolean enableMultilineSupport;

@Macro
@Nullable
@Description(DESC_SKIP_HEADER)
private Boolean skipHeader;

public DelimitedConfig() {
super();
}

public boolean getSkipHeader() {
return skipHeader != null && skipHeader;
Expand All @@ -73,12 +87,12 @@ public boolean getEnableQuotedValues() {
return enableQuotedValues != null && enableQuotedValues;
}

public long getSampleSize() {
return Long.parseLong(getProperties().getProperties().getOrDefault(NAME_SAMPLE_SIZE, "1000"));
public Boolean getEnableMultilineSupport() {
return enableMultilineSupport != null && enableMultilineSupport;
}

public DelimitedConfig() {
super();
public long getSampleSize() {
return Long.parseLong(getProperties().getProperties().getOrDefault(NAME_SAMPLE_SIZE, "1000"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.cdap.plugin.format.delimited.common.DataTypeDetectorUtils;
import io.cdap.plugin.format.input.PathTrackingConfig;
import io.cdap.plugin.format.input.PathTrackingInputFormatProvider;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -49,9 +50,9 @@
@Name(DelimitedInputFormatProvider.NAME)
@Description(DelimitedInputFormatProvider.DESC)
public class DelimitedInputFormatProvider extends PathTrackingInputFormatProvider<DelimitedInputFormatProvider.Conf> {
public static final PluginClass PLUGIN_CLASS = getPluginClass();
static final String NAME = "delimited";
static final String DESC = "Plugin for reading files in delimited format.";
public static final PluginClass PLUGIN_CLASS = getPluginClass();
private final Conf conf;

public DelimitedInputFormatProvider(Conf conf) {
Expand All @@ -77,8 +78,8 @@ public void validate(FormatContext context) {
FailureCollector collector = context.getFailureCollector();
if (!conf.containsMacro(PathTrackingConfig.NAME_SCHEMA) && schema == null && context.getInputSchema() == null) {
collector.addFailure(
"Delimited format cannot be used without specifying a schema.",
"Schema must be specified.")
"Delimited format cannot be used without specifying a schema.",
"Schema must be specified.")
.withConfigProperty("schema");
}

Expand All @@ -88,8 +89,8 @@ public void validate(FormatContext context) {

if (conf.getEnableQuotedValues() && conf.delimiter != null && conf.delimiter.contains("\"")) {
collector.addFailure(
"The delimiter %s cannot contain \" when quoted values are enabled.",
"Check the delimiter.")
"The delimiter %s cannot contain \" when quoted values are enabled.",
"Check the delimiter.")
.withConfigProperty("delimiter");
}
}
Expand All @@ -99,6 +100,11 @@ protected void addFormatProperties(Map<String, String> properties) {
properties.put(PathTrackingDelimitedInputFormat.DELIMITER, conf.delimiter == null ? "," : conf.delimiter);
properties.put(PathTrackingDelimitedInputFormat.SKIP_HEADER, String.valueOf(conf.getSkipHeader()));
properties.put(PathTrackingDelimitedInputFormat.ENABLE_QUOTES_VALUE, String.valueOf(conf.getEnableQuotedValues()));
properties.put(PathTrackingDelimitedInputFormat.ENABLE_MULTILINE_SUPPORT,
String.valueOf(conf.getEnableMultilineSupport()));
if (conf.getEnableMultilineSupport()) {
properties.put(FileInputFormat.SPLIT_MINSIZE, Long.toString(Long.MAX_VALUE));
}
}

@Nullable
Expand Down Expand Up @@ -168,6 +174,6 @@ private static PluginClass getPluginClass() {
Map<String, PluginPropertyField> properties = new HashMap<>(DelimitedConfig.DELIMITED_FIELDS);
properties.put("delimiter", new PluginPropertyField("delimiter", Conf.DELIMITER_DESC, "string", false, true));
return new PluginClass(ValidatingInputFormat.PLUGIN_TYPE, NAME, DESC, DelimitedInputFormatProvider.class.getName(),
"conf", properties);
"conf", properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@

package io.cdap.plugin.format.delimited.input;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.AbstractIterator;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.format.StructuredRecordStringConverter;
import io.cdap.plugin.common.SchemaValidator;
import io.cdap.plugin.format.delimited.common.DelimitedStructuredRecordStringConverter;
import io.cdap.plugin.format.input.PathTrackingInputFormat;
import org.apache.hadoop.io.LongWritable;
Expand All @@ -32,6 +28,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.Iterator;
import javax.annotation.Nullable;
Expand All @@ -43,21 +40,24 @@ public class PathTrackingDelimitedInputFormat extends PathTrackingInputFormat {
static final String DELIMITER = "delimiter";
static final String ENABLE_QUOTES_VALUE = "enable_quotes_value";
static final String SKIP_HEADER = "skip_header";
static final String ENABLE_MULTILINE_SUPPORT = "enable_multiline_support";

private static final String QUOTE = "\"";

@Override
protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(FileSplit split,
TaskAttemptContext context,
@Nullable String pathField,
@Nullable Schema schema) {
TaskAttemptContext context,
@Nullable String pathField,
@Nullable Schema schema) {

RecordReader<LongWritable, Text> delegate = getDefaultRecordReaderDelegate(split, context);
String delimiter = context.getConfiguration().get(DELIMITER);
boolean skipHeader = context.getConfiguration().getBoolean(SKIP_HEADER, false);
boolean enableQuotesValue = context.getConfiguration().getBoolean(ENABLE_QUOTES_VALUE, false);
boolean enableMultilineSupport = context.getConfiguration().getBoolean(ENABLE_MULTILINE_SUPPORT, false);

return new RecordReader<NullWritable, StructuredRecord.Builder>() {
StructuredRecord.Builder builder = null;

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
Expand All @@ -69,66 +69,72 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
if (delegate.nextKeyValue()) {
// skip to next if the current record is header
if (skipHeader && delegate.getCurrentKey().get() == 0L) {
return delegate.nextKeyValue();
if (!delegate.nextKeyValue()) {
return false;
}
}
// this logic must be in nextKeyValue to prevent multiple calls to getCurrentValue
// from advancing the delegate reader
String delimitedString = delegate.getCurrentValue().toString();
builder = StructuredRecord.builder(schema);
Iterator<Schema.Field> fields = schema.getFields().iterator();
Iterator<String> splitsIterator = getSplitsIterator(enableQuotesValue, delimitedString, delimiter);
int dataFieldsCount = 0;
while (splitsIterator.hasNext()) {
dataFieldsCount++;
String part = splitsIterator.next();
if (!fields.hasNext()) {
while (splitsIterator.hasNext()) {
splitsIterator.next();
dataFieldsCount++;
}
handleImproperString(delimitedString.contains(QUOTE), dataFieldsCount);
}

Schema.Field nextField = fields.next();
DelimitedStructuredRecordStringConverter.parseAndSetFieldValue(builder, nextField, part);
}
return true;
}
return false;
}

private void handleImproperString(boolean containsQuote, int numDataFields) throws IOException {
int numSchemaFields = schema.getFields().size();
String message =
String.format(
"Found a row with %d fields when the schema only contains %d field%s.",
numDataFields, numSchemaFields, numSchemaFields == 1 ? "" : "s");
// special error handling for the case when the user most likely set the schema to delimited
// when they meant to use 'text'.
Schema.Field bodyField = schema.getField("body");
if (bodyField != null) {
Schema bodySchema = bodyField.getSchema();
bodySchema = bodySchema.isNullable() ? bodySchema.getNonNullable() : bodySchema;
if (bodySchema.getType() == Schema.Type.STRING) {
throw new IOException(message + " Did you mean to use the 'text' format?");
}
}
if (!enableQuotesValue && containsQuote) {
message += " Check if quoted values should be allowed.";
}
throw new IOException(
message + " Check that the schema contains the right number of fields.");
}

@Override
public NullWritable getCurrentKey() {
return NullWritable.get();
}

@Override
public StructuredRecord.Builder getCurrentValue() throws IOException, InterruptedException {
String delimitedString = delegate.getCurrentValue().toString();

StructuredRecord.Builder builder = StructuredRecord.builder(schema);
Iterator<Schema.Field> fields = schema.getFields().iterator();
Iterator<String> splitsIterator = getSplitsIterator(enableQuotesValue, delimitedString, delimiter);

while (splitsIterator.hasNext()) {
String part = splitsIterator.next();
if (!fields.hasNext()) {
int numDataFields = 0;
splitsIterator = getSplitsIterator(enableQuotesValue, delimitedString, delimiter);
while (splitsIterator.hasNext()) {
splitsIterator.next();
numDataFields++;
}
int numSchemaFields = schema.getFields().size();
String message =
String.format(
"Found a row with %d fields when the schema only contains %d field%s.",
numDataFields, numSchemaFields, numSchemaFields == 1 ? "" : "s");
// special error handling for the case when the user most likely set the schema to delimited
// when they meant to use 'text'.
Schema.Field bodyField = schema.getField("body");
if (bodyField != null) {
Schema bodySchema = bodyField.getSchema();
bodySchema = bodySchema.isNullable() ? bodySchema.getNonNullable() : bodySchema;
if (bodySchema.getType() == Schema.Type.STRING) {
throw new IOException(message + " Did you mean to use the 'text' format?");
}
}
if (!enableQuotesValue && delimitedString.contains(QUOTE)) {
message += " Check if quoted values should be allowed.";
}
throw new IOException(
message + " Check that the schema contains the right number of fields.");
}

Schema.Field nextField = fields.next();
DelimitedStructuredRecordStringConverter.parseAndSetFieldValue(builder, nextField, part);
}
return builder;
}

private Iterator<String> getSplitsIterator(boolean enableQuotesValue, String delimitedString, String delimiter) {
if (enableQuotesValue) {
return new SplitQuotesIterator(delimitedString, delimiter);
return new SplitQuotesIterator(delimitedString, delimiter, delegate, enableMultilineSupport);
} else {
return Splitter.on(delimiter).split(delimitedString).iterator();
}
Expand Down
Loading

0 comments on commit 92e9cbe

Please sign in to comment.