FallbackStorage retries FileNotFoundExceptions #16

Draft: wants to merge 3 commits into master
4 changes: 2 additions & 2 deletions .github/workflows/test_report.yml
@@ -30,14 +30,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Download test results to report
uses: dawidd6/action-download-artifact@09385b76de790122f4da9c82b17bccf858b9557c # pin@v2
uses: dawidd6/action-download-artifact@bf251b5aa9c2f7eeb574a96ee720e24f801b7c11 # pin @v6
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
workflow: ${{ github.event.workflow_run.workflow_id }}
commit: ${{ github.event.workflow_run.head_commit.id }}
workflow_conclusion: completed
- name: Publish test report
uses: scacap/action-surefire-report@482f012643ed0560e23ef605a79e8e87ca081648 # pin@v1
uses: scacap/action-surefire-report@a2911bd1a4412ec18dde2d93b1758b3e56d2a880 # pin @v1.8.0
with:
check_name: Report test results
github_token: ${{ secrets.GITHUB_TOKEN }}
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -584,6 +584,25 @@ package object config {
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY =
ConfigBuilder("spark.storage.decommission.fallbackStorage.replicationDelay")
.doc("The maximum expected delay for files written by one executor to become " +
"available to other executors.")
.version("4.0.0")
.timeConf(TimeUnit.SECONDS)
.checkValue(_ > 0, "Value must be positive.")
.createOptional

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT =
ConfigBuilder("spark.storage.decommission.fallbackStorage.replicationWait")
.doc("When an executor cannot find a file in the fallback storage it waits " +
"this amount of time before attempting to open the file again, " +
f"while not exceeding ${STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY.key}.")
.version("4.0.0")
.timeConf(TimeUnit.SECONDS)
.checkValue(_ > 0, "Value must be positive.")
.createWithDefaultString("1s")

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
.doc("If true, Spark cleans up its fallback storage data during shutting down.")
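For context, a minimal sketch (not part of this diff) of how the two new settings might be combined in application code; the values are illustrative assumptions, not recommendations:

import org.apache.spark.SparkConf

// Illustrative only: allow up to 30s for fallback files to become visible,
// re-checking every 2s between open attempts.
val conf = new SparkConf()
  .set("spark.storage.decommission.fallbackStorage.replicationDelay", "30s")
  .set("spark.storage.decommission.fallbackStorage.replicationWait", "2s")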
67 changes: 61 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -17,26 +17,27 @@

package org.apache.spark.storage

import java.io.DataInputStream
import java.io.{DataInputStream, FileNotFoundException}
import java.nio.ByteBuffer

import scala.annotation.tailrec
import scala.concurrent.Future
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY, STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* A fallback storage used by storage decommissioners.
@@ -138,6 +139,7 @@ private[spark] object FallbackStorage extends Logging {
val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
// The fallback directory for this app may not be created yet.
if (fallbackFileSystem.exists(fallbackPath)) {
logInfo(log"Attempt to clean up: ${MDC(URI, fallbackUri)}")
if (fallbackFileSystem.delete(fallbackPath, true)) {
logInfo(log"Succeed to clean up: ${MDC(URI, fallbackUri)}")
} else {
@@ -155,6 +157,59 @@
FALLBACK_BLOCK_MANAGER_ID, blockId, StorageLevel.DISK_ONLY, memSize = 0, dataLength)
}

/**
* Open the file. On a FileNotFoundException, wait waitMs milliseconds and retry,
* unless waiting would exceed deadlineMs; in that case, rethrow the exception.
*/
@tailrec
private def open(filesystem: FileSystem,
path: Path,
deadlineMs: Long,
waitMs: Long,
clock: Clock): FSDataInputStream = {
try {
filesystem.open(path)
} catch {
case fnf: FileNotFoundException =>
val waitTillMs = clock.getTimeMillis() + waitMs
if (waitTillMs <= deadlineMs) {
logInfo(f"File not found, waiting ${waitMs / 1000}s: $path")
clock.waitTillTime(waitTillMs)
open(filesystem, path, deadlineMs, waitMs, clock)
} else {
throw fnf
}
}
}

/**
* Open the file and retry FileNotFoundExceptions according to
* STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY and
* STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT
*/
// Visible for testing
private[spark] def open(conf: SparkConf,
filesystem: FileSystem,
path: Path,
clock: Clock = new SystemClock()): FSDataInputStream = {
val replicationDelay = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY)
if (replicationDelay.isDefined) {
val replicationDeadline = clock.getTimeMillis() + replicationDelay.get * 1000
val replicationWait = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT)
val replicationWaitMs = replicationWait * 1000
try {
open(filesystem, path, replicationDeadline, replicationWaitMs, clock)
} catch {
case fnf: FileNotFoundException =>
logInfo(f"File not found, exceeded expected replication delay " +
f"of ${replicationDelay.get}s: $path")
throw fnf
}
} else {
filesystem.open(path)
}
}

/**
* Read a ManagedBuffer.
*/
@@ -180,7 +235,7 @@
val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val start = startReduceId * 8L
val end = endReduceId * 8L
Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
Utils.tryWithResource(open(conf, fallbackFileSystem, indexFile)) { inputStream =>
Utils.tryWithResource(new DataInputStream(inputStream)) { index =>
index.skip(start)
val offset = index.readLong()
@@ -193,7 +248,7 @@
logDebug(s"To byte array $size")
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
Utils.tryWithResource(open(conf, fallbackFileSystem, dataFile)) { f =>
f.seek(offset)
f.readFully(array)
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
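As a rough sketch of the retry schedule implied by the loop above (not code from this patch): with a replication delay of 5s and a wait of 2s, the file is opened at 0s, 2s and 4s after the first attempt; waiting again would pass the 5s deadline, so the FileNotFoundException is rethrown.

// Assumed values: replicationDelay = 5000 ms, replicationWait = 2000 ms.
val delayMs = 5000L
val waitMs = 2000L
val attemptsMs = Iterator.iterate(0L)(_ + waitMs).takeWhile(_ <= delayMs).toList
// attemptsMs == List(0, 2000, 4000); a fourth attempt at 6000 ms would exceed the deadline.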
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -16,20 +16,21 @@
*/
package org.apache.spark.storage

import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
import java.io.{DataOutputStream, File, FileNotFoundException, FileOutputStream, InputStream, IOException}
import java.nio.file.Files

import scala.concurrent.duration._
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.LocalSparkContext.withSpark
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
@@ -39,11 +40,12 @@ import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.Clock
import org.apache.spark.util.Utils.tryWithResource

class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {

def getSparkConf(initialExecutor: Int = 1, minExecutor: Int = 1): SparkConf = {
def getSparkConf(initialExecutor: Int = 1, minExecutor: Int = 1): SparkConf = {
new SparkConf(false)
.setAppName(getClass.getName)
.set(SPARK_MASTER, s"local-cluster[$initialExecutor,1,1024]")
@@ -334,7 +336,43 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}
}

Seq(0, 1000, 3000, 6000).foreach { replicationMs =>
test(s"Consider replication delay - ${replicationMs}ms") {
val delay = 5000 // max allowed replication delay, in ms
val wait = 2000 // time between open attempts, in ms
val conf = getSparkConf()
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY.key, s"${delay}ms")
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT.key, s"${wait}ms")

val filesystem = FileSystem.get(SparkHadoopUtil.get.newConfiguration(conf))
val path = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, "file")
val startMs = 123000000L * 1000L // arbitrary system time
val clock = new DelayedActionClock(replicationMs, startMs)(filesystem.create(path).close())

if (replicationMs <= delay) {
// expect open to succeed
val in = FallbackStorage.open(conf, filesystem, path, clock)
assert(in != null)

// number of waits expected before the file becomes visible
val expectedWaits = Math.ceil(replicationMs.toFloat / wait).toInt
assert(clock.timeMs == startMs + expectedWaits * wait)
assert(clock.waited == expectedWaits)
in.close()
} else {
// expect open to fail
assertThrows[FileNotFoundException](FallbackStorage.open(conf, filesystem, path, clock))

// number of waits expected before the deadline is exceeded
val expectedWaits = delay / wait
assert(clock.timeMs == startMs + expectedWaits * wait)
assert(clock.waited == expectedWaits)
}
}
}
}
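
// Worked check (not part of the patch) of the expected waits above, with delay = 5000 ms and wait = 2000 ms:
// replicationMs = 0    -> file exists at the first open attempt: 0 waits
// replicationMs = 1000 -> ceil(1000 / 2000) = 1 wait, open succeeds at startMs + 2000
// replicationMs = 3000 -> ceil(3000 / 2000) = 2 waits, open succeeds at startMs + 4000
// replicationMs = 6000 -> exceeds the 5000 ms delay: 5000 / 2000 = 2 waits, then FileNotFoundException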

class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
with Seekable with PositionedReadable {
override def read: Int = in.read
@@ -378,3 +416,31 @@ class ReadPartialFileSystem extends LocalFileSystem {
new FSDataInputStream(new ReadPartialInputStream(stream))
}
}

/**
 * A manual Clock for tests: time advances only via waitTillTime, and the given action
 * (e.g. creating the file a reader is waiting for) runs exactly once as soon as the
 * clock reaches startTimeMs + delayMs.
 */
class DelayedActionClock(delayMs: Long, startTimeMs: Long)(action: => Unit)
extends Clock {
var timeMs: Long = startTimeMs
var waited: Int = 0
var triggered: Boolean = false

if (delayMs == 0) trigger()

private def trigger(): Unit = {
if (!triggered) {
triggered = true
action
}
}

override def getTimeMillis(): Long = timeMs
override def nanoTime(): Long = timeMs * 1000000
override def waitTillTime(targetTime: Long): Long = {
waited += 1
if (targetTime >= startTimeMs + delayMs) {
timeMs = startTimeMs + delayMs
trigger()
}
timeMs = targetTime
targetTime
}
}