Skip to content

Commit

Permalink
Make the number of fallback storage sub-directories configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Nov 25, 2024
1 parent 79cb603 commit 16290c8
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,24 @@ package object config {
ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
.doc("The location for fallback storage during block manager decommissioning. " +
"For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. " +
"The storage should be managed by TTL because Spark will not clean it up.")
"The storage should be managed by TTL because Spark will not clean it up, " +
"unless spark.storage.decommission.fallbackStorage.cleanUp is true.")
.version("3.1.0")
.stringConf
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

// Number of hash-bucket sub-directories used below `<fallbackPath>/<appId>/<shuffleId>/`.
//   0   => no sub-directory level: all files of a shuffle share one directory.
//   N>0 => files are spread over at most N sub-directories, the bucket being chosen as
//          `nonNegativeHash(filename) % N` (see FallbackStorage.getPath).
// Negative values are rejected by checkValue. With the default of Int.MaxValue the modulo
// leaves every hash below Int.MaxValue unchanged, i.e. one sub-directory per filename hash.
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS =
ConfigBuilder("spark.storage.decommission.fallbackStorage.subPaths")
.doc("The fallback storage puts all files of one shuffle in one directory when this is 0. " +
"When this option is larger than 0, it will instead distribute the files across " +
"this number of subdirectories.")
.version("4.0.0")
.intConf
.checkValue(_ >= 0, "The number of subdirectories must be 0 or larger.")
.createWithDefault(Int.MaxValue)


private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
.doc("If true, Spark cleans up its fallback storage data during shutting down.")
Expand Down
35 changes: 24 additions & 11 deletions core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage.FallbackStorage.getPath
import org.apache.spark.util.Utils

/**
Expand All @@ -62,17 +63,15 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
val indexFile = r.getIndexFile(shuffleId, mapId)

if (indexFile.exists()) {
val hash = JavaUtils.nonNegativeHash(indexFile.getName)
fallbackFileSystem.copyFromLocalFile(
new Path(Utils.resolveURI(indexFile.getAbsolutePath)),
new Path(fallbackPath, s"$appId/$shuffleId/$hash/${indexFile.getName}"))
getPath(conf, appId, shuffleId, indexFile.getName))

val dataFile = r.getDataFile(shuffleId, mapId)
if (dataFile.exists()) {
val hash = JavaUtils.nonNegativeHash(dataFile.getName)
fallbackFileSystem.copyFromLocalFile(
new Path(Utils.resolveURI(dataFile.getAbsolutePath)),
new Path(fallbackPath, s"$appId/$shuffleId/$hash/${dataFile.getName}"))
getPath(conf, appId, shuffleId, dataFile.getName))
}

// Report block statuses
Expand All @@ -90,8 +89,7 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
}

def exists(shuffleId: Int, filename: String): Boolean = {
val hash = JavaUtils.nonNegativeHash(filename)
fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
fallbackFileSystem.exists(getPath(conf, appId, shuffleId, filename))
}
}

Expand Down Expand Up @@ -155,6 +153,23 @@ private[spark] object FallbackStorage extends Logging {
FALLBACK_BLOCK_MANAGER_ID, blockId, StorageLevel.DISK_ONLY, memSize = 0, dataLength)
}

/**
 * Provide the Path for a shuffle file in the fallback storage.
 *
 * The layout below the configured fallback storage path is
 *   - `<appId>/<shuffleId>/<bucket>/<filename>` when the configured number of
 *     sub-directories is larger than 0, where `<bucket>` is
 *     `JavaUtils.nonNegativeHash(filename) % subPaths`, or
 *   - `<appId>/<shuffleId>/<filename>` when it is 0.
 *
 * Writers and readers must both resolve locations through this method so that they agree
 * on where a file lives.
 *
 * @param conf      Spark configuration holding the fallback storage path and the number of
 *                  sub-directories
 * @param appId     application id, used as the first path component
 * @param shuffleId shuffle id, used as the second path component
 * @param filename  name of the shuffle index or data file
 * @return the location of `filename` inside the fallback storage
 * @throws NoSuchElementException if the fallback storage path is not configured
 */
private[storage] def getPath(
    conf: SparkConf,
    appId: String,
    shuffleId: Int,
    filename: String): Path = {
  // Same exception type as the former bare `Option.get`, but with a message that tells the
  // user which setting is missing instead of the opaque "None.get".
  val root = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).getOrElse {
    throw new NoSuchElementException(
      s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} must be set to resolve " +
        s"the fallback storage location of $filename")
  }
  val subPaths = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS)
  val relativePath = if (subPaths > 0) {
    // The hash is non-negative, so the remainder is a valid bucket in [0, subPaths).
    val bucket = JavaUtils.nonNegativeHash(filename) % subPaths
    s"$appId/$shuffleId/$bucket/$filename"
  } else {
    s"$appId/$shuffleId/$filename"
  }
  new Path(new Path(root), relativePath)
}

