Commit 2fd0282

Merge branch 'branch-25.02' into trunc

ttnghia committed Dec 7, 2024
2 parents abbf2c6 + af33e63
Showing 9 changed files with 171 additions and 52 deletions.
8 changes: 4 additions & 4 deletions CONTRIBUTING.md
@@ -235,12 +235,12 @@ and build inside WSL2, e.g.
### Testing
Java tests are in the `src/test` directory and c++ tests are in the `src/main/cpp/tests` directory.
The c++ tests are built with the `-DBUILD_TESTS` command line option and will build into the
`target/cmake-build/gtests/` directory. Due to building inside the docker container, it is possible
`target/jni/cmake-build/gtests/` directory. Due to building inside the docker container, it is possible
that the host environment does not match the container well enough to run these executables, resulting
in errors finding libraries. The script `build/run-in-docker` was created to help with this
situation. A test can be run directly using this script or the script can be run without any
arguments to get into an interactive shell inside the container.
```build/run-in-docker target/cmake-build/gtests/ROW_CONVERSION```
```build/run-in-docker target/jni/cmake-build/gtests/ROW_CONVERSION```
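A minimal sketch of the interactive workflow described above, assuming the standard GoogleTest CLI; the `--gtest_filter` pattern is only an illustration, not a test name from this repo:
```bash
# Get an interactive shell inside the build container:
build/run-in-docker
# Then, at the container prompt (from the repository root), run a test
# executable directly; the filter pattern below is illustrative.
./target/jni/cmake-build/gtests/ROW_CONVERSION --gtest_filter='*Simple*'
```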

#### Testing with Compute Sanitizer
[Compute Sanitizer](https://docs.nvidia.com/compute-sanitizer/ComputeSanitizer/index.html) is a
@@ -311,12 +311,12 @@ in the cuDF [CONTRIBUTING](thirdparty/cudf/CONTRIBUTING.md) guide.
### Benchmarks
Benchmarks for the c++ code use NVBench and are in the `src/main/cpp/benchmarks` directory.
Building these benchmarks requires the `-DBUILD_BENCHMARKS` build option. Once built, the benchmarks
can be found in the `target/cmake-build/benchmarks/` directory. Due to building inside the docker
can be found in the `target/jni/cmake-build/benchmarks/` directory. Due to building inside the docker
container, it is possible that the host environment does not match the container well enough to
run these executables, resulting in errors finding libraries. The script `build/run-in-docker`
was created to help with this situation. A benchmark can be run directly using this script or the
script can be run without any arguments to get into an interactive shell inside the container.
```build/run-in-docker target/cmake-build/benchmarks/ROW_CONVERSION_BENCH```
```build/run-in-docker target/jni/cmake-build/benchmarks/ROW_CONVERSION_BENCH```
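A similar sketch for benchmarks, assuming NVBench's standard command-line options (such as `--list`), which may differ in the built binary:
```bash
# Run a benchmark through the helper script in one shot...
build/run-in-docker target/jni/cmake-build/benchmarks/ROW_CONVERSION_BENCH
# ...or enter the container interactively and enumerate the available
# benchmarks first (assumes NVBench's --list option).
build/run-in-docker
./target/jni/cmake-build/benchmarks/ROW_CONVERSION_BENCH --list
```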
## Code contributions

### Your first issue
4 changes: 2 additions & 2 deletions src/main/cpp/faultinj/README.md
@@ -33,7 +33,7 @@ Spark local mode is a single CUDA process. We can test it as any standalone
single-process application.

```bash
CUDA_INJECTION64_PATH=$PWD/target/cmake-build/faultinj/libcufaultinj.so \
CUDA_INJECTION64_PATH=$PWD/target/jni/cmake-build/faultinj/libcufaultinj.so \
FAULT_INJECTOR_CONFIG_PATH=src/test/cpp/faultinj/test_faultinj.json \
$SPARK_HOME/bin/pyspark \
--jars $SPARK_RAPIDS_REPO/dist/target/rapids-4-spark_2.12-22.08.0-SNAPSHOT-cuda11.jar \
@@ -44,7 +44,7 @@ $SPARK_HOME/bin/pyspark \
$SPARK_HOME/bin/spark-shell \
--jars $SPARK_RAPIDS_REPO/dist/target/rapids-4-spark_2.12-22.08.0-SNAPSHOT-cuda11.jar \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--files ./target/cmake-build/faultinj/libcufaultinj.so,./src/test/cpp/faultinj/test_faultinj.json \
--files ./target/jni/cmake-build/faultinj/libcufaultinj.so,./src/test/cpp/faultinj/test_faultinj.json \
--conf spark.executorEnv.CUDA_INJECTION64_PATH=./libcufaultinj.so \
--conf spark.executorEnv.FAULT_INJECTOR_CONFIG_PATH=test_faultinj.json \
--conf spark.rapids.memory.gpu.minAllocFraction=0 \
69 changes: 53 additions & 16 deletions src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java
@@ -16,20 +16,29 @@

package com.nvidia.spark.rapids.jni.kudo;

import ai.rapids.cudf.*;
import static com.nvidia.spark.rapids.jni.Preconditions.ensure;
import static java.util.Objects.requireNonNull;

import ai.rapids.cudf.BufferType;
import ai.rapids.cudf.Cuda;
import ai.rapids.cudf.HostColumnVector;
import ai.rapids.cudf.JCudfSerialization;
import ai.rapids.cudf.Schema;
import ai.rapids.cudf.Table;
import com.nvidia.spark.rapids.jni.Pair;
import com.nvidia.spark.rapids.jni.schema.Visitors;

import java.io.*;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static com.nvidia.spark.rapids.jni.Preconditions.ensure;
import static java.util.Objects.requireNonNull;

/**
* This class is used to serialize/deserialize a table using the Kudo format.
*
@@ -148,8 +157,9 @@
public class KudoSerializer {

private static final byte[] PADDING = new byte[64];
private static final BufferType[] ALL_BUFFER_TYPES = new BufferType[]{BufferType.VALIDITY, BufferType.OFFSET,
BufferType.DATA};
private static final BufferType[] ALL_BUFFER_TYPES =
new BufferType[] {BufferType.VALIDITY, BufferType.OFFSET,
BufferType.DATA};

static {
Arrays.fill(PADDING, (byte) 0);
@@ -176,7 +186,7 @@ public KudoSerializer(Schema schema) {
* @param numRows number of rows to write
* @return number of bytes written
*/
long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) {
WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) {
HostColumnVector[] columns = null;
try {
columns = IntStream.range(0, table.getNumberOfColumns())
@@ -185,7 +195,7 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) {
.toArray(HostColumnVector[]::new);

Cuda.DEFAULT_STREAM.sync();
return writeToStream(columns, out, rowOffset, numRows);
return writeToStreamWithMetrics(columns, out, rowOffset, numRows);
} finally {
if (columns != null) {
for (HostColumnVector column : columns) {
Expand All @@ -195,6 +205,18 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) {
}
}

/**
* Write a partition of an array of {@link HostColumnVector} to an output stream.
* See {@link #writeToStreamWithMetrics(HostColumnVector[], OutputStream, int, int)} for more
* details.
*
* @return number of bytes written
*/
public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset,
int numRows) {
return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes();
}

/**
* Write a partition of an array of {@link HostColumnVector} to an output stream.
* <br/>
@@ -208,7 +230,8 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) {
* @param numRows number of rows to write
* @return number of bytes written
*/
public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) {
public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out,
int rowOffset, int numRows) {
ensure(numRows > 0, () -> "numRows must be > 0, but was " + numRows);
ensure(columns.length > 0, () -> "columns must not be empty, for row count only records " +
"please call writeRowCountToStream");
@@ -286,17 +309,25 @@ public Pair<Table, MergeMetrics> mergeToTable(List<KudoTable> kudoTables) throws
}
}

private long writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, int numRows) throws Exception {
KudoTableHeaderCalc headerCalc = new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount);
Visitors.visitColumns(columns, headerCalc);
private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset,
int numRows) throws Exception {
WriteMetrics metrics = new WriteMetrics();
KudoTableHeaderCalc headerCalc =
new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount);
withTime(() -> Visitors.visitColumns(columns, headerCalc), metrics::addCalcHeaderTime);
KudoTableHeader header = headerCalc.getHeader();
long currentTime = System.nanoTime();
header.writeTo(out);
metrics.addCopyHeaderTime(System.nanoTime() - currentTime);
metrics.addWrittenBytes(header.getSerializedSize());

long bytesWritten = 0;
for (BufferType bufferType : ALL_BUFFER_TYPES) {
SlicedBufferSerializer serializer = new SlicedBufferSerializer(rowOffset, numRows, bufferType, out);
SlicedBufferSerializer serializer = new SlicedBufferSerializer(rowOffset, numRows, bufferType,
out, metrics);
Visitors.visitColumns(columns, serializer);
bytesWritten += serializer.getTotalDataLen();
metrics.addWrittenBytes(serializer.getTotalDataLen());
}

if (bytesWritten != header.getTotalDataLen()) {
@@ -307,7 +338,7 @@ private long writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffs

out.flush();

return header.getSerializedSize() + bytesWritten;
return metrics;
}

private static DataWriter writerFrom(OutputStream out) {
@@ -348,6 +379,12 @@ static <T> T withTime(Supplier<T> task, LongConsumer timeConsumer) {
return ret;
}

static void withTime(Runnable task, LongConsumer timeConsumer) {
long now = System.nanoTime();
task.run();
timeConsumer.accept(System.nanoTime() - now);
}

/**
* This method returns the length in bytes needed to represent X number of rows
* e.g. getValidityLengthInBytes(5) => 1 byte
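For orientation, a minimal, hypothetical caller of the metrics-aware API introduced above; the schema, host columns, and output stream are placeholders, and the example class is not part of this commit:
```java
package com.nvidia.spark.rapids.jni.kudo;

import java.io.ByteArrayOutputStream;

import ai.rapids.cudf.HostColumnVector;
import ai.rapids.cudf.Schema;

// Hypothetical example, not part of this commit.
class KudoWriteMetricsExample {
  static void writePartition(Schema schema, HostColumnVector[] columns,
                             int rowOffset, int numRows) {
    KudoSerializer serializer = new KudoSerializer(schema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // writeToStream(...) still returns just the byte count; the new
    // writeToStreamWithMetrics(...) also exposes per-phase timings.
    WriteMetrics metrics =
        serializer.writeToStreamWithMetrics(columns, out, rowOffset, numRows);

    System.out.println("bytes written:    " + metrics.getWrittenBytes());
    System.out.println("header calc (ns): " + metrics.getCalcHeaderTime());
    System.out.println("header copy (ns): " + metrics.getCopyHeaderTime());
    System.out.println("buffer copy (ns): " + metrics.getCopyBufferTime());
  }
}
```
In writeSliced above, a single WriteMetrics instance is threaded through the header and buffer copies, so timings and byte counts accumulate on the copy path and are read back afterward through the getters.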
src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java
@@ -16,19 +16,18 @@

package com.nvidia.spark.rapids.jni.kudo;

import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment;

import ai.rapids.cudf.BufferType;
import ai.rapids.cudf.DType;
import ai.rapids.cudf.HostColumnVectorCore;
import ai.rapids.cudf.HostMemoryBuffer;
import com.nvidia.spark.rapids.jni.schema.HostColumnsVisitor;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment;

/**
* This class visits a list of columns and serializes one of the buffers (validity, offset, or data) using the kudo
* format.
@@ -48,13 +47,16 @@ class SlicedBufferSerializer implements HostColumnsVisitor<Void> {
private final DataWriter writer;

private final Deque<SliceInfo> sliceInfos = new ArrayDeque<>();
private final WriteMetrics metrics;
private long totalDataLen;

SlicedBufferSerializer(int rowOffset, int numRows, BufferType bufferType, DataWriter writer) {
SlicedBufferSerializer(int rowOffset, int numRows, BufferType bufferType, DataWriter writer,
WriteMetrics metrics) {
this.root = new SliceInfo(rowOffset, numRows);
this.bufferType = bufferType;
this.writer = writer;
this.sliceInfos.addLast(root);
this.metrics = metrics;
this.totalDataLen = 0;
}

@@ -153,36 +155,35 @@ public Void visit(HostColumnVectorCore col) {
}
}

private long copySlicedValidity(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException {
private long copySlicedValidity(HostColumnVectorCore column, SliceInfo sliceInfo)
throws IOException {
if (column.getValidity() != null && sliceInfo.getRowCount() > 0) {
HostMemoryBuffer buff = column.getValidity();
long len = sliceInfo.getValidityBufferInfo().getBufferLength();
writer.copyDataFrom(buff, sliceInfo.getValidityBufferInfo().getBufferOffset(),
len);
return padForHostAlignment(writer, len);
return copyBufferAndPadForHost(buff, sliceInfo.getValidityBufferInfo().getBufferOffset(), len);
} else {
return 0;
}
}

private long copySlicedOffset(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException {
private long copySlicedOffset(HostColumnVectorCore column, SliceInfo sliceInfo)
throws IOException {
if (sliceInfo.rowCount <= 0 || column.getOffsets() == null) {
// Don't copy anything, there are no rows
return 0;
}
long bytesToCopy = (sliceInfo.rowCount + 1) * Integer.BYTES;
long srcOffset = sliceInfo.offset * Integer.BYTES;
HostMemoryBuffer buff = column.getOffsets();
writer.copyDataFrom(buff, srcOffset, bytesToCopy);
return padForHostAlignment(writer, bytesToCopy);
return copyBufferAndPadForHost(column.getOffsets(), srcOffset, bytesToCopy);
}

private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException {
if (sliceInfo.rowCount > 0) {
DType type = column.getType();
if (type.equals(DType.STRING)) {
long startByteOffset = column.getOffsets().getInt(sliceInfo.offset * Integer.BYTES);
long endByteOffset = column.getOffsets().getInt((sliceInfo.offset + sliceInfo.rowCount) * Integer.BYTES);
long endByteOffset =
column.getOffsets().getInt((sliceInfo.offset + sliceInfo.rowCount) * Integer.BYTES);
long bytesToCopy = endByteOffset - startByteOffset;
if (column.getData() == null) {
if (bytesToCopy != 0) {
@@ -192,19 +193,26 @@ private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) th

return 0;
} else {
writer.copyDataFrom(column.getData(), startByteOffset, bytesToCopy);
return padForHostAlignment(writer, bytesToCopy);
return copyBufferAndPadForHost(column.getData(), startByteOffset, bytesToCopy);
}
} else if (type.getSizeInBytes() > 0) {
long bytesToCopy = sliceInfo.rowCount * type.getSizeInBytes();
long srcOffset = sliceInfo.offset * type.getSizeInBytes();
writer.copyDataFrom(column.getData(), srcOffset, bytesToCopy);
return padForHostAlignment(writer, bytesToCopy);
return copyBufferAndPadForHost(column.getData(), srcOffset, bytesToCopy);
} else {
return 0;
}
} else {
return 0;
}
}

private long copyBufferAndPadForHost(HostMemoryBuffer buffer, long offset, long length)
throws IOException {
long now = System.nanoTime();
writer.copyDataFrom(buffer, offset, length);
long ret = padForHostAlignment(writer, length);
metrics.addCopyBufferTime(System.nanoTime() - now);
return ret;
}
}
79 changes: 79 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/kudo/WriteMetrics.java
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.jni.kudo;

/**
* This class contains metrics for serializing a table using the kudo format.
*/
public class WriteMetrics {
private long calcHeaderTime;
private long copyHeaderTime;
private long copyBufferTime;
private long writtenBytes;


public WriteMetrics() {
this.calcHeaderTime = 0;
this.copyHeaderTime = 0;
this.copyBufferTime = 0;
this.writtenBytes = 0;
}

/**
* Get the time spent on calculating the header.
*/
public long getCalcHeaderTime() {
return calcHeaderTime;
}

/**
* Get the time spent on copying the buffer.
*/
public long getCopyBufferTime() {
return copyBufferTime;
}

public void addCopyBufferTime(long time) {
copyBufferTime += time;
}

/**
* Get the time spent on copying the header.
*/
public long getCopyHeaderTime() {
return copyHeaderTime;
}

public void addCalcHeaderTime(long time) {
calcHeaderTime += time;
}

public void addCopyHeaderTime(long time) {
copyHeaderTime += time;
}

/**
* Get the number of bytes written.
*/
public long getWrittenBytes() {
return writtenBytes;
}

public void addWrittenBytes(long bytes) {
writtenBytes += bytes;
}
}
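A hypothetical follow-on, not part of this commit: summing per-partition metrics for reporting. The getters return nanosecond totals, except `getWrittenBytes()`, which is a byte count.
```java
package com.nvidia.spark.rapids.jni.kudo;

import java.util.List;

// Hypothetical aggregation helper, not part of this commit.
final class WriteMetricsSummary {
  static String summarize(List<WriteMetrics> perPartition) {
    long bytes = 0;
    long calcHeaderNs = 0;
    long copyHeaderNs = 0;
    long copyBufferNs = 0;
    for (WriteMetrics m : perPartition) {
      bytes += m.getWrittenBytes();
      calcHeaderNs += m.getCalcHeaderTime();
      copyHeaderNs += m.getCopyHeaderTime();
      copyBufferNs += m.getCopyBufferTime();
    }
    return String.format(
        "wrote %d bytes (header calc %d ns, header copy %d ns, buffer copy %d ns)",
        bytes, calcHeaderNs, copyHeaderNs, copyBufferNs);
  }
}
```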