Add support for asynchronous writing for parquet (#11730)
* Support async writing for query output

Signed-off-by: Jihoon Son <[email protected]>

* doc change

* use a long timeout

* fix test failure due to a race

* fix flaky test

* address comments

* fix the config name for hold gpu

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncOutputStream.scala

Simplify case arm

Co-authored-by: Gera Shegalov <[email protected]>

* address comments

* missing doc change

* use trampoline

---------

Signed-off-by: Jihoon Son <[email protected]>
Co-authored-by: Gera Shegalov <[email protected]>
jihoonson and gerashegalov authored Nov 25, 2024
1 parent 938db21 commit 6b90b2f
Showing 10 changed files with 855 additions and 11 deletions.
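For context, a minimal usage sketch (illustrative only, not part of this commit's diff): the feature is opt-in through the internal config added below in RapidsConf.scala. The application name and output path are made-up placeholders.

import org.apache.spark.sql.SparkSession

// Sketch: enable the async query output write added by this commit (off by default).
val spark = SparkSession.builder()
  .appName("async-parquet-write-sketch") // illustrative name
  .config("spark.rapids.sql.asyncWrite.queryOutput.enabled", "true")
  .getOrCreate()

// Only the Parquet format is currently supported for the async output write.
spark.range(1000).toDF("id")
  .write.mode("overwrite")
  .parquet("/tmp/async_write_sketch") // illustrative path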
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,11 +25,13 @@ import com.nvidia.spark.Retryable
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.io.async.{AsyncOutputStream, TrafficController}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.{ColumnarWriteTaskStatsTracker, GpuWriteTaskStatsTracker}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -70,21 +72,31 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
abstract class ColumnarOutputWriter(context: TaskAttemptContext,
dataSchema: StructType,
rangeName: String,
includeRetry: Boolean) extends HostBufferConsumer {
includeRetry: Boolean,
holdGpuBetweenBatches: Boolean = false) extends HostBufferConsumer with Logging {

protected val tableWriter: TableWriter

protected val conf: Configuration = context.getConfiguration

// This is implemented as a method to make it easier to subclass
// ColumnarOutputWriter in the tests, and override this behavior.
protected def getOutputStream: FSDataOutputStream = {
private val trafficController: Option[TrafficController] = TrafficController.getInstance

private def openOutputStream(): OutputStream = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(conf)
fs.create(hadoopPath, false)
}

protected val outputStream: FSDataOutputStream = getOutputStream
// This is implemented as a method to make it easier to subclass
// ColumnarOutputWriter in the tests, and override this behavior.
protected def getOutputStream: OutputStream = {
trafficController.map(controller => {
logWarning("Async output write enabled")
new AsyncOutputStream(() => openOutputStream(), controller)
}).getOrElse(openOutputStream())
}

protected val outputStream: OutputStream = getOutputStream

private[this] val tempBuffer = new Array[Byte](128 * 1024)
private[this] var anythingWritten = false
@@ -166,7 +178,11 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
}
// we successfully buffered to host memory, release the semaphore and write
// the buffered data to the FS
GpuSemaphore.releaseIfNecessary(TaskContext.get)
if (!holdGpuBetweenBatches) {
logDebug("Releasing semaphore between batches")
GpuSemaphore.releaseIfNecessary(TaskContext.get)
}

writeBufferedData()
updateStatistics(writeStartTime, gpuTime, statsTrackers)
spillableBatch.numRows()
@@ -202,6 +218,10 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
// buffer an empty batch on close() to work around issues in cuDF
// where corrupt files can be written if nothing is encoded via the writer.
anythingWritten = true

// tableWriter.write() serializes the table into the HostMemoryBuffer, and buffers it
// by calling handleBuffer() on the ColumnarOutputWriter. It may not write to the
// output stream just yet.
tableWriter.write(table)
}
}
@@ -271,13 +271,19 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.")
}

val asyncOutputWriteEnabled = RapidsConf.ENABLE_ASYNC_OUTPUT_WRITE.get(sqlConf)
// holdGpuBetweenBatches is on by default if asyncOutputWriteEnabled is on
val holdGpuBetweenBatches = RapidsConf.ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK.get(sqlConf)
.getOrElse(asyncOutputWriteEnabled)

new ColumnarOutputWriterFactory {
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): ColumnarOutputWriter = {
new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString,
dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled)
dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled,
holdGpuBetweenBatches)
}

override def getFileExtension(context: TaskAttemptContext): String = {
@@ -299,8 +305,9 @@ class GpuParquetWriter(
dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode,
context: TaskAttemptContext,
parquetFieldIdEnabled: Boolean)
extends ColumnarOutputWriter(context, dataSchema, "Parquet", true) {
parquetFieldIdEnabled: Boolean,
holdGpuBetweenBatches: Boolean)
extends ColumnarOutputWriter(context, dataSchema, "Parquet", true, holdGpuBetweenBatches) {
override def throwIfRebaseNeededInExceptionMode(batch: ColumnarBatch): Unit = {
val cols = GpuColumnVector.extractBases(batch)
cols.foreach { col =>
@@ -31,6 +31,7 @@ import com.nvidia.spark.DFUDFPlugin
import com.nvidia.spark.rapids.RapidsConf.AllowMultipleJars
import com.nvidia.spark.rapids.RapidsPluginUtils.buildInfoEvent
import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
import com.nvidia.spark.rapids.io.async.TrafficController
import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import org.apache.commons.lang3.exception.ExceptionUtils
@@ -554,6 +555,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
extraExecutorPlugins.foreach(_.init(pluginContext, extraConf))
GpuSemaphore.initialize()
FileCache.init(pluginContext)
TrafficController.initialize(conf)
} catch {
// Exceptions in executor plugin can cause a single thread to die but the executor process
sticks around without any useful info until its heartbeat times out. Print what happened
@@ -656,6 +658,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
extraExecutorPlugins.foreach(_.shutdown())
FileCache.shutdown()
GpuCoreDumpHandler.shutdown()
TrafficController.shutdown()
}

override def onTaskFailed(failureReason: TaskFailedReason): Unit = {
35 changes: 35 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -2406,6 +2406,36 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.booleanConf
.createWithDefault(false)

val ENABLE_ASYNC_OUTPUT_WRITE =
conf("spark.rapids.sql.asyncWrite.queryOutput.enabled")
.doc("Option to turn on the async query output write. During the final output write, the " +
"task first copies the output to the host memory, and then writes it into the storage. " +
"When this option is enabled, the task will asynchronously write the output in the host " +
"memory to the storage. Only the Parquet format is supported currently.")
.internal()
.booleanConf
.createWithDefault(false)

val ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK =
conf("spark.rapids.sql.queryOutput.holdGpuInTask")
.doc("Option to hold GPU semaphore between batch processing during the final output write. " +
"This option could degrade query performance if it is enabled without the async query " +
"output write. It is recommended to consider enabling this option only when " +
s"${ENABLE_ASYNC_OUTPUT_WRITE.key} is set. This option is off by default when the async " +
"query output write is disabled; otherwise, it is on.")
.internal()
.booleanConf
.createOptional

val ASYNC_WRITE_MAX_IN_FLIGHT_HOST_MEMORY_BYTES =
conf("spark.rapids.sql.asyncWrite.maxInFlightHostMemoryBytes")
.doc("Maximum number of host memory bytes per executor that can be in-flight for async " +
"query output write. Tasks may be blocked if the total host memory bytes in-flight " +
"exceeds this value.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(2L * 1024 * 1024 * 1024)

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

@@ -2663,6 +2693,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isFoldableNonLitAllowed: Boolean = get(FOLDABLE_NON_LIT_ALLOWED)

lazy val asyncWriteMaxInFlightHostMemoryBytes: Long =
get(ASYNC_WRITE_MAX_IN_FLIGHT_HOST_MEMORY_BYTES)

/**
* Convert a string value to the injection configuration OomInjection.
*
@@ -3248,6 +3281,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val caseWhenFuseEnabled: Boolean = get(CASE_WHEN_FUSE)

lazy val isAsyncOutputWriteEnabled: Boolean = get(ENABLE_ASYNC_OUTPUT_WRITE)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
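As a further illustration (again not part of the diff), the companion configs above could be set at application startup; the in-flight byte limit is read when the executor plugin calls TrafficController.initialize, so it needs to be in place before executors start. Keys and defaults are taken from the RapidsConf changes above; the builder itself is just a sketch.

import org.apache.spark.sql.SparkSession

val tunedSpark = SparkSession.builder()
  .config("spark.rapids.sql.asyncWrite.queryOutput.enabled", "true")
  // Optional: hold the GPU semaphore between batches during the final write.
  // When left unset, this follows the async-write flag above.
  .config("spark.rapids.sql.queryOutput.holdGpuInTask", "true")
  // Optional: cap on host memory in flight for async writes; the default is 2 GiB.
  .config("spark.rapids.sql.asyncWrite.maxInFlightHostMemoryBytes",
    (2L * 1024 * 1024 * 1024).toString)
  .getOrCreate()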
186 changes: 186 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/AsyncOutputStream.scala
@@ -0,0 +1,186 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.io.async

import java.io.{IOException, OutputStream}
import java.util.concurrent.{Callable, TimeUnit}
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.rapids.execution.TrampolineUtil

/**
* OutputStream that performs writes asynchronously. Writes are scheduled on a background thread
* and executed in the order they were scheduled. This class is not thread-safe and should only be
* used by a single thread.
*/
class AsyncOutputStream(openFn: Callable[OutputStream], trafficController: TrafficController)
extends OutputStream {

private var closed = false

private val executor = new ThrottlingExecutor(
TrampolineUtil.newDaemonCachedThreadPool("AsyncOutputStream", 1, 1),
trafficController)

// Open the underlying stream asynchronously as soon as the AsyncOutputStream is constructed,
// so that the open can be done in parallel with other operations. This could help with
// performance if the open is slow.
private val openFuture = executor.submit(openFn, 0)
// Let's give it enough time to open the stream. Something bad should have happened if it
// takes more than 5 minutes to open a stream.
private val openTimeoutMin = 5

private lazy val delegate: OutputStream = {
openFuture.get(openTimeoutMin, TimeUnit.MINUTES)
}

class Metrics {
var numBytesScheduled: Long = 0
// This is thread-safe as it is updated by the background thread and can be read by
// any threads.
val numBytesWritten: AtomicLong = new AtomicLong(0)
}

val metrics = new Metrics

/**
* The last error that occurred in the background thread, or None if no error occurred.
* Once it is set, all subsequent writes that are already scheduled will fail and no new
* writes will be accepted.
*
* This is thread-safe as it is set by the background thread and can be read by any threads.
*/
val lastError: AtomicReference[Option[Throwable]] =
new AtomicReference[Option[Throwable]](None)

@throws[IOException]
private def throwIfError(): Unit = {
lastError.get() match {
case Some(t: IOException) => throw t
case Some(t) => throw new IOException(t)
case None =>
}
}

@throws[IOException]
private def ensureOpen(): Unit = {
if (closed) {
throw new IOException("Stream closed")
}
}

private def scheduleWrite(fn: () => Unit, bytesToWrite: Int): Unit = {
throwIfError()
ensureOpen()

metrics.numBytesScheduled += bytesToWrite
executor.submit(() => {
throwIfError()
ensureOpen()

try {
fn()
metrics.numBytesWritten.addAndGet(bytesToWrite)
} catch {
case t: Throwable =>
// Update the error state
lastError.set(Some(t))
}
}, bytesToWrite)
}

override def write(b: Int): Unit = {
scheduleWrite(() => delegate.write(b), 1)
}

override def write(b: Array[Byte]): Unit = {
scheduleWrite(() => delegate.write(b), b.length)
}

/**
* Schedules a write of the given bytes to the underlying stream. The write is executed
* asynchronously on a background thread. The method returns immediately, and the write may not
* have completed when the method returns.
*
* If an error has occurred in the background thread and [[lastError]] has been set, this function
* will throw an IOException immediately.
*
* If an error has occurred in the background thread while executing a previous write after the
* current write has been scheduled, the current write will fail with the same error.
*/
@throws[IOException]
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
scheduleWrite(() => delegate.write(b, off, len), len)
}

/**
* Flushes all pending writes to the underlying stream. This method blocks until all pending
* writes have been completed. If an error has occurred in the background thread, this method
* will throw an IOException.
*
* If an error has occurred in the background thread and [[lastError]] has been set, this function
* will throw an IOException immediately.
*
* If an error has occurred in the background thread while executing a previous task after the
* current flush has been scheduled, the current flush will fail with the same error.
*/
@throws[IOException]
override def flush(): Unit = {
throwIfError()
ensureOpen()

val f = executor.submit(() => {
throwIfError()
ensureOpen()

delegate.flush()
}, 0)

f.get()
}

/**
* Closes the underlying stream and releases any resources associated with it. All pending writes
* are flushed before closing the stream. This method blocks until all pending writes have been
* completed.
*
* If an error has occurred while flushing, this function will throw an IOException.
*
* If an error has occurred while executing a previous task before this function is called,
* this function will throw the same error. All resources and the underlying stream are still
* guaranteed to be closed.
*/
@throws[IOException]
override def close(): Unit = {
if (!closed) {
Seq[AutoCloseable](
() => {
// Wait for all pending writes to complete
// This will throw an exception if one of the writes fails
flush()
},
() => {
// Give the executor a chance to shutdown gracefully.
executor.shutdownNow(10, TimeUnit.SECONDS)
},
delegate,
() => closed = true).safeClose()
}
}
}
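For reference, a hypothetical usage sketch of AsyncOutputStream (not part of the diff). It mirrors ColumnarOutputWriter.getOutputStream above and assumes TrafficController.initialize has already run (the executor plugin does this); FileOutputStream and the path stand in for the Hadoop stream used by the plugin.

import java.io.{FileOutputStream, OutputStream}

import com.nvidia.spark.rapids.io.async.{AsyncOutputStream, TrafficController}

// Wrap a plain stream in AsyncOutputStream when a TrafficController is available,
// otherwise fall back to synchronous writes.
def openAsyncOrSync(path: String): OutputStream = {
  TrafficController.getInstance match {
    case Some(controller) =>
      // Writes are queued to a single background thread and executed in order.
      new AsyncOutputStream(() => new FileOutputStream(path), controller)
    case None =>
      new FileOutputStream(path)
  }
}

val out = openAsyncOrSync("/tmp/async-example.bin") // illustrative path
out.write(Array[Byte](1, 2, 3)) // returns immediately; the write runs on the background thread
out.flush()                     // blocks until all scheduled writes have completed
out.close()                     // flushes, shuts down the executor, then closes the delegate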