Skip to content

Commit

Permalink
review comments -1
Browse files Browse the repository at this point in the history
  • Loading branch information
minurajeeve committed Jan 2, 2024
1 parent 6c686ae commit d07f02d
Showing 1 changed file with 77 additions and 64 deletions.
141 changes: 77 additions & 64 deletions wrangler-core/src/main/java/io/cdap/directives/column/SetType.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@
import java.util.stream.Collectors;

/**
* A Wrangler step for converting data type of column
* Accepted types are: int, short, long, double, float, string, boolean and bytes
* When decimal type is selected, can also specify the scale, precision and rounding mode
* A Wrangler step for converting data type of column Accepted types are: int, short, long, double,
* float, string, boolean and bytes When decimal type is selected, can also specify the scale,
* precision and rounding mode
*/
@Plugin(type = "directives")
@Name(SetType.NAME)
@Categories(categories = {"column"})
@Description("Converting data type of a column. Optional arguments scale, precision and "
+ "rounding-mode are used only when type is decimal.")
public final class SetType implements Directive, Lineage {

public static final String NAME = "set-type";

private String col;
Expand All @@ -72,7 +73,8 @@ public UsageDefinition define() {
builder.define("type", TokenType.IDENTIFIER);
builder.define("scale", TokenType.NUMERIC, Optional.TRUE);
builder.define("rounding-mode", TokenType.TEXT, Optional.TRUE);
builder.define("precision", TokenType.PROPERTIES, "prop:{precision=<precision>}", Optional.TRUE);
builder.define("precision", TokenType.PROPERTIES, "prop:{precision=<precision>}",
Optional.TRUE);
return builder.build();
}

Expand All @@ -88,15 +90,18 @@ public void initialize(Arguments args) throws DirectiveParseException {
}
scale = args.contains("scale") ? ((Numeric) args.value("scale")).value().intValue() : null;
if (scale == null && precision == null && args.contains("rounding-mode")) {
throw new DirectiveParseException("'rounding-mode' can only be specified when a 'scale' or 'precision' is set");
throw new DirectiveParseException(
"'rounding-mode' can only be specified when a 'scale' or 'precision' is set");
}
try {
roundingMode = args.contains("rounding-mode") ?
RoundingMode.valueOf(((Text) args.value("rounding-mode")).value()) :
(scale == null && precision == null ? RoundingMode.UNNECESSARY : RoundingMode.HALF_EVEN);
RoundingMode.valueOf(((Text) args.value("rounding-mode")).value()) :
(scale == null && precision == null ? RoundingMode.UNNECESSARY
: RoundingMode.HALF_EVEN);
} catch (IllegalArgumentException e) {
throw new DirectiveParseException(String.format(
"Specified rounding-mode '%s' is not a valid Java rounding mode", args.value("rounding-mode").value()), e);
"Specified rounding-mode '%s' is not a valid Java rounding mode",
args.value("rounding-mode").value()), e);
}
}
}
Expand All @@ -107,7 +112,8 @@ public void destroy() {
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context)
throws DirectiveExecutionException {
for (Row row : rows) {
ColumnConverter.convertType(NAME, row, col, type, scale, precision, roundingMode);
}
Expand All @@ -117,76 +123,83 @@ public List<Row> execute(List<Row> rows, ExecutorContext context) throws Directi
@Override
public Mutation lineage() {
return Mutation.builder()
.readable("Changed the column '%s' to type '%s'", col, type)
.relation(col, col)
.build();
.readable("Changed the column '%s' to type '%s'", col, type)
.relation(col, col)
.build();
}

@Override
public Schema getOutputSchema(SchemaResolutionContext context) {
Schema inputSchema = context.getInputSchema();
return Schema.recordOf(
"outputSchema",
inputSchema.getFields().stream()
.map(
field -> {
try {
if (field.getName().equals(col)) {
Integer outputScale = scale;
Integer outputPrecision = precision;
if (type.equalsIgnoreCase("decimal") && field.getSchema().isNullable()) {
Schema fieldSchema = field.getSchema().getNonNullable();
Pair<Integer, Integer> scaleAndPrecision = getPrecisionAndScale(fieldSchema);
Integer inputSchemaScale = scaleAndPrecision.getSecond();
Integer inputSchemaPrecision = scaleAndPrecision.getFirst();

if (scale == null && precision == null) {
outputScale = inputSchemaScale;
outputPrecision = inputSchemaPrecision;
} else if (scale == null && inputSchemaScale != null) {
if (precision - inputSchemaScale < 1) {
throw new DirectiveParseException(String.format(
"Cannot set scale as '%s' and precision as '%s' when "
+ "given precision - scale is less than 1 ", inputSchemaScale,
precision));
}
outputScale = inputSchemaScale;
outputPrecision = precision;

} else if (precision == null && inputSchemaPrecision != null) {
if (inputSchemaPrecision - scale < 1) {
throw new DirectiveParseException(String.format(
"Cannot set scale as '%s' and precision as '%s' when "
+ "given precision - scale is less than 1 ", scale,
inputSchemaPrecision));
"outputSchema",
inputSchema.getFields().stream()
.map(
field -> {
try {
if (field.getName().equals(col)) {
Integer outputScale = scale;
Integer outputPrecision = precision;
if (type.equalsIgnoreCase("decimal") && field.getSchema().isNullable()) {
Schema fieldSchema = field.getSchema().getNonNullable();
Pair<Integer, Integer> scaleAndPrecision = getValidatedPrecisionAndScale(
fieldSchema, precision, scale);
outputScale = scaleAndPrecision.getSecond();
outputPrecision = scaleAndPrecision.getFirst();
}
return Schema.Field.of(col, ColumnConverter.getSchemaForType(type,
outputScale, outputPrecision));
}
outputScale = scale;
outputPrecision = inputSchemaPrecision;
return field;
} catch (DirectiveParseException e) {
throw new RuntimeException(e);
}
}
return Schema.Field.of(col, ColumnConverter.getSchemaForType(type,
outputScale, outputPrecision));
}
return field;
} catch (DirectiveParseException e) {
throw new RuntimeException(e);
}
}
)
.collect(Collectors.toList())
)
.collect(Collectors.toList())
);
}

/**
* extracts precision and scale from schema string
*/
public static Pair<Integer, Integer> getPrecisionAndScale(Schema fieldSchema) {
Integer precision = null;
Integer scale = null;
if (fieldSchema.getLogicalType() == LogicalType.DECIMAL) {
precision = fieldSchema.getPrecision();
scale = fieldSchema.getScale();
public static Pair<Integer, Integer> getValidatedPrecisionAndScale(Schema fieldSchema,
Integer precision, Integer scale)
throws DirectiveParseException { //check precision and scale
Integer outputPrecision = precision;
Integer outputScale = scale;
Integer inputSchemaPrecision = null;
Integer inputSchemaScale = null;

if (fieldSchema.getLogicalType() == LogicalType.DECIMAL) {
inputSchemaPrecision = fieldSchema.getPrecision();
inputSchemaScale = fieldSchema.getScale();
}

if (scale == null && precision == null) {
outputScale = inputSchemaScale;
outputPrecision = inputSchemaPrecision;
} else if (scale == null && inputSchemaScale != null) {
if (precision - inputSchemaScale < 1) {
throw new DirectiveParseException(String.format(
"Cannot set scale as '%s' and precision as '%s' when "
+ "given precision - scale is less than 1 ", inputSchemaScale,
precision));
}
return new Pair<Integer, Integer>(precision, scale);
outputScale = inputSchemaScale;
outputPrecision = precision;

} else if (precision == null && inputSchemaPrecision != null) {
if (inputSchemaPrecision - scale < 1) {
throw new DirectiveParseException(String.format(
"Cannot set scale as '%s' and precision as '%s' when "
+ "given precision - scale is less than 1 ", scale,
inputSchemaPrecision));
}
outputScale = scale;
outputPrecision = inputSchemaPrecision;
}

return new Pair<Integer, Integer>(outputPrecision, outputScale);
}
}

0 comments on commit d07f02d

Please sign in to comment.