diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1bb9fdf6b0..baea783808 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -235,12 +235,12 @@ and build inside WSL2, e.g. ### Testing Java tests are in the `src/test` directory and c++ tests are in the `src/main/cpp/tests` directory. The c++ tests are built with the `-DBUILD_TESTS` command line option and will build into the -`target/cmake-build/gtests/` directory. Due to building inside the docker container, it is possible +`target/jni/cmake-build/gtests/` directory. Due to building inside the docker container, it is possible that the host environment does not match the container well enough to run these executables, resulting in errors finding libraries. The script `build/run-in-docker` was created to help with this situation. A test can be run directly using this script or the script can be run without any arguments to get into an interactive shell inside the container. -```build/run-in-docker target/cmake-build/gtests/ROW_CONVERSION``` +```build/run-in-docker target/jni/cmake-build/gtests/ROW_CONVERSION``` #### Testing with Compute Sanitizer [Compute Sanitizer](https://docs.nvidia.com/compute-sanitizer/ComputeSanitizer/index.html) is a @@ -311,12 +311,12 @@ in the cuDF [CONTRIBUTING](thirdparty/cudf/CONTRIBUTING.md) guide. ### Benchmarks Benchmarks exist for c++ benchmarks using NVBench and are in the `src/main/cpp/benchmarks` directory. To build these benchmarks requires the `-DBUILD_BENCHMARKS` build option. Once built, the benchmarks -can be found in the `target/cmake-build/benchmarks/` directory. Due to building inside the docker +can be found in the `target/jni/cmake-build/benchmarks/` directory. Due to building inside the docker container, it is possible that the host environment does not match the container well enough to run these executables, resulting in errors finding libraries. The script `build/run-in-docker` was created to help with this situation. A benchmark can be run directly using this script or the script can be run without any arguments to get into an interactive shell inside the container. -```build/run-in-docker target/cmake-build/benchmarks/ROW_CONVERSION_BENCH``` +```build/run-in-docker target/jni/cmake-build/benchmarks/ROW_CONVERSION_BENCH``` ## Code contributions ### Your first issue diff --git a/src/main/cpp/faultinj/README.md b/src/main/cpp/faultinj/README.md index 08ac1e0285..a22748eac8 100644 --- a/src/main/cpp/faultinj/README.md +++ b/src/main/cpp/faultinj/README.md @@ -33,7 +33,7 @@ Spark local mode is a single CUDA process. We can test is as any standalone single-process application. ```bash -CUDA_INJECTION64_PATH=$PWD/target/cmake-build/faultinj/libcufaultinj.so \ +CUDA_INJECTION64_PATH=$PWD/target/jni/cmake-build/faultinj/libcufaultinj.so \ FAULT_INJECTOR_CONFIG_PATH=src/test/cpp/faultinj/test_faultinj.json \ $SPARK_HOME/bin/pyspark \ --jars $SPARK_RAPIDS_REPO/dist/target/rapids-4-spark_2.12-22.08.0-SNAPSHOT-cuda11.jar \ @@ -44,7 +44,7 @@ $SPARK_HOME/bin/pyspark \ $SPARK_HOME/bin/spark-shell \ --jars $SPARK_RAPIDS_REPO/dist/target/rapids-4-spark_2.12-22.08.0-SNAPSHOT-cuda11.jar \ --conf spark.plugins=com.nvidia.spark.SQLPlugin \ - --files ./target/cmake-build/faultinj/libcufaultinj.so,./src/test/cpp/faultinj/test_faultinj.json \ + --files ./target/jni/cmake-build/faultinj/libcufaultinj.so,./src/test/cpp/faultinj/test_faultinj.json \ --conf spark.executorEnv.CUDA_INJECTION64_PATH=./libcufaultinj.so \ --conf spark.executorEnv.FAULT_INJECTOR_CONFIG_PATH=test_faultinj.json \ --conf spark.rapids.memory.gpu.minAllocFraction=0 \ diff --git a/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java b/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java index 6370531428..7ae784e639 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java @@ -16,20 +16,29 @@ package com.nvidia.spark.rapids.jni.kudo; -import ai.rapids.cudf.*; +import static com.nvidia.spark.rapids.jni.Preconditions.ensure; +import static java.util.Objects.requireNonNull; + +import ai.rapids.cudf.BufferType; +import ai.rapids.cudf.Cuda; +import ai.rapids.cudf.HostColumnVector; +import ai.rapids.cudf.JCudfSerialization; +import ai.rapids.cudf.Schema; +import ai.rapids.cudf.Table; import com.nvidia.spark.rapids.jni.Pair; import com.nvidia.spark.rapids.jni.schema.Visitors; - -import java.io.*; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Arrays; import java.util.List; import java.util.function.LongConsumer; import java.util.function.Supplier; import java.util.stream.IntStream; -import static com.nvidia.spark.rapids.jni.Preconditions.ensure; -import static java.util.Objects.requireNonNull; - /** * This class is used to serialize/deserialize a table using the Kudo format. * @@ -148,8 +157,9 @@ public class KudoSerializer { private static final byte[] PADDING = new byte[64]; - private static final BufferType[] ALL_BUFFER_TYPES = new BufferType[]{BufferType.VALIDITY, BufferType.OFFSET, - BufferType.DATA}; + private static final BufferType[] ALL_BUFFER_TYPES = + new BufferType[] {BufferType.VALIDITY, BufferType.OFFSET, + BufferType.DATA}; static { Arrays.fill(PADDING, (byte) 0); @@ -176,7 +186,7 @@ public KudoSerializer(Schema schema) { * @param numRows number of rows to write * @return number of bytes written */ - long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) { + WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) { HostColumnVector[] columns = null; try { columns = IntStream.range(0, table.getNumberOfColumns()) @@ -185,7 +195,7 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) { .toArray(HostColumnVector[]::new); Cuda.DEFAULT_STREAM.sync(); - return writeToStream(columns, out, rowOffset, numRows); + return writeToStreamWithMetrics(columns, out, rowOffset, numRows); } finally { if (columns != null) { for (HostColumnVector column : columns) { @@ -195,6 +205,18 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) { } } + /** + * Write partition of an array of {@link HostColumnVector} to an output stream. + * See {@link #writeToStreamWithMetrics(HostColumnVector[], OutputStream, int, int)} for more + * details. + * + * @return number of bytes written + */ + public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, + int numRows) { + return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes(); + } + /** * Write partition of an array of {@link HostColumnVector} to an output stream. *
@@ -208,7 +230,8 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) { * @param numRows number of rows to write * @return number of bytes written */ - public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) { + public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out, + int rowOffset, int numRows) { ensure(numRows > 0, () -> "numRows must be > 0, but was " + numRows); ensure(columns.length > 0, () -> "columns must not be empty, for row count only records " + "please call writeRowCountToStream"); @@ -286,17 +309,25 @@ public Pair mergeToTable(List kudoTables) throws } } - private long writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, int numRows) throws Exception { - KudoTableHeaderCalc headerCalc = new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount); - Visitors.visitColumns(columns, headerCalc); + private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, + int numRows) throws Exception { + WriteMetrics metrics = new WriteMetrics(); + KudoTableHeaderCalc headerCalc = + new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount); + withTime(() -> Visitors.visitColumns(columns, headerCalc), metrics::addCalcHeaderTime); KudoTableHeader header = headerCalc.getHeader(); + long currentTime = System.nanoTime(); header.writeTo(out); + metrics.addCopyHeaderTime(System.nanoTime() - currentTime); + metrics.addWrittenBytes(header.getSerializedSize()); long bytesWritten = 0; for (BufferType bufferType : ALL_BUFFER_TYPES) { - SlicedBufferSerializer serializer = new SlicedBufferSerializer(rowOffset, numRows, bufferType, out); + SlicedBufferSerializer serializer = new SlicedBufferSerializer(rowOffset, numRows, bufferType, + out, metrics); Visitors.visitColumns(columns, serializer); bytesWritten += serializer.getTotalDataLen(); + metrics.addWrittenBytes(serializer.getTotalDataLen()); } if (bytesWritten != header.getTotalDataLen()) { @@ -307,7 +338,7 @@ private long writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffs out.flush(); - return header.getSerializedSize() + bytesWritten; + return metrics; } private static DataWriter writerFrom(OutputStream out) { @@ -348,6 +379,12 @@ static T withTime(Supplier task, LongConsumer timeConsumer) { return ret; } + static void withTime(Runnable task, LongConsumer timeConsumer) { + long now = System.nanoTime(); + task.run(); + timeConsumer.accept(System.nanoTime() - now); + } + /** * This method returns the length in bytes needed to represent X number of rows * e.g. getValidityLengthInBytes(5) => 1 byte diff --git a/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java b/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java index e22a523855..080cb5eda6 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java @@ -16,19 +16,18 @@ package com.nvidia.spark.rapids.jni.kudo; +import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment; + import ai.rapids.cudf.BufferType; import ai.rapids.cudf.DType; import ai.rapids.cudf.HostColumnVectorCore; import ai.rapids.cudf.HostMemoryBuffer; import com.nvidia.spark.rapids.jni.schema.HostColumnsVisitor; - import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; import java.util.List; -import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment; - /** * This class visits a list of columns and serialize one of the buffers (validity, offset, or data) into with kudo * format. @@ -48,13 +47,16 @@ class SlicedBufferSerializer implements HostColumnsVisitor { private final DataWriter writer; private final Deque sliceInfos = new ArrayDeque<>(); + private final WriteMetrics metrics; private long totalDataLen; - SlicedBufferSerializer(int rowOffset, int numRows, BufferType bufferType, DataWriter writer) { + SlicedBufferSerializer(int rowOffset, int numRows, BufferType bufferType, DataWriter writer, + WriteMetrics metrics) { this.root = new SliceInfo(rowOffset, numRows); this.bufferType = bufferType; this.writer = writer; this.sliceInfos.addLast(root); + this.metrics = metrics; this.totalDataLen = 0; } @@ -153,28 +155,26 @@ public Void visit(HostColumnVectorCore col) { } } - private long copySlicedValidity(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException { + private long copySlicedValidity(HostColumnVectorCore column, SliceInfo sliceInfo) + throws IOException { if (column.getValidity() != null && sliceInfo.getRowCount() > 0) { HostMemoryBuffer buff = column.getValidity(); long len = sliceInfo.getValidityBufferInfo().getBufferLength(); - writer.copyDataFrom(buff, sliceInfo.getValidityBufferInfo().getBufferOffset(), - len); - return padForHostAlignment(writer, len); + return copyBufferAndPadForHost(buff, sliceInfo.getValidityBufferInfo().getBufferOffset(), len); } else { return 0; } } - private long copySlicedOffset(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException { + private long copySlicedOffset(HostColumnVectorCore column, SliceInfo sliceInfo) + throws IOException { if (sliceInfo.rowCount <= 0 || column.getOffsets() == null) { // Don't copy anything, there are no rows return 0; } long bytesToCopy = (sliceInfo.rowCount + 1) * Integer.BYTES; long srcOffset = sliceInfo.offset * Integer.BYTES; - HostMemoryBuffer buff = column.getOffsets(); - writer.copyDataFrom(buff, srcOffset, bytesToCopy); - return padForHostAlignment(writer, bytesToCopy); + return copyBufferAndPadForHost(column.getOffsets(), srcOffset, bytesToCopy); } private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException { @@ -182,7 +182,8 @@ private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) th DType type = column.getType(); if (type.equals(DType.STRING)) { long startByteOffset = column.getOffsets().getInt(sliceInfo.offset * Integer.BYTES); - long endByteOffset = column.getOffsets().getInt((sliceInfo.offset + sliceInfo.rowCount) * Integer.BYTES); + long endByteOffset = + column.getOffsets().getInt((sliceInfo.offset + sliceInfo.rowCount) * Integer.BYTES); long bytesToCopy = endByteOffset - startByteOffset; if (column.getData() == null) { if (bytesToCopy != 0) { @@ -192,14 +193,12 @@ private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) th return 0; } else { - writer.copyDataFrom(column.getData(), startByteOffset, bytesToCopy); - return padForHostAlignment(writer, bytesToCopy); + return copyBufferAndPadForHost(column.getData(), startByteOffset, bytesToCopy); } } else if (type.getSizeInBytes() > 0) { long bytesToCopy = sliceInfo.rowCount * type.getSizeInBytes(); long srcOffset = sliceInfo.offset * type.getSizeInBytes(); - writer.copyDataFrom(column.getData(), srcOffset, bytesToCopy); - return padForHostAlignment(writer, bytesToCopy); + return copyBufferAndPadForHost(column.getData(), srcOffset, bytesToCopy); } else { return 0; } @@ -207,4 +206,13 @@ private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) th return 0; } } + + private long copyBufferAndPadForHost(HostMemoryBuffer buffer, long offset, long length) + throws IOException { + long now = System.nanoTime(); + writer.copyDataFrom(buffer, offset, length); + long ret = padForHostAlignment(writer, length); + metrics.addCopyBufferTime(System.nanoTime() - now); + return ret; + } } diff --git a/src/main/java/com/nvidia/spark/rapids/jni/kudo/WriteMetrics.java b/src/main/java/com/nvidia/spark/rapids/jni/kudo/WriteMetrics.java new file mode 100644 index 0000000000..d34564e776 --- /dev/null +++ b/src/main/java/com/nvidia/spark/rapids/jni/kudo/WriteMetrics.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.jni.kudo; + +/** + * This class contains metrics for serializing table using kudo format. + */ +public class WriteMetrics { + private long calcHeaderTime; + private long copyHeaderTime; + private long copyBufferTime; + private long writtenBytes; + + + public WriteMetrics() { + this.calcHeaderTime = 0; + this.copyHeaderTime = 0; + this.copyBufferTime = 0; + this.writtenBytes = 0; + } + + /** + * Get the time spent on calculating the header. + */ + public long getCalcHeaderTime() { + return calcHeaderTime; + } + + /** + * Get the time spent on copying the buffer. + */ + public long getCopyBufferTime() { + return copyBufferTime; + } + + public void addCopyBufferTime(long time) { + copyBufferTime += time; + } + + /** + * Get the time spent on copying the header. + */ + public long getCopyHeaderTime() { + return copyHeaderTime; + } + + public void addCalcHeaderTime(long time) { + calcHeaderTime += time; + } + + public void addCopyHeaderTime(long time) { + copyHeaderTime += time; + } + + /** + * Get the number of bytes written. + */ + public long getWrittenBytes() { + return writtenBytes; + } + + public void addWrittenBytes(long bytes) { + writtenBytes += bytes; + } +} diff --git a/src/test/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializerTest.java b/src/test/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializerTest.java index 210777accf..3ffcb5e61b 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializerTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializerTest.java @@ -75,7 +75,7 @@ public void testWriteSimple() throws Exception { try (Table t = buildSimpleTable()) { ByteArrayOutputStream out = new ByteArrayOutputStream(); - long bytesWritten = serializer.writeToStream(t, out, 0, 4); + long bytesWritten = serializer.writeToStreamWithMetrics(t, out, 0, 4).getWrittenBytes(); assertEquals(189, bytesWritten); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); @@ -365,7 +365,7 @@ private static void checkMergeTable(Table expected, List tableSlices ByteArrayOutputStream bout = new ByteArrayOutputStream(); for (TableSlice slice : tableSlices) { - serializer.writeToStream(slice.getBaseTable(), bout, slice.getStartRow(), slice.getNumRows()); + serializer.writeToStreamWithMetrics(slice.getBaseTable(), bout, slice.getStartRow(), slice.getNumRows()); } bout.flush(); diff --git a/thirdparty/cudf b/thirdparty/cudf index 12c77f32ee..14b4321b51 160000 --- a/thirdparty/cudf +++ b/thirdparty/cudf @@ -1 +1 @@ -Subproject commit 12c77f32eee3b1aa0ba5592d9f25b4664104bd04 +Subproject commit 14b4321b5172104c5d9801e196e607e3bb0c4c39 diff --git a/thirdparty/cudf-pins/rapids-cmake.sha b/thirdparty/cudf-pins/rapids-cmake.sha index fad760dd73..f494e2ffe6 100644 --- a/thirdparty/cudf-pins/rapids-cmake.sha +++ b/thirdparty/cudf-pins/rapids-cmake.sha @@ -1 +1 @@ -fa61767d1584a9c8d083aa9ce8636af89cc63923 +7e3122c579b19ccaa3fe670fa6ad8efb8d3f738c diff --git a/thirdparty/cudf-pins/versions.json b/thirdparty/cudf-pins/versions.json index f54f56d1cc..bcb861913b 100644 --- a/thirdparty/cudf-pins/versions.json +++ b/thirdparty/cudf-pins/versions.json @@ -5,15 +5,10 @@ { "always_download" : true, "git_shallow" : false, - "git_tag" : "e21d607157218540cd7c45461213fb96adf720b7", + "git_tag" : "05e019afe53f9b0e4454cbd822f9bdda18df49bb", "git_url" : "https://github.com/NVIDIA/cccl.git", "patches" : [ - { - "file" : "${current_json_dir}/../cudf/cpp/cmake/thirdparty/patches/cccl_symbol_visibility.diff", - "fixed_in" : "2.6", - "issue" : "Correct symbol visibility issues in libcudacxx [https://github.com/NVIDIA/cccl/pull/1832/]" - }, { "file" : "${current_json_dir}/../cudf/cpp/cmake/thirdparty/patches/thrust_disable_64bit_dispatching.diff", "fixed_in" : "", @@ -30,7 +25,7 @@ "issue" : "Improve Thrust scan compile times by reducing the number of kernels generated [https://github.com/rapidsai/cudf/pull/8183]" } ], - "version" : "2.5.0" + "version" : "2.7.0" }, "GTest" : { @@ -99,7 +94,7 @@ { "always_download" : true, "git_shallow" : false, - "git_tag" : "3b90520801bab995c54ec210f8a7080a9812e75c", + "git_tag" : "2c7eb9759f27b2407dbd59c43696056c26249735", "git_url" : "https://github.com/rapidsai/kvikio.git", "version" : "25.02" }, @@ -149,7 +144,7 @@ { "always_download" : true, "git_shallow" : false, - "git_tag" : "d4066fa611c803430c9bc5dbe8e243f89bb9a25c", + "git_tag" : "fc9c1389307db60c25a5bd015f21a835fc708226", "git_url" : "https://github.com/rapidsai/rmm.git", "version" : "25.02" },