feat: Use fair-spill pool when spark.memory.offHeap.enabled=false #1004

Closed
wants to merge 3 commits

Conversation

andygrove
Member

@andygrove andygrove commented Oct 8, 2024

Which issue does this PR close?

Closes #996

Possibly depends on #988 being resolved first

Rationale for this change

This is an alternative to the approach in #1002

What changes are included in this PR?

  • Take task concurrency into account when allocating memory per native plan
  • Use a fair-spill pool instead of a greedy pool (see the sketch below)
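
Conceptually, the change amounts to something like the following sketch, using DataFusion's FairSpillPool. The function name and parameters are illustrative, not the actual Comet code.

use std::sync::Arc;
use datafusion::execution::memory_pool::{FairSpillPool, MemoryPool};

// Illustrative sketch: size each plan's pool by dividing the available
// overhead across concurrently running tasks, and use a fair-spill pool so
// that spillable operators split the budget instead of the first consumer
// taking everything (as a GreedyMemoryPool allows).
fn create_plan_memory_pool(
    memory_overhead_bytes: usize, // e.g. derived from spark.comet.memoryOverhead
    task_concurrency: usize,      // concurrent tasks per executor
) -> Arc<dyn MemoryPool> {
    let per_task_bytes = memory_overhead_bytes / task_concurrency.max(1);
    Arc::new(FairSpillPool::new(per_task_bytes))
}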

How are these changes tested?

@andygrove
Member Author

I tried running benchmarks with this PR but ran into:

Failed to allocate additional 917708800 bytes for ShuffleRepartitioner[0] with 0 bytes already allocated for this reservation - 858914816 bytes remain available for the total pool

Perhaps we need to merge #988 first

@andygrove
Member Author

@Kontinuation Is this the general approach you were suggesting?

@andygrove
Member Author

I was able to get benchmarks running by allocating more memory to Comet.

@andygrove andygrove changed the title feat: Use fair-spill pool when spark.executor.offHeap.enabled=false feat: Use fair-spill pool when spark.memory.offHeap.enabled=false Oct 8, 2024
@Kontinuation
Member

Kontinuation commented Oct 9, 2024

This is better than using a greedy memory pool. It makes spillable operators work correctly under memory pressure, especially when running sort-merge-join where multiple sort operators compete for resources.

There are still some unresolved issues. Each task may create multiple native plans, and we still do not make them share the same memory pool. I'd like to share the experiments I've done to better sync with you on this topic.

I did some experiments on my branch to try out various ways of using memory pools. There's a configuration spark.comet.exec.memoryPool that lets me run queries with different memory pools. All configurations were tested using the query mentioned in #1003.
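
As a rough, simplified sketch of what that config selects (the function name and pool-size parameter are illustrative; the shared/global variants would look up an existing per-task or per-executor pool instead of constructing a new one here):

use std::sync::Arc;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};

// Illustrative dispatch from the config value to a DataFusion memory pool.
fn memory_pool_for_mode(mode: &str, pool_size: usize) -> Arc<dyn MemoryPool> {
    match mode {
        "greedy" => Arc::new(GreedyMemoryPool::new(pool_size)),
        "fair_spill" => Arc::new(FairSpillPool::new(pool_size)),
        // "fair_spill_shared", "fair_spill_global", "greedy_global" would
        // reuse a shared pool rather than building one per plan.
        other => unimplemented!("spark.comet.exec.memoryPool = {other}"),
    }
}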

spark.comet.exec.memoryPool = greedy

This is the current mode when using native memory management. It could only run with spark.comet.memoryOverhead = 8000m; otherwise sort-merge-join fails because of a memory reservation failure:

24/10/09 10:59:14 WARN TaskSetManager: Lost task 3.0 in stage 13.0 (TID 43) (bogon executor driver): org.apache.comet.CometNativeException: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge[0] consumed 1164398840 bytes, GroupedHashAggregateStream[0] consumed 117699433 bytes, SMJStream[0] consumed 459160 bytes, HashJoinInput[0] consumed 1392 bytes. Error: Failed to allocate additional 993312 bytes for ExternalSorter[0] with 1321280 bytes already allocated for this reservation - 625495 bytes remain available for the total pool

spark.comet.exec.memoryPool = fair_spill

The same approach as this PR: simply use a FairSpillPool as the per-plan memory pool. It could run with spark.comet.memoryOverhead = 3200m. Both sort operators could spill to cope with the memory bound:

24/10/09 11:03:11 INFO core/src/execution/jni_api.rs: Comet native query plan with metrics (stage: 13 task: 41):
AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum], metrics=[output_rows=2791473, elapsed_compute=3.695235425s]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8], metrics=[output_rows=15002382, elapsed_compute=2.23923ms]
    ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1], metrics=[output_rows=15002382, elapsed_compute=2.445133ms]
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)], metrics=[output_rows=15002382, input_batches=1832, input_rows=15002382, build_input_batches=1, output_batches=1832, build_input_rows=25, build_mem_used=1392, join_time=853.609662ms, build_time=42.125µs]
        CopyExec [UnpackOrDeepCopy], metrics=[output_rows=25, elapsed_compute=3.292µs]
          ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8], metrics=[output_rows=25, elapsed_compute=709ns, cast_time=1ns]
        CopyExec [UnpackOrClone], metrics=[output_rows=15002382, elapsed_compute=1.754617ms]
          ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8], metrics=[output_rows=15002382, elapsed_compute=2.032586ms]
            SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)], metrics=[output_rows=15002382, spill_count=0, spilled_bytes=0, spilled_rows=0, input_batches=2290, input_rows=18752902, output_batches=1832, peak_mem_used=918320, join_time=4.976020762s]
              SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false], metrics=[output_rows=3750520, elapsed_compute=1.678235876s, spill_count=3, spilled_bytes=572203168, spilled_rows=3066232]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=3750520, elapsed_compute=65.265593ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64], metrics=[output_rows=3750520, elapsed_compute=450.456µs, cast_time=1ns]
              SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false], metrics=[output_rows=15002382, elapsed_compute=2.424249133s, spill_count=4, spilled_bytes=547164360, spilled_rows=13667085]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=15002382, elapsed_compute=40.672672ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)], metrics=[output_rows=15002382, elapsed_compute=531.627µs, cast_time=1ns]

But each task may create more than one native plan, each with its own memory pool. Here is an example of a task creating 2 native plans, and these 2 plans run concurrently.

Plan 1:

24/10/08 13:34:46 INFO core/src/execution/jni_api.rs: Comet native query plan (stage: 13 task: 40):
 AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8]
    ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1]
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)]
        CopyExec [UnpackOrDeepCopy]
          ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8]
        CopyExec [UnpackOrClone]
          ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8]
            SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)]
              SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false]
                CopyExec [UnpackOrDeepCopy]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64]
              SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false]
                CopyExec [UnpackOrDeepCopy]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)]

Plan 2:

24/10/08 13:34:52 INFO core/src/execution/jni_api.rs: Comet native query plan (stage: 13 task: 40):
 ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }, Column { name: "col_2", index: 2 }, Column { name: "col_3", index: 3 }, Column { name: "col_4", index: 4 }, Column { name: "col_5", index: 5 }, Column { name: "col_6", index: 6 }], 4)
  ScanExec: source=[], schema=[col_0: Int64, col_1: Utf8, col_2: Decimal128(12, 2), col_3: Utf8, col_4: Utf8, col_5: Utf8, col_6: Utf8, col_7: Decimal128(36, 4), col_8: Boolean]

spark.comet.exec.memoryPool = fair_spill_shared

This approach allocates a FairSpillPool shared by all plans in the same task. In the above example, the sort-merge-join plan and the shuffle-write plan of the same task share the same memory pool. This strictly follows the conceptual model that Comet won't exceed spark.comet.memoryOverhead. It could run with spark.comet.memoryOverhead = 4800m.

I've added 2 additional JNI interfaces for creating a memory pool at the beginning of each task and releasing it at the end of the task. Actually, this is not strictly necessary: we could create and track per-task memory pools in native code; all it requires is the task attempt id at native plan creation time (see the sketch below).
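
A minimal sketch of that idea, where every native plan created for the same task attempt fetches the same pool. The registry and sizing are illustrative, not the actual branch code.

use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};
use datafusion::execution::memory_pool::{FairSpillPool, MemoryPool};

// Illustrative per-task registry: plans belonging to the same Spark task
// attempt share one FairSpillPool.
static TASK_POOLS: OnceLock<Mutex<HashMap<i64, Arc<dyn MemoryPool>>>> = OnceLock::new();

fn pool_for_task(task_attempt_id: i64, per_task_bytes: usize) -> Arc<dyn MemoryPool> {
    let pools = TASK_POOLS.get_or_init(|| Mutex::new(HashMap::new()));
    let mut pools = pools.lock().unwrap();
    pools
        .entry(task_attempt_id)
        .or_insert_with(|| Arc::new(FairSpillPool::new(per_task_bytes)))
        .clone()
}

// A real implementation would also remove the entry when the task finishes,
// e.g. from a task-completion listener on the JVM side.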

spark.comet.exec.memoryPool = fair_spill_global

This approach uses a singleton FairSpillPool for all tasks in the same executor instance. I thought it would be the optimal approach, but in practice it does not work well. It could only run with spark.comet.memoryOverhead = 12000m. I'll dive into this issue next week since there is a lot of other work scheduled for this week :(.

spark.comet.exec.memoryPool = greedy_global

This approach uses a singleton GreedyMemoryPool for all tasks in the same executor instance. As expected, it does not work well. It could only run with spark.comet.memoryOverhead = 9000m.

So the conclusion is that fair_spill and fair_spill_shared have lower memory requirements and are less likely to break when running memory-intensive queries. I also believe that Spark needs a more sophisticated memory management system from DataFusion to reliably support large ETL use cases, which is where Spark shines.

@andygrove
Member Author

Thanks for the detailed feedback @Kontinuation. I plan to resume work on this today/tomorrow.

@andygrove
Member Author

@Kontinuation Do you want to create a PR from your branch? I like the idea of having some different configurable options while we are experimenting with this

@Kontinuation
Member

@Kontinuation Do you want to create a PR from your branch? I like the idea of having some different configurable options while we are experimenting with this

Sure. I'll clean up the code and submit the PR tomorrow.

@andygrove
Member Author

Closing in favor of #1021

@andygrove andygrove closed this Oct 16, 2024