From 8ef51da28da0a66678113749c645f0a9c84b964a Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Tue, 13 Jun 2023 11:52:11 +0200
Subject: [PATCH] MemoryManager and Page Size + Configuration Properties

---
 docs/configuration-properties.md | 134 +++++++++++++++++++++----------
 docs/memory/MemoryManager.md | 36 +++++++--
 docs/memory/TaskMemoryManager.md | 6 +-
 3 files changed, 127 insertions(+), 49 deletions(-)

diff --git a/docs/configuration-properties.md b/docs/configuration-properties.md
index 4aeb19bdcd..e7fb01f263 100644
--- a/docs/configuration-properties.md
+++ b/docs/configuration-properties.md
@@ -1,27 +1,5 @@
 # Configuration Properties
 
-## spark.scheduler
-
-### barrier.maxConcurrentTasksCheck.interval { #spark.scheduler.barrier.maxConcurrentTasksCheck.interval }
-
-**spark.scheduler.barrier.maxConcurrentTasksCheck.interval**
-
-### barrier.maxConcurrentTasksCheck.maxFailures { #spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures }
-
-**spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures**
-
-## shuffle.sort.io.plugin.class
-
-**spark.shuffle.sort.io.plugin.class**
-
-Name of the class to use for [shuffle IO](shuffle/ShuffleDataIO.md)
-
-Default: [LocalDiskShuffleDataIO](shuffle/LocalDiskShuffleDataIO.md)
-
-Used when:
-
-* `ShuffleDataIOUtils` is requested to [loadShuffleDataIO](shuffle/ShuffleDataIOUtils.md#loadShuffleDataIO)
-
 ## spark.app.id
 
 Unique identifier of a Spark application that Spark uses to uniquely identify [metric sources](metrics/MetricsSystem.md#buildRegistryName).
@@ -55,6 +33,18 @@ Used when:
 
 * `TorrentBroadcast` is requested to [setConf](broadcast-variables/TorrentBroadcast.md#compressionCodec)
 * `SerializerManager` is [created](serializer/SerializerManager.md#compressBroadcast)
 
+## spark.buffer.pageSize { #spark.buffer.pageSize }
+
+**spark.buffer.pageSize**
+
+The amount of memory used per page (in bytes)
+
+Default: (undefined)
+
+Used when:
+
+* `MemoryManager` is [created](memory/MemoryManager.md#pageSizeBytes)
+
 ## spark.cleaner.referenceTracking
 
 Controls whether to enable [ContextCleaner](core/ContextCleaner.md)
@@ -483,15 +473,29 @@ Used when:
 
 * `Executor` is [created](executor/Executor.md#maxDirectResultSize)
 * `MapOutputTrackerMaster` is [created](scheduler/MapOutputTrackerMaster.md#maxRpcMessageSize) (and makes sure that [spark.shuffle.mapOutput.minSizeForBroadcast](#spark.shuffle.mapOutput.minSizeForBroadcast) is below the threshold)
 
-## spark.scheduler.minRegisteredResourcesRatio
+## spark.scheduler
+
+### barrier.maxConcurrentTasksCheck.interval { #spark.scheduler.barrier.maxConcurrentTasksCheck.interval }
+
+**spark.scheduler.barrier.maxConcurrentTasksCheck.interval**
+
+### barrier.maxConcurrentTasksCheck.maxFailures { #spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures }
+
+**spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures**
+
+### minRegisteredResourcesRatio { #spark.scheduler.minRegisteredResourcesRatio }
+
+**spark.scheduler.minRegisteredResourcesRatio**
 
 Minimum ratio of (registered resources / total expected resources) before submitting tasks
 
 Default: (undefined)
 
-## spark.scheduler.revive.interval
+### revive.interval { #spark.scheduler.revive.interval }
 
-**Revive Interval** that is the time (in millis) between resource offers revives
+**spark.scheduler.revive.interval**
+
+The time (in millis) between revives of resource offers
 
 Default: `1s`
 
@@ -510,7 +514,23 @@ Used when:
 
 * `SparkEnv` utility is used to [create a SparkEnv](SparkEnv.md#create)
 * `SparkConf` is requested to [registerKryoClasses](SparkConf.md#registerKryoClasses) (as a side-effect)
 
-## spark.shuffle.checksum.enabled
+## spark.shuffle
+
+### sort.io.plugin.class { #spark.shuffle.sort.io.plugin.class }
+
+**spark.shuffle.sort.io.plugin.class**
+
+Name of the class to use for [shuffle IO](shuffle/ShuffleDataIO.md)
+
+Default: [LocalDiskShuffleDataIO](shuffle/LocalDiskShuffleDataIO.md)
+
+Used when:
+
+* `ShuffleDataIOUtils` is requested to [loadShuffleDataIO](shuffle/ShuffleDataIOUtils.md#loadShuffleDataIO)
+
+### checksum.enabled { #spark.shuffle.checksum.enabled }
+
+**spark.shuffle.checksum.enabled**
 
 Controls checksumming of shuffle data. If enabled, Spark will calculate the checksum values for each partition's data within the map output file and store the values in a checksum file on disk.
 
 When shuffle data corruption is detected, Spark will try to diagnose the cause of the corruption by using the checksum file.
 
 Default: `true`
 
-## spark.shuffle.compress
+### compress { #spark.shuffle.compress }
+
+**spark.shuffle.compress**
 
-Controls whether to compress shuffle output when stored
+Enables compressing shuffle output when stored
 
 Default: `true`
 
-## spark.shuffle.detectCorrupt
+### detectCorrupt { #spark.shuffle.detectCorrupt }
+
+**spark.shuffle.detectCorrupt**
 
 Controls corruption detection in fetched blocks
 
 Default: `true`
 
 Used when:
 
 * `BlockStoreShuffleReader` is requested to [read combined records for a reduce task](shuffle/BlockStoreShuffleReader.md#read)
 
-## spark.shuffle.detectCorrupt.useExtraMemory
+### detectCorrupt.useExtraMemory { #spark.shuffle.detectCorrupt.useExtraMemory }
+
+**spark.shuffle.detectCorrupt.useExtraMemory**
 
 If enabled, part of a compressed/encrypted stream will be decompressed/decrypted using extra memory to detect corruption early. Any `IOException` thrown will cause the task to be retried once, and if it fails again with the same exception, a `FetchFailedException` will be thrown to retry the previous stage
 
 Default: `false`
 
 Used when:
 
 * `BlockStoreShuffleReader` is requested to [read combined records for a reduce task](shuffle/BlockStoreShuffleReader.md#read)
 
-## spark.shuffle.file.buffer
+### file.buffer { #spark.shuffle.file.buffer }
+
+**spark.shuffle.file.buffer**
 
 Size of the in-memory buffer for each shuffle file output stream, in KiB unless otherwise specified. These buffers reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
 
@@ -560,7 +588,9 @@ Used when the following are created:
 
 * [ExternalAppendOnlyMap](shuffle/ExternalAppendOnlyMap.md)
 * [ExternalSorter](shuffle/ExternalSorter.md)
 
-## spark.shuffle.manager
+### manager { #spark.shuffle.manager }
+
+**spark.shuffle.manager**
 
 A fully-qualified class name or the alias of the [ShuffleManager](shuffle/ShuffleManager.md) in a Spark application
 
 Default: `sort`
 
 Supported aliases:
 
 * `sort`
 * `tungsten-sort`
 
 Used when the `SparkEnv` object is requested to [create a "base" SparkEnv for a driver or an executor](SparkEnv.md#create)
 
-## spark.shuffle.mapOutput.parallelAggregationThreshold
+### mapOutput.parallelAggregationThreshold { #spark.shuffle.mapOutput.parallelAggregationThreshold }
+
+**spark.shuffle.mapOutput.parallelAggregationThreshold**
 
 **(internal)** Multi-threading is used when the number of mappers * shuffle partitions is greater than or equal to this threshold. Note that the actual parallelism is calculated as `number of mappers * shuffle partitions / this threshold + 1`, so this threshold should be positive.
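+
+As a worked example of the formula above, consider the following sketch (the threshold and task counts are hypothetical values picked for illustration, not Spark defaults):
+
+```scala
+// Hypothetical numbers to illustrate the parallelism formula (not actual defaults):
+val threshold = 10000000L     // spark.shuffle.mapOutput.parallelAggregationThreshold
+val numMappers = 5000L        // number of map tasks
+val numPartitions = 4000L     // number of shuffle partitions
+
+// Multi-threaded aggregation kicks in when mappers * partitions >= threshold
+assert(numMappers * numPartitions >= threshold) // 20,000,000 >= 10,000,000
+
+// Actual parallelism: mappers * partitions / threshold + 1
+val parallelism = numMappers * numPartitions / threshold + 1
+assert(parallelism == 3) // 20,000,000 / 10,000,000 + 1 = 3
+```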
@@ -583,7 +615,9 @@ Used when:
 
 * `MapOutputTrackerMaster` is requested for the [statistics of a ShuffleDependency](scheduler/MapOutputTrackerMaster.md#getStatistics)
 
-## spark.shuffle.minNumPartitionsToHighlyCompress
+### minNumPartitionsToHighlyCompress { #spark.shuffle.minNumPartitionsToHighlyCompress }
+
+**spark.shuffle.minNumPartitionsToHighlyCompress**
 
 **(internal)** Minimum number of partitions (threshold) for the `MapStatus` utility to prefer a [HighlyCompressedMapStatus](scheduler/MapStatus.md#HighlyCompressedMapStatus) (over [CompressedMapStatus](scheduler/MapStatus.md#CompressedMapStatus)) (for [ShuffleWriters](shuffle/ShuffleWriter.md)).
 
 Default: `2000`
 
 Must be a positive integer (above `0`)
 
-## spark.shuffle.push.enabled
+### push.enabled { #spark.shuffle.push.enabled }
+
+**spark.shuffle.push.enabled**
 
 Enables push-based shuffle on the client side
 
 Default: `false`
 
 Used when:
 
 * `Utils` utility is used to [determine whether push-based shuffle is enabled or not](Utils.md#isPushBasedShuffleEnabled)
 
-## spark.shuffle.readHostLocalDisk
+### readHostLocalDisk { #spark.shuffle.readHostLocalDisk }
+
+**spark.shuffle.readHostLocalDisk**
 
 If enabled (with [spark.shuffle.useOldFetchProtocol](#spark.shuffle.useOldFetchProtocol) disabled and [spark.shuffle.service.enabled](external-shuffle-service/configuration-properties.md#spark.shuffle.service.enabled) enabled), shuffle blocks requested from block managers running on the same host are read from the disk directly instead of being fetched as remote blocks over the network.
 
 Default: `true`
 
-## spark.shuffle.registration.maxAttempts
+### registration.maxAttempts { #spark.shuffle.registration.maxAttempts }
+
+**spark.shuffle.registration.maxAttempts**
 
 How many attempts to [register a BlockManager with External Shuffle Service](storage/BlockManager.md#registerWithExternalShuffleServer)
 
 Default: `3`
 
 Used when `BlockManager` is requested to [register with External Shuffle Server](storage/BlockManager.md#registerWithExternalShuffleServer)
 
-## spark.shuffle.sort.bypassMergeThreshold
+### sort.bypassMergeThreshold { #spark.shuffle.sort.bypassMergeThreshold }
+
+**spark.shuffle.sort.bypassMergeThreshold**
 
 Maximum number of reduce partitions below which [SortShuffleManager](shuffle/SortShuffleManager.md) avoids merge-sorting data when there is no map-side aggregation
 
 Default: `200`
 
 Used when:
 
 * `SortShuffleWriter` utility is used to [shouldBypassMergeSort](shuffle/SortShuffleWriter.md#shouldBypassMergeSort)
 * `ShuffleExchangeExec` ([Spark SQL]({{ book.spark_sql }}/physical-operators/ShuffleExchangeExec)) physical operator is requested to `prepareShuffleDependency`
 
-## spark.shuffle.spill.initialMemoryThreshold
+### spill.initialMemoryThreshold { #spark.shuffle.spill.initialMemoryThreshold }
+
+**spark.shuffle.spill.initialMemoryThreshold**
 
 Initial threshold for the size of an in-memory collection
 
 Default: 5MB
 
 Used by [Spillable](shuffle/Spillable.md#initialMemoryThreshold)
 
-## spark.shuffle.spill.numElementsForceSpillThreshold
+### spill.numElementsForceSpillThreshold { #spark.shuffle.spill.numElementsForceSpillThreshold }
+
+**spark.shuffle.spill.numElementsForceSpillThreshold**
 
 **(internal)** The maximum number of elements in memory before forcing the shuffle sorter to spill.
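+
+To illustrate how such a force-spill threshold typically interacts with the memory threshold of [spark.shuffle.spill.initialMemoryThreshold](#spark.shuffle.spill.initialMemoryThreshold), here is a minimal sketch (illustrative only, not the actual `Spillable` code; the class name and all numbers are made up):
+
+```scala
+// A simplified spill decision, in the spirit of Spillable (not the actual Spark code).
+class SpillCheck(forceSpillThreshold: Long, memoryThreshold: Long) {
+  private var elementsRead = 0L
+
+  // Returns true when the in-memory collection should spill to disk:
+  // either its size crossed the memory threshold or too many elements were read.
+  def maybeSpill(currentMemory: Long): Boolean = {
+    elementsRead += 1
+    currentMemory >= memoryThreshold || elementsRead > forceSpillThreshold
+  }
+}
+
+val check = new SpillCheck(forceSpillThreshold = 4, memoryThreshold = 5L * 1024 * 1024)
+assert(!check.maybeSpill(currentMemory = 1024)) // small and few elements: keep in memory
+(1 to 4).foreach(_ => check.maybeSpill(currentMemory = 1024))
+assert(check.maybeSpill(currentMemory = 1024))  // 6th element crosses the force-spill threshold
+```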
@@ -653,7 +699,9 @@ Used when:
 
 * Spark SQL's `UnsafeExternalRowSorter` is created
 * Spark SQL's `UnsafeFixedWidthAggregationMap` is requested for an `UnsafeKVExternalSorter`
 
-## spark.shuffle.sync
+### sync { #spark.shuffle.sync }
+
+**spark.shuffle.sync**
 
 Controls whether `DiskBlockObjectWriter` should force outstanding writes to disk while [committing a single atomic block](storage/DiskBlockObjectWriter.md#commitAndGet) (i.e. all operating system buffers should synchronize with the disk to ensure that all changes to a file are in fact recorded in storage)
 
 Default: `false`
 
 Used when `BlockManager` is requested for a [DiskBlockObjectWriter](storage/BlockManager.md#getDiskWriter)
 
-## spark.shuffle.useOldFetchProtocol
+### useOldFetchProtocol { #spark.shuffle.useOldFetchProtocol }
+
+**spark.shuffle.useOldFetchProtocol**
 
 Controls whether to use the old protocol while fetching shuffle blocks. Enable it only for compatibility, when jobs of a newer Spark version have to fetch shuffle blocks from an external shuffle service of an older version.
 
 Default: `false`
 
diff --git a/docs/memory/MemoryManager.md b/docs/memory/MemoryManager.md
index af55fa1f8a..0f783bcc55 100644
--- a/docs/memory/MemoryManager.md
+++ b/docs/memory/MemoryManager.md
@@ -1,6 +1,6 @@
 # MemoryManager
 
-`MemoryManager` is an [abstraction](#contract) of [memory managers](#implementations) that can share available memory between task execution ([TaskMemoryManager](TaskMemoryManager.md#memoryManager)) and storage ([BlockManager](../storage/BlockManager.md#memoryManager)).
+`MemoryManager` is an [abstraction](#contract) of [memory managers](#implementations) that can share available memory between tasks ([TaskMemoryManager](TaskMemoryManager.md#memoryManager)) and storage ([BlockManager](../storage/BlockManager.md#memoryManager)).
 
 ![MemoryManager and Core Services](../images/memory/MemoryManager.png)
 
@@ -95,16 +95,23 @@ Used when:
 ??? note "Abstract Class"
     `MemoryManager` is an abstract class and cannot be created directly. It is created indirectly for the [concrete MemoryManagers](#implementations).
 
-## Accessing MemoryManager (SparkEnv)
+## Accessing MemoryManager { #SparkEnv }
 
 `MemoryManager` is available as [SparkEnv.memoryManager](../SparkEnv.md#memoryManager) on the driver and executors.
 
-```text
+```scala
 import org.apache.spark.SparkEnv
 val mm = SparkEnv.get.memoryManager
+```
+
+```scala
+// MemoryManager is private[spark]
+// The following won't compile unless within the org.apache.spark package:
+// import org.apache.spark.memory.MemoryManager
+// assert(mm.isInstanceOf[MemoryManager])
 
-scala> :type mm
-org.apache.spark.memory.MemoryManager
+// We have to resort to a string comparison 😔
+assert("UnifiedMemoryManager".equals(mm.getClass.getSimpleName))
 ```
 
 ## Associating MemoryStore with Storage Memory Pools
@@ -241,3 +248,22 @@ MemoryMode | MemoryAllocator
 `tungstenMemoryAllocator` is used when:
 
 * `TaskMemoryManager` is requested to [allocate a memory page](TaskMemoryManager.md#allocatePage), [release a memory page](TaskMemoryManager.md#freePage) and [clean up all the allocated memory](TaskMemoryManager.md#cleanUpAllAllocatedMemory)
+
+## Page Size { #pageSizeBytes }
+
+`pageSizeBytes` is either [spark.buffer.pageSize](../configuration-properties.md#spark.buffer.pageSize), if defined, or the [default page size](#defaultPageSizeBytes).
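+
+The resolution can be sketched as follows (a simplified, illustrative rendition rather than the actual Spark code; `conf` stands for the `SparkConf` the `MemoryManager` is created with):
+
+```scala
+import org.apache.spark.SparkConf
+
+// A sketch of the pageSizeBytes resolution (illustrative, not the Spark sources):
+// an explicit spark.buffer.pageSize setting wins over the computed default.
+def pageSizeBytes(conf: SparkConf, defaultPageSizeBytes: Long): Long =
+  if (conf.contains("spark.buffer.pageSize")) {
+    conf.getSizeAsBytes("spark.buffer.pageSize") // size strings like "1m" become bytes
+  } else {
+    defaultPageSizeBytes // fall back to the computed default page size
+  }
+```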
+ +`pageSizeBytes` is used when: + +* `TaskMemoryManager` is requested for the [page size](TaskMemoryManager.md#pageSizeBytes) + +### Default Page Size { #defaultPageSizeBytes } + +```scala +defaultPageSizeBytes: Long +``` + +??? note "Lazy Value" + `defaultPageSizeBytes` is a Scala **lazy value** to guarantee that the code to initialize it is executed once only (when accessed for the first time) and the computed value never changes afterwards. + + Learn more in the [Scala Language Specification]({{ scala.spec }}/05-classes-and-objects.html#lazy). diff --git a/docs/memory/TaskMemoryManager.md b/docs/memory/TaskMemoryManager.md index 3c71ea843c..51b689df70 100644 --- a/docs/memory/TaskMemoryManager.md +++ b/docs/memory/TaskMemoryManager.md @@ -172,13 +172,15 @@ In the end, `releaseExecutionMemory` requests the [MemoryManager](#memoryManager * `MemoryConsumer` is requested to [free up memory](MemoryConsumer.md#freeMemory) * `TaskMemoryManager` is requested to [allocatePage](#allocatePage) and [freePage](#freePage) -## Page Size +## Page Size { #pageSizeBytes } ```java long pageSizeBytes() ``` -`pageSizeBytes` requests the [MemoryManager](#memoryManager) for the [pageSizeBytes](MemoryManager.md#pageSizeBytes). +`pageSizeBytes` requests the [MemoryManager](#memoryManager) for the [page size](MemoryManager.md#pageSizeBytes). + +--- `pageSizeBytes` is used when: