diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0e19143411e96..29f89dc9f23ab 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -578,12 +578,24 @@ package object config { ConfigBuilder("spark.storage.decommission.fallbackStorage.path") .doc("The location for fallback storage during block manager decommissioning. " + "For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. " + - "The storage should be managed by TTL because Spark will not clean it up.") + "The storage should be managed by TTL because Spark will not clean it up, " + + "unless spark.storage.decommission.fallbackStorage.cleanUp is true.") .version("3.1.0") .stringConf .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.") .createOptional + private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS = + ConfigBuilder("spark.storage.decommission.fallbackStorage.subPaths") + .doc("The fallback storage puts all files of one shuffle in one directory when this is 0. " + + "When this option is larger than 0, it will instead distribute the files across " + + "this number of subdirectories.") + .version("4.0.0") + .intConf + .checkValue(_ >= 0, "The number of subdirectories must be 0 or larger.") + .createWithDefault(Int.MaxValue) + + private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP = ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp") .doc("If true, Spark cleans up its fallback storage data during shutting down.") diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala index 0f2bfaede4454..888f2a0330e33 100644 --- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala +++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala @@ -30,12 +30,13 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH} +import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.util.JavaUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout} import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo} import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID +import org.apache.spark.storage.FallbackStorage.getPath import org.apache.spark.util.Utils /** @@ -62,17 +63,15 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging { val indexFile = r.getIndexFile(shuffleId, mapId) if (indexFile.exists()) { - val hash = JavaUtils.nonNegativeHash(indexFile.getName) fallbackFileSystem.copyFromLocalFile( new Path(Utils.resolveURI(indexFile.getAbsolutePath)), - new Path(fallbackPath, s"$appId/$shuffleId/$hash/${indexFile.getName}")) + getPath(conf, appId, shuffleId, indexFile.getName)) val dataFile = r.getDataFile(shuffleId, mapId) if (dataFile.exists()) { - val hash = JavaUtils.nonNegativeHash(dataFile.getName) fallbackFileSystem.copyFromLocalFile( new Path(Utils.resolveURI(dataFile.getAbsolutePath)), - new Path(fallbackPath, s"$appId/$shuffleId/$hash/${dataFile.getName}")) + getPath(conf, appId, shuffleId, dataFile.getName)) } // Report block statuses @@ -90,8 +89,7 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging { } def exists(shuffleId: Int, filename: String): Boolean = { - val hash = JavaUtils.nonNegativeHash(filename) - fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename")) + fallbackFileSystem.exists(getPath(conf, appId, shuffleId, filename)) } } @@ -155,6 +153,23 @@ private[spark] object FallbackStorage extends Logging { FALLBACK_BLOCK_MANAGER_ID, blockId, StorageLevel.DISK_ONLY, memSize = 0, dataLength) } + /** + * Provide the Path for a shuffle file. + */ + private[storage] def getPath(conf: SparkConf, + appId: String, + shuffleId: Int, + filename: String): Path = { + val fallbackPath = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get) + val subPaths = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS) + if (subPaths > 0) { + val hash = JavaUtils.nonNegativeHash(filename) % subPaths + new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename") + } else { + new Path(fallbackPath, s"$appId/$shuffleId/$filename") + } + } + /** * Read a ManagedBuffer. */ @@ -176,8 +191,7 @@ private[spark] object FallbackStorage extends Logging { } val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name - val hash = JavaUtils.nonNegativeHash(name) - val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name") + val indexFile = getPath(conf, appId, shuffleId, name) val start = startReduceId * 8L val end = endReduceId * 8L Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream => @@ -187,8 +201,7 @@ private[spark] object FallbackStorage extends Logging { index.skip(end - (start + 8L)) val nextOffset = index.readLong() val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name - val hash = JavaUtils.nonNegativeHash(name) - val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name") + val dataFile = getPath(conf, appId, shuffleId, name) val size = nextOffset - offset logDebug(s"To byte array $size") val array = new Array[Byte](size.toInt) diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala index 6c51bd4ff2e2f..2f4b85ba9c3a3 100644 --- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala @@ -23,7 +23,7 @@ import scala.concurrent.duration._ import scala.util.Random import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable} +import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PathFilter, PositionedReadable, Seekable} import org.mockito.{ArgumentMatchers => mc} import org.mockito.Mockito.{mock, never, verify, when} import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} @@ -39,6 +39,7 @@ import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo} import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID +import org.apache.spark.util.HadoopFSUtils import org.apache.spark.util.Utils.tryWithResource class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { @@ -293,6 +294,62 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { } } + Seq(0, 1, 2, 4, 1024, Int.MaxValue).foreach { subPaths => + test(s"Get path for filename with $subPaths subdirectories") { + val conf = getSparkConf(2, 2).set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS, subPaths) + val path = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get + val appId = "app-id" + val shuffleId = 123 + val filename = "the-file" + val actual = FallbackStorage.getPath(conf, appId, shuffleId, filename) + val expected = if (subPaths == 0) { + new Path(s"${path}/$appId/$shuffleId/$filename") + } else { + new Path(s"${path}/$appId/$shuffleId/${1049883992 % subPaths}/$filename") + } + assert(actual == expected) + } + } + + Seq(0, 1, 2, 4, 1024, Int.MaxValue).foreach { subPaths => + test(s"Control number of sub-directories ($subPaths)") { + val conf = getSparkConf(2, 2).set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_SUBPATHS, subPaths) + sc = new SparkContext(conf) + withSpark(sc) { sc => + TestUtils.waitUntilExecutorsUp(sc, 2, 60000) + val rdd1 = sc.parallelize(1 to 10, 10) + val rdd2 = rdd1.map(x => (x % 2, 1)) + val rdd3 = rdd2.reduceByKey(_ + _) + assert(rdd3.count() === 2) + + // Decommission all + val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] + sc.getExecutorIds().foreach { + sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false) + } + + // We expect two files per partition, with ten partitions + val files = 0 until 10 flatMap (idx => Seq( + s"shuffle_0_${idx}_0.index", s"shuffle_0_${idx}_0.data") + ) + val fallbackStorage = new FallbackStorage(sc.getConf) + // Uploading is completed on decommissioned executors + eventually(timeout(20.seconds), interval(1.seconds)) { + files.foreach { file => assert(fallbackStorage.exists(0, file)) } + } + + // Check number of subdirectories + val path = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get + val noopFilter = new PathFilter { + override def accept(path: Path): Boolean = true + } + val subDirs = HadoopFSUtils.listFiles(new Path(path), sc.hadoopConfiguration, noopFilter) + .flatMap(_._2.map(_.getPath.getParent)).toSet.toList + assert(subDirs.length == Math.max(1, Math.min(subPaths, 20)), subDirs.mkString(", ")) + } + } + } + CompressionCodec.shortCompressionCodecNames.keys.foreach { codec => test(s"$codec - Newly added executors should access old data from remote storage") { sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec)) @@ -335,6 +392,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { } } } + class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream with Seekable with PositionedReadable { override def read: Int = in.read