/**
* Read a ManagedBuffer.
*/
Expand All @@ -176,8 +191,7 @@ private[spark] object FallbackStorage extends Logging {
}

val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
val hash = JavaUtils.nonNegativeHash(name)
val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val indexFile = getPath(conf, appId, shuffleId, name)
val start = startReduceId * 8L
val end = endReduceId * 8L
Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
Expand All @@ -187,8 +201,7 @@ private[spark] object FallbackStorage extends Logging {
index.skip(end - (start + 8L))
val nextOffset = index.readLong()
val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
val hash = JavaUtils.nonNegativeHash(name)
val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val dataFile = getPath(conf, appId, shuffleId, name)
val size = nextOffset - offset
logDebug(s"To byte array $size")
val array = new Array[Byte](size.toInt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.concurrent.duration._
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PathFilter, PositionedReadable, Seekable}
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
Expand All @@ -39,6 +39,7 @@ import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.HadoopFSUtils
import org.apache.spark.util.Utils.tryWithResource

class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
Expand Down Expand Up @@ -293,6 +294,62 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}

// Pins the directory layout produced by FallbackStorage.getPath for several
// sub-directory counts, including the "no sub-directories" case (0).
for (subPaths <- Seq(0, 1, 2, 4, 1024, Int.MaxValue)) {
  test(s"Get path for filename with $subPaths subdirectories") {
    val appId = "app-id"
    val shuffleId = 123
    val filename = "the-file"
    // "the-file".hashCode, a positive Int; presumably what JavaUtils.nonNegativeHash
    // yields for this filename -- confirm if the hash function ever changes.
    val filenameHash = 1049883992

    val conf = getSparkConf(2, 2).set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS, subPaths)
    val root = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get

    // With 0 sub-directories the file sits directly in the shuffle directory,
    // otherwise one level deeper in its hash bucket.
    val relative = if (subPaths == 0) filename else s"${filenameHash % subPaths}/$filename"
    val expected = new Path(s"$root/$appId/$shuffleId/$relative")

    assert(FallbackStorage.getPath(conf, appId, shuffleId, filename) == expected)
  }
}

// End-to-end check: run a small shuffle, decommission every executor so the shuffle files
// are uploaded to the fallback storage, then count the distinct directories that hold files.
Seq(0, 1, 2, 4, 1024, Int.MaxValue).foreach { subPaths =>
test(s"Control number of sub-directories ($subPaths)") {
val conf = getSparkConf(2, 2).set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS, subPaths)
sc = new SparkContext(conf)
withSpark(sc) { sc =>
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
// Ten map partitions feeding a reduceByKey, i.e. one shuffle (id 0) with ten map outputs.
val rdd1 = sc.parallelize(1 to 10, 10)
val rdd2 = rdd1.map(x => (x % 2, 1))
val rdd3 = rdd2.reduceByKey(_ + _)
assert(rdd3.count() === 2)

// Decommission all
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
sc.getExecutorIds().foreach {
sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
}

// We expect two files per partition, with ten partitions
val files = 0 until 10 flatMap (idx => Seq(
s"shuffle_0_${idx}_0.index", s"shuffle_0_${idx}_0.data")
)
val fallbackStorage = new FallbackStorage(sc.getConf)
// Uploading is completed on decommissioned executors
eventually(timeout(20.seconds), interval(1.seconds)) {
files.foreach { file => assert(fallbackStorage.exists(0, file)) }
}

// Check number of subdirectories
val path = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get
// Accept every path: nothing is filtered out of the listing.
val noopFilter = new PathFilter {
override def accept(path: Path): Boolean = true
}
// Collect the distinct parent directories of all listed files.
// NOTE(review): presumably listFiles yields (directory, file statuses) pairs -- confirm.
val subDirs = HadoopFSUtils.listFiles(new Path(path), sc.hadoopConfiguration, noopFilter)
.flatMap(_._2.map(_.getPath.getParent)).toSet.toList
// 20 is the number of uploaded files (10 index + 10 data). With subPaths == 0 all files share
// the single shuffle directory, hence the lower bound of 1; otherwise there can be at most
// min(subPaths, 20) buckets.
// NOTE(review): asserting equality assumes the 20 filenames hash into that many distinct
// buckets (no collisions for 1024 / Int.MaxValue, every bucket hit for 2 / 4) -- confirm.
assert(subDirs.length == Math.max(1, Math.min(subPaths, 20)), subDirs.mkString(", "))
}
}
}

CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
test(s"$codec - Newly added executors should access old data from remote storage") {
sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
Expand Down Expand Up @@ -335,6 +392,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}
}

class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
with Seekable with PositionedReadable {
override def read: Int = in.read
Expand Down

0 comments on commit 16290c8

Please sign in to comment.