diff --git a/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java b/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java index 4860cd764..7ae784e63 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java @@ -16,20 +16,29 @@ package com.nvidia.spark.rapids.jni.kudo; -import ai.rapids.cudf.*; +import static com.nvidia.spark.rapids.jni.Preconditions.ensure; +import static java.util.Objects.requireNonNull; + +import ai.rapids.cudf.BufferType; +import ai.rapids.cudf.Cuda; +import ai.rapids.cudf.HostColumnVector; +import ai.rapids.cudf.JCudfSerialization; +import ai.rapids.cudf.Schema; +import ai.rapids.cudf.Table; import com.nvidia.spark.rapids.jni.Pair; import com.nvidia.spark.rapids.jni.schema.Visitors; - -import java.io.*; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Arrays; import java.util.List; import java.util.function.LongConsumer; import java.util.function.Supplier; import java.util.stream.IntStream; -import static com.nvidia.spark.rapids.jni.Preconditions.ensure; -import static java.util.Objects.requireNonNull; - /** * This class is used to serialize/deserialize a table using the Kudo format. * @@ -148,8 +157,9 @@ public class KudoSerializer { private static final byte[] PADDING = new byte[64]; - private static final BufferType[] ALL_BUFFER_TYPES = new BufferType[]{BufferType.VALIDITY, BufferType.OFFSET, - BufferType.DATA}; + private static final BufferType[] ALL_BUFFER_TYPES = + new BufferType[] {BufferType.VALIDITY, BufferType.OFFSET, + BufferType.DATA}; static { Arrays.fill(PADDING, (byte) 0); @@ -176,7 +186,7 @@ public KudoSerializer(Schema schema) { * @param numRows number of rows to write * @return number of bytes written */ - WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) { + WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) { HostColumnVector[] columns = null; try { columns = IntStream.range(0, table.getNumberOfColumns()) @@ -199,10 +209,12 @@ WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffs * Write partition of an array of {@link HostColumnVector} to an output stream. * See {@link #writeToStreamWithMetrics(HostColumnVector[], OutputStream, int, int)} for more * details. + * * @return number of bytes written */ - public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) { - return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes(); + public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, + int numRows) { + return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes(); } /** @@ -218,7 +230,8 @@ public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowO * @param numRows number of rows to write * @return number of bytes written */ - public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) { + public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out, + int rowOffset, int numRows) { ensure(numRows > 0, () -> "numRows must be > 0, but was " + numRows); ensure(columns.length > 0, () -> "columns must not be empty, for row count only records " + "please call writeRowCountToStream"); @@ -296,9 +309,11 @@ public Pair mergeToTable(List kudoTables) throws } } - private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, int numRows) throws Exception { + private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, + int numRows) throws Exception { WriteMetrics metrics = new WriteMetrics(); - KudoTableHeaderCalc headerCalc = new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount); + KudoTableHeaderCalc headerCalc = + new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount); withTime(() -> Visitors.visitColumns(columns, headerCalc), metrics::addCalcHeaderTime); KudoTableHeader header = headerCalc.getHeader(); long currentTime = System.nanoTime(); diff --git a/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java b/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java index 86d51116b..080cb5eda 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java @@ -17,7 +17,6 @@ package com.nvidia.spark.rapids.jni.kudo; import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment; -import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.withTime; import ai.rapids.cudf.BufferType; import ai.rapids.cudf.DType;