diff --git a/wrangler-core/src/main/java/io/cdap/directives/parser/JsParser.java b/wrangler-core/src/main/java/io/cdap/directives/parser/JsParser.java index c28c91412..c05c47949 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/parser/JsParser.java +++ b/wrangler-core/src/main/java/io/cdap/directives/parser/JsParser.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017-2019 Cask Data, Inc. + * Copyright © 2017-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,11 +16,14 @@ package io.cdap.directives.parser; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; import com.google.gson.internal.LazilyParsedNumber; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; @@ -37,13 +40,14 @@ import io.cdap.wrangler.api.lineage.Lineage; import io.cdap.wrangler.api.lineage.Many; import io.cdap.wrangler.api.lineage.Mutation; +import io.cdap.wrangler.api.parser.Bool; import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.Numeric; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; import io.cdap.wrangler.dq.TypeInference; -import org.json.JSONException; +import java.lang.reflect.Type; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; @@ -67,14 +71,29 @@ public class JsParser implements Directive, Lineage { // Max depth to which the JSON needs to be parsed. private int depth; - // JSON parser. - private static final JsonParser parser = new JsonParser(); + // Ignore error in parsing for record and proceed. 
+ private boolean ignoreError; + + // Gson parser with adapter to ensure integers are not represented as doubles. + private static final Gson parser = new GsonBuilder() + .setLenient() + .registerTypeAdapter(Double.class, new JsonSerializer() { + @Override + public JsonElement serialize(Double src, Type typeOfSrc, JsonSerializationContext context) { + if (src == src.longValue()) { + return new JsonPrimitive(src.longValue()); + } + return new JsonPrimitive(src.doubleValue()); + } + }) + .create(); @Override public UsageDefinition define() { UsageDefinition.Builder builder = UsageDefinition.builder(NAME); builder.define("column", TokenType.COLUMN_NAME); builder.define("depth", TokenType.NUMERIC, Optional.TRUE); + builder.define("ignore-error", TokenType.BOOLEAN, Optional.TRUE); return builder.build(); } @@ -86,6 +105,11 @@ public void initialize(Arguments args) throws DirectiveParseException { } else { this.depth = Integer.MAX_VALUE; } + if (args.contains("ignore-error")) { + this.ignoreError = ((Bool) args.value("ignore-error")).value(); + } else { + this.ignoreError = false; // backward compatibility. + } } @Override @@ -112,7 +136,7 @@ public List execute(List rows, ExecutorContext context) JsonElement element = null; if (value instanceof String) { String document = (String) value; - element = parser.parse(document.trim()); + element = parser.fromJson(document.trim(), JsonElement.class); } else if (value instanceof JsonObject || value instanceof JsonArray) { element = (JsonElement) value; } else { @@ -143,8 +167,19 @@ public List execute(List rows, ExecutorContext context) row.add(column, getValue(element.getAsJsonPrimitive())); } } - } catch (JSONException e) { - throw new ErrorRowException(NAME, e.getMessage(), 1); + } catch (Exception e) { + String msg = e.getCause() != null ? 
e.getCause().getMessage() : e.getMessage(); + // In case there are more rows being handled, we attempt to surface the Json that is + // causing an issue to make it easier for users to identify the problem quickly. + if (rows.size() > 1) { + msg = String.format("Incorrectly constructed json '%s', %s", value, e.getCause() != null ? + e.getCause().getMessage() : e.getMessage()); + } + + // If ignore error set, then don't throw exception, just move to next record. + if (!ignoreError) { + throw new ErrorRowException(NAME, msg, 1); + } } } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/parser/ParseAvroFile.java b/wrangler-core/src/main/java/io/cdap/directives/parser/ParseAvroFile.java index c59dfbb25..c8bc5f1c4 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/parser/ParseAvroFile.java +++ b/wrangler-core/src/main/java/io/cdap/directives/parser/ParseAvroFile.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017-2019 Cask Data, Inc. + * Copyright © 2017-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -54,6 +54,7 @@ @Description("parse-as-avro-file .") public class ParseAvroFile implements Directive, Lineage { public static final String NAME = "parse-as-avro-file"; + private static final int AVRO_FILE_SIZE_LIMIT = 5 * 1024 * 1024; private String column; private Gson gson; @@ -83,6 +84,15 @@ public List execute(List rows, final ExecutorContext context) throws D if (idx != -1) { Object object = row.getValue(idx); if (object instanceof byte[]) { + int size = ((byte[]) object).length; + if (size > AVRO_FILE_SIZE_LIMIT) { + throw new DirectiveExecutionException( + NAME, + String.format("Avro file greater than 5 MB are not currently supported by this directive " + + "(Current size : %d). 
Use File source connector with 'avro' as format to " + + "read large files.", size) + ); + } DataFileReader reader = null; try { reader = @@ -105,7 +115,11 @@ public List execute(List rows, final ExecutorContext context) throws D } } else { throw new DirectiveExecutionException( - NAME, String.format("Column '%s' is of invalid type. It should be of type 'byte array'.", column)); + NAME, String.format("Avro data file parsing directive requires '%s' to be a bytes field. " + + "Change type in input to bytes and make sure the format in File source is " + + "set as 'blob'. We recommend using Avro format in file source instead of using " + + "this directive", + column)); } } } diff --git a/wrangler-core/src/test/java/io/cdap/directives/parser/ParseAvroFileTest.java b/wrangler-core/src/test/java/io/cdap/directives/parser/ParseAvroFileTest.java index 1c999582d..059863551 100644 --- a/wrangler-core/src/test/java/io/cdap/directives/parser/ParseAvroFileTest.java +++ b/wrangler-core/src/test/java/io/cdap/directives/parser/ParseAvroFileTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2017-2019 Cask Data, Inc. + * Copyright © 2017-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. 
You may obtain a copy of @@ -52,6 +52,40 @@ public void testParseAsAvroFile() throws Exception { Assert.assertEquals(1495194308245L, results.get(1688).getValue("timestamp")); } + @Test + public void testAvroWithJsonFieldParsingWithParseJsonIgnoreError() throws Exception { + InputStream stream = ParseAvroFileTest.class.getClassLoader().getResourceAsStream("bad.avro"); + byte[] data = IOUtils.toByteArray(stream); + + String[] directives = new String[] { + "parse-as-avro-file body", + "parse-as-json :message_contents 2 true" + }; + + List rows = new ArrayList<>(); + rows.add(new Row("body", data)); + + List results = TestingRig.execute(directives, rows); + Assert.assertEquals(6693, results.size()); // total 6670, 7 records are bad. + } + + @Test + public void testAvroWithJsonFieldParsingWithParseJsonIgnoreErrorFail() throws Exception { + InputStream stream = ParseAvroFileTest.class.getClassLoader().getResourceAsStream("bad.avro"); + byte[] data = IOUtils.toByteArray(stream); + + String[] directives = new String[] { + "parse-as-avro-file body", + "parse-as-json :message_contents 2" + }; + + List rows = new ArrayList<>(); + rows.add(new Row("body", data)); + + List results = TestingRig.execute(directives, rows); + Assert.assertEquals(0, results.size()); // total 6670, 7 records are bad. + } + @Test(expected = RecipeException.class) public void testIncorrectType() throws Exception { String[] directives = new String[] { diff --git a/wrangler-core/src/test/resources/bad.avro b/wrangler-core/src/test/resources/bad.avro new file mode 100644 index 000000000..fc0420698 Binary files /dev/null and b/wrangler-core/src/test/resources/bad.avro differ