diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java index 3b5754254..baf813bc0 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java @@ -47,9 +47,9 @@ import java.util.stream.Collectors; /** - * A Wrangler step for converting data type of column - * Accepted types are: int, short, long, double, float, string, boolean and bytes - * When decimal type is selected, can also specify the scale, precision and rounding mode + * A Wrangler step for converting data type of column Accepted types are: int, short, long, double, + * float, string, boolean and bytes When decimal type is selected, can also specify the scale, + * precision and rounding mode */ @Plugin(type = "directives") @Name(SetType.NAME) @@ -57,6 +57,7 @@ @Description("Converting data type of a column. Optional arguments scale, precision and " + "rounding-mode are used only when type is decimal.") public final class SetType implements Directive, Lineage { + public static final String NAME = "set-type"; private String col; @@ -72,7 +73,8 @@ public UsageDefinition define() { builder.define("type", TokenType.IDENTIFIER); builder.define("scale", TokenType.NUMERIC, Optional.TRUE); builder.define("rounding-mode", TokenType.TEXT, Optional.TRUE); - builder.define("precision", TokenType.PROPERTIES, "prop:{precision=}", Optional.TRUE); + builder.define("precision", TokenType.PROPERTIES, "prop:{precision=}", + Optional.TRUE); return builder.build(); } @@ -88,15 +90,18 @@ public void initialize(Arguments args) throws DirectiveParseException { } scale = args.contains("scale") ? ((Numeric) args.value("scale")).value().intValue() : null; if (scale == null && precision == null && args.contains("rounding-mode")) { - throw new DirectiveParseException("'rounding-mode' can only be specified when a 'scale' or 'precision' is set"); + throw new DirectiveParseException( + "'rounding-mode' can only be specified when a 'scale' or 'precision' is set"); } try { roundingMode = args.contains("rounding-mode") ? - RoundingMode.valueOf(((Text) args.value("rounding-mode")).value()) : - (scale == null && precision == null ? RoundingMode.UNNECESSARY : RoundingMode.HALF_EVEN); + RoundingMode.valueOf(((Text) args.value("rounding-mode")).value()) : + (scale == null && precision == null ? RoundingMode.UNNECESSARY + : RoundingMode.HALF_EVEN); } catch (IllegalArgumentException e) { throw new DirectiveParseException(String.format( - "Specified rounding-mode '%s' is not a valid Java rounding mode", args.value("rounding-mode").value()), e); + "Specified rounding-mode '%s' is not a valid Java rounding mode", + args.value("rounding-mode").value()), e); } } } @@ -107,7 +112,8 @@ public void destroy() { } @Override - public List execute(List rows, ExecutorContext context) throws DirectiveExecutionException { + public List execute(List rows, ExecutorContext context) + throws DirectiveExecutionException { for (Row row : rows) { ColumnConverter.convertType(NAME, row, col, type, scale, precision, roundingMode); } @@ -117,76 +123,83 @@ public List execute(List rows, ExecutorContext context) throws Directi @Override public Mutation lineage() { return Mutation.builder() - .readable("Changed the column '%s' to type '%s'", col, type) - .relation(col, col) - .build(); + .readable("Changed the column '%s' to type '%s'", col, type) + .relation(col, col) + .build(); } @Override public Schema getOutputSchema(SchemaResolutionContext context) { Schema inputSchema = context.getInputSchema(); return Schema.recordOf( - "outputSchema", - inputSchema.getFields().stream() - .map( - field -> { - try { - if (field.getName().equals(col)) { - Integer outputScale = scale; - Integer outputPrecision = precision; - if (type.equalsIgnoreCase("decimal") && field.getSchema().isNullable()) { - Schema fieldSchema = field.getSchema().getNonNullable(); - Pair scaleAndPrecision = getPrecisionAndScale(fieldSchema); - Integer inputSchemaScale = scaleAndPrecision.getSecond(); - Integer inputSchemaPrecision = scaleAndPrecision.getFirst(); - - if (scale == null && precision == null) { - outputScale = inputSchemaScale; - outputPrecision = inputSchemaPrecision; - } else if (scale == null && inputSchemaScale != null) { - if (precision - inputSchemaScale < 1) { - throw new DirectiveParseException(String.format( - "Cannot set scale as '%s' and precision as '%s' when " - + "given precision - scale is less than 1 ", inputSchemaScale, - precision)); - } - outputScale = inputSchemaScale; - outputPrecision = precision; - - } else if (precision == null && inputSchemaPrecision != null) { - if (inputSchemaPrecision - scale < 1) { - throw new DirectiveParseException(String.format( - "Cannot set scale as '%s' and precision as '%s' when " - + "given precision - scale is less than 1 ", scale, - inputSchemaPrecision)); + "outputSchema", + inputSchema.getFields().stream() + .map( + field -> { + try { + if (field.getName().equals(col)) { + Integer outputScale = scale; + Integer outputPrecision = precision; + if (type.equalsIgnoreCase("decimal") && field.getSchema().isNullable()) { + Schema fieldSchema = field.getSchema().getNonNullable(); + Pair scaleAndPrecision = getValidatedPrecisionAndScale( + fieldSchema, precision, scale); + outputScale = scaleAndPrecision.getSecond(); + outputPrecision = scaleAndPrecision.getFirst(); + } + return Schema.Field.of(col, ColumnConverter.getSchemaForType(type, + outputScale, outputPrecision)); } - outputScale = scale; - outputPrecision = inputSchemaPrecision; + return field; + } catch (DirectiveParseException e) { + throw new RuntimeException(e); } } - return Schema.Field.of(col, ColumnConverter.getSchemaForType(type, - outputScale, outputPrecision)); - } - return field; - } catch (DirectiveParseException e) { - throw new RuntimeException(e); - } - } - ) - .collect(Collectors.toList()) + ) + .collect(Collectors.toList()) ); } /** * extracts precision and scale from schema string */ - public static Pair getPrecisionAndScale(Schema fieldSchema) { - Integer precision = null; - Integer scale = null; - if (fieldSchema.getLogicalType() == LogicalType.DECIMAL) { - precision = fieldSchema.getPrecision(); - scale = fieldSchema.getScale(); + public static Pair getValidatedPrecisionAndScale(Schema fieldSchema, + Integer precision, Integer scale) + throws DirectiveParseException { //check precision and scale + Integer outputPrecision = precision; + Integer outputScale = scale; + Integer inputSchemaPrecision = null; + Integer inputSchemaScale = null; + + if (fieldSchema.getLogicalType() == LogicalType.DECIMAL) { + inputSchemaPrecision = fieldSchema.getPrecision(); + inputSchemaScale = fieldSchema.getScale(); + } + + if (scale == null && precision == null) { + outputScale = inputSchemaScale; + outputPrecision = inputSchemaPrecision; + } else if (scale == null && inputSchemaScale != null) { + if (precision - inputSchemaScale < 1) { + throw new DirectiveParseException(String.format( + "Cannot set scale as '%s' and precision as '%s' when " + + "given precision - scale is less than 1 ", inputSchemaScale, + precision)); } - return new Pair(precision, scale); + outputScale = inputSchemaScale; + outputPrecision = precision; + + } else if (precision == null && inputSchemaPrecision != null) { + if (inputSchemaPrecision - scale < 1) { + throw new DirectiveParseException(String.format( + "Cannot set scale as '%s' and precision as '%s' when " + + "given precision - scale is less than 1 ", scale, + inputSchemaPrecision)); + } + outputScale = scale; + outputPrecision = inputSchemaPrecision; } + + return new Pair(outputPrecision, outputScale); + } }