From 2971586485d489bfb2571ac1df1c4712aa11e132 Mon Sep 17 00:00:00 2001
From: jishnub
Date: Fri, 29 Oct 2021 22:08:38 +0400
Subject: [PATCH] Update examples

---
 docs/src/examples/pmapreduce.md            | 65 ++++++++++++-------
 docs/src/examples/sharedarrays.md          |  2 +-
 docs/src/examples/threads.md               | 72 ++++++++++------------
 examples/mpiclustermanager_mpitransport.jl | 12 ++++
 examples/mpijobscript.slurm                | 10 +++
 examples/pmapreduce.jl                     | 29 +++++----
 examples/pmapreducejobscript.jl            |  6 ++
 examples/pmapreducejobscript.slurm         | 10 +++
 examples/threads.jl                        | 49 +++++++--------
 examples/threadsjobscript.jl               |  6 ++
 examples/threadsjobscript.slurm            | 13 ++++
 11 files changed, 172 insertions(+), 102 deletions(-)
 create mode 100644 examples/mpiclustermanager_mpitransport.jl
 create mode 100644 examples/mpijobscript.slurm
 create mode 100644 examples/pmapreducejobscript.jl
 create mode 100644 examples/pmapreducejobscript.slurm
 create mode 100644 examples/threadsjobscript.jl
 create mode 100644 examples/threadsjobscript.slurm

diff --git a/docs/src/examples/pmapreduce.md b/docs/src/examples/pmapreduce.md
index a16f722..b92e71b 100644
--- a/docs/src/examples/pmapreduce.md
+++ b/docs/src/examples/pmapreduce.md
@@ -1,5 +1,7 @@
 # Example of the use of pmapreduce
 
+## Using [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl)
+
 The function `pmapreduce` performs a parallel `mapreduce`. This is primarily useful when the function has to perform an expensive calculation, that is the evaluation time per core exceeds the setup and communication time. This is also useful when each core is allocated memory and has to work with arrays that won't fit into memory collectively, as is often the case on a cluster.
 
 We walk through an example where we initialize and concatenate arrays in serial and in parallel.
@@ -11,13 +13,15 @@ using ParallelUtilities
 using Distributed
 ```
 
-We define the function that performs the initialization on each core. This step is embarassingly parallel as no communication happens between workers. We simulate an expensive calculation by adding a sleep interval for each index.
+We define the function that performs the initialization on each core. This step is embarrassingly parallel, as no communication happens between workers.
 
 ```julia
-function initialize(sleeptime)
-    A = Array{Int}(undef, 20, 20)
+function initialize(x, n)
+    inds = 1:n
+    d, r = divrem(length(inds), nworkers())
+    ninds_local = d + (x <= r)
+    A = zeros(Int, 50, ninds_local)
     for ind in eachindex(A)
-        sleep(sleeptime)
         A[ind] = ind
     end
     return A
@@ -27,34 +31,35 @@ end
 
 Next we define the function that calls `pmapreduce`:
 
 ```julia
-function main_pmapreduce(sleeptime)
-    pmapreduce(x -> initialize(sleeptime), hcat, 1:20)
+function mapreduce_parallel(n)
+    pmapreduce(x -> initialize(x, n), hcat, 1:nworkers())
 end
 ```
 
 We also define a function that carries out a serial mapreduce:
 
 ```julia
-function main_mapreduce(sleeptime)
-    mapreduce(x -> initialize(sleeptime), hcat, 1:20)
+function mapreduce_serial(n)
+    mapreduce(x -> initialize(x, n), hcat, 1:nworkers())
 end
 ```
 
-We compare the performance of the serial and parallel evaluations using 20 cores on one node:
+We compare the performance of the serial and the parallel mapreduce using `3` nodes with `28` cores on each node. We define a caller function first:
 
 ```julia
 function compare_with_serial()
     # precompile
-    main_mapreduce(0)
-    main_pmapreduce(0)
+    mapreduce_serial(1)
+    mapreduce_parallel(nworkers())
 
     # time
-    println("Tesing serial")
-    A = @time main_mapreduce(5e-6)
-    println("Tesing parallel")
-    B = @time main_pmapreduce(5e-6)
+    n = 2_000_000
+    println("Testing serial mapreduce")
+    A = @time mapreduce_serial(n)
+    println("Testing pmapreduce")
+    B = @time mapreduce_parallel(n)
 
     # check results
     println("Results match : ", A == B)
@@ -62,13 +67,31 @@ end
 ```
 
 We run this caller on the cluster:
-```julia
-julia> compare_with_serial()
-Tesing serial
-  9.457601 seconds (40.14 k allocations: 1.934 MiB)
-Tesing parallel
-  0.894611 seconds (23.16 k allocations: 1.355 MiB, 2.56% compilation time)
+```console
+Testing serial mapreduce
+ 23.986976 seconds (8.26 k allocations: 30.166 GiB, 11.71% gc time, 0.02% compilation time)
+Testing pmapreduce
+  7.465366 seconds (29.55 k allocations: 764.166 MiB)
 Results match : true
 ```
 
+In this case the overall gain is only around a factor of `3`. In general a parallel mapreduce is advantageous if the time required to evaluate the function far exceeds that required to communicate across workers.
+
+The time required for a `@distributed` `for` loop is unfortunately exceedingly high for it to be practical here.
+
 The full script may be found in the examples directory.
+
+## Using [MPIClusterManagers.jl](https://github.com/JuliaParallel/MPIClusterManagers.jl)
+
+The same script may also be run by starting an MPI cluster (the cluster in this case has 77 workers + 1 master process). This leads to the following timings:
+
+```console
+Using MPI_TRANSPORT_ALL
+Testing serial mapreduce
+ 22.263389 seconds (8.07 k allocations: 29.793 GiB, 11.70% gc time, 0.02% compilation time)
+Testing pmapreduce
+ 11.374551 seconds (65.92 k allocations: 2.237 GiB, 0.46% gc time)
+Results match : true
+```
+
+The performance is worse in this case than that obtained using `ClusterManagers.jl`.
diff --git a/docs/src/examples/sharedarrays.md b/docs/src/examples/sharedarrays.md
index c831325..21c3062 100644
--- a/docs/src/examples/sharedarrays.md
+++ b/docs/src/examples/sharedarrays.md
@@ -12,7 +12,7 @@ using SharedArrays
 using Distributed
 ```
 
-We create a function to initailize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period. In other words we assume that the individual elements of the array are expensive to evaluate. We obtain the local indices of the `SharedArray` through the function `localindices`.
+We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period. In other words we assume that the individual elements of the array are expensive to evaluate. We obtain the local indices of the `SharedArray` through the function `localindices` to split the load among workers.
 
 ```julia
 function initialize_localpart(s, sleeptime)
diff --git a/docs/src/examples/threads.md b/docs/src/examples/threads.md
index c7a5cf3..32a8812 100644
--- a/docs/src/examples/threads.md
+++ b/docs/src/examples/threads.md
@@ -13,7 +13,7 @@ We create a function to initailize the local part on each worker. In this case w
 
 ```julia
 function initializenode_threads(sleeptime)
-    s = zeros(Int, 2_000)
+    s = zeros(Int, 5_000)
     Threads.@threads for ind in eachindex(s)
         sleep(sleeptime)
         s[ind] = ind
@@ -22,37 +22,21 @@ function initializenode_threads(sleeptime)
     return s
 end
 ```
 
-We create a main function that runs on the calling process and launches the array initialization task on each node. This is run on a `WorkerPool` consisting of one worker per node which acts as the root process. We may obtain such a pool through the function `ParallelUtilities.workerpool_nodes()`. The array creation step on each node is followed by an eventual concatenation.
+We create a main function that runs on the calling process and launches the array initialization task on each node. The array creation step on each node is followed by an eventual concatenation.
 
 ```julia
-function main_threads(sleeptime)
-    # obtain the workerpool with one process on each node
-    pool = ParallelUtilities.workerpool_nodes()
-
-    # obtain the number of workers in the pool.
-    nw_nodes = nworkers(pool)
-
-    # Evaluate the parallel mapreduce
-    pmapreduce(x -> initializenode_threads(sleeptime), hcat, pool, 1:nw_nodes)
+function pmapreduce_threads(sleeptime)
+    pmapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
 end
 ```
 
-We compare the results with a serial execution that uses a similar workflow, except we use `mapreduce` instead of `pmapreduce` and do not use threads.
+We compare the results with
+* a `mapreduce` that uses a similar workflow, except that the operation takes place entirely on one node, and
+* a `@distributed` mapreduce, where the evaluation is spread across nodes (implemented as `mapreduce_distributed_threads` in the full script in the examples directory).
 
 ```julia
-function initialize_serial(sleeptime)
-    s = zeros(Int, 2_000)
-    for ind in eachindex(s)
-        sleep(sleeptime)
-        s[ind] = ind
-    end
-    return s
-end
-
-function main_serial(sleeptime)
-    pool = ParallelUtilities.workerpool_nodes()
-    nw_nodes = nworkers(pool)
-    mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_nodes)
+function mapreduce_threads(sleeptime)
+    mapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
 end
 ```
 
@@ -61,28 +45,34 @@ We create a function to compare the performance of the two. We start with a prec
 ```julia
 function compare_with_serial()
     # precompile
-    main_serial(0)
-    main_threads(0)
-
+    mapreduce_threads(0)
+    mapreduce_distributed_threads(0)
+    pmapreduce_threads(0)
     # time
-    println("Testing serial")
-    A = @time main_serial(5e-3);
-    println("Testing threads")
-    B = @time main_threads(5e-3);
-
-    println("Results match : ", A == B)
+    sleeptime = 1e-2
+    println("Testing threaded mapreduce")
+    A = @time mapreduce_threads(sleeptime);
+    println("Testing threaded+distributed mapreduce")
+    B = @time mapreduce_distributed_threads(sleeptime);
+    println("Testing threaded pmapreduce")
+    C = @time pmapreduce_threads(sleeptime);
+
+    println("Results match : ", A == B == C)
 end
 ```
 
 We run this script on a Slurm cluster across 2 nodes with 28 cores on each node. The results are:
-```julia
-julia> compare_with_serial()
-Testing serial
- 24.601593 seconds (22.49 k allocations: 808.266 KiB)
-Testing threads
-  0.666256 seconds (3.71 k allocations: 201.703 KiB)
+```console
+Testing threaded mapreduce
+  4.161118 seconds (66.27 k allocations: 2.552 MiB, 0.95% compilation time)
+Testing threaded+distributed mapreduce
+  2.232924 seconds (48.64 k allocations: 2.745 MiB, 3.20% compilation time)
+Testing threaded pmapreduce
+  2.432104 seconds (6.79 k allocations: 463.788 KiB, 0.44% compilation time)
 Results match : true
 ```
 
-The full script may be found in the examples directory.
+We see that there is little difference in evaluation times between the `@distributed` reduction and `pmapreduce`, both of which are roughly twice as fast as the one-node evaluation.
+
+The full script along with the Slurm jobscript may be found in the examples directory.
diff --git a/examples/mpiclustermanager_mpitransport.jl b/examples/mpiclustermanager_mpitransport.jl
new file mode 100644
index 0000000..2ce8b29
--- /dev/null
+++ b/examples/mpiclustermanager_mpitransport.jl
@@ -0,0 +1,12 @@
+using MPIClusterManagers
+import MPI
+using Distributed
+
+# This uses MPI to communicate with the workers
+mgr = MPIClusterManagers.start_main_loop(MPI_TRANSPORT_ALL)
+
+@everywhere include(joinpath(@__DIR__, "pmapreduce.jl"))
+println("Using MPI_TRANSPORT_ALL")
+PMapReduceTiming.compare_with_serial()
+
+MPIClusterManagers.stop_main_loop(mgr)
diff --git a/examples/mpijobscript.slurm b/examples/mpijobscript.slurm
new file mode 100644
index 0000000..e081987
--- /dev/null
+++ b/examples/mpijobscript.slurm
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+#SBATCH -n 78
+#SBATCH --job-name=mpitest
+#SBATCH --time=00:05:00
+#SBATCH -e mpitest.err
+#SBATCH -o mpitest.out
+
+juliaexe=$SCRATCH/julia/julia-1.7.0-rc2/bin/julia
+mpirun $juliaexe --startup=no mpiclustermanager_mpitransport.jl
diff --git a/examples/pmapreduce.jl b/examples/pmapreduce.jl
index 2367648..e984ce2 100644
--- a/examples/pmapreduce.jl
+++ b/examples/pmapreduce.jl
@@ -3,33 +3,36 @@ module PMapReduceTiming
 using ParallelUtilities
 using Distributed
 
-function initialize(sleeptime)
-    A = Array{Int}(undef, 20, 20)
+function initialize(x, n)
+    inds = 1:n
+    d, r = divrem(length(inds), nworkers())
+    ninds_local = d + (x <= r)
+    A = zeros(Int, 50, ninds_local)
     for ind in eachindex(A)
-        sleep(sleeptime)
         A[ind] = ind
     end
     return A
 end
 
-function main_mapreduce(sleeptime)
-    mapreduce(x -> initialize(sleeptime), hcat, 1:20)
+function mapreduce_serial(n)
+    mapreduce(x -> initialize(x, n), hcat, 1:nworkers())
 end
 
-function main_pmapreduce(sleeptime)
-    pmapreduce(x -> initialize(sleeptime), hcat, 1:20)
+function mapreduce_parallel(n)
+    pmapreduce(x -> initialize(x, n), hcat, 1:nworkers())
 end
 
 function compare_with_serial()
     # precompile
-    main_mapreduce(0)
-    main_pmapreduce(0)
+    mapreduce_serial(1)
+    mapreduce_parallel(nworkers())
 
     # time
-    println("Tesing serial")
-    A = @time main_mapreduce(5e-6)
-    println("Tesing parallel")
-    B = @time main_pmapreduce(5e-6)
+    n = 2_000_000
+    println("Testing serial mapreduce")
+    A = @time mapreduce_serial(n)
+    println("Testing pmapreduce")
+    B = @time mapreduce_parallel(n)
 
     # check results
     println("Results match : ", A == B)
diff --git a/examples/pmapreducejobscript.jl b/examples/pmapreducejobscript.jl
new file mode 100644
index 0000000..3caf17c
--- /dev/null
+++ b/examples/pmapreducejobscript.jl
@@ -0,0 +1,6 @@
+using ClusterManagers
+job_file_loc = mktempdir(@__DIR__)
+addprocs_slurm(78, exeflags=["--startup=no"], job_file_loc = job_file_loc)
+using Distributed
+@everywhere include(joinpath(@__DIR__, "pmapreduce.jl"))
+PMapReduceTiming.compare_with_serial()
diff --git a/examples/pmapreducejobscript.slurm b/examples/pmapreducejobscript.slurm
new file mode 100644
index 0000000..fd27e27
--- /dev/null
+++ b/examples/pmapreducejobscript.slurm
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+#SBATCH -n 78
+#SBATCH --job-name=pmapreducetest
+#SBATCH --time=00:10:00
+#SBATCH -e pmapreducetest.err
+#SBATCH -o pmapreducetest.out
+
+juliaexe=$SCRATCH/julia/julia-1.7.0-rc2/bin/julia
+$juliaexe --startup=no pmapreducejobscript.jl
diff --git a/examples/threads.jl b/examples/threads.jl
index 4bbd316..a56deee 100644
--- a/examples/threads.jl
+++ b/examples/threads.jl
@@ -3,17 +3,8 @@ module ThreadsTiming
 using ParallelUtilities
 using Distributed
 
-function initialize_serial(sleeptime)
-    s = zeros(Int, 2_000)
-    for ind in eachindex(s)
-        sleep(sleeptime)
-        s[ind] = ind
-    end
-    return s
-end
-
 function initializenode_threads(sleeptime)
-    s = zeros(Int, 2_000)
+    s = zeros(Int, 5_000)
     Threads.@threads for ind in eachindex(s)
         sleep(sleeptime)
         s[ind] = ind
@@ -21,29 +12,35 @@ function initializenode_threads(sleeptime)
     return s
 end
 
-function main_threads(sleeptime)
-    workers_node_pool = ParallelUtilities.workerpool_nodes()
-    nw_nodes = nworkers(workers_node_pool)
-    pmapreduce(x -> initializenode_threads(sleeptime), hcat, workers_node_pool, 1:nw_nodes)
+function mapreduce_threads(sleeptime)
+    mapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
 end
 
-function main_serial(sleeptime)
-    workers_node_pool = ParallelUtilities.workerpool_nodes()
-    nw_nodes = nworkers(workers_node_pool)
-    mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_nodes)
+function mapreduce_distributed_threads(sleeptime)
+    @distributed hcat for _ in 1:nworkers()
+        initializenode_threads(sleeptime)
+    end
+end
+
+function pmapreduce_threads(sleeptime)
+    pmapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
 end
 
 function compare_with_serial()
     # precompile
-    main_serial(0)
-    main_threads(0)
+    mapreduce_threads(0)
+    mapreduce_distributed_threads(0)
+    pmapreduce_threads(0)
 
     # time
-    println("Testing serial")
-    A = @time main_serial(5e-3);
-    println("Testing threads")
-    B = @time main_threads(5e-3);
-
-    println("Results match : ", A == B)
+    sleeptime = 1e-2
+    println("Testing threaded mapreduce")
+    A = @time mapreduce_threads(sleeptime);
+    println("Testing threaded+distributed mapreduce")
+    B = @time mapreduce_distributed_threads(sleeptime);
+    println("Testing threaded pmapreduce")
+    C = @time pmapreduce_threads(sleeptime);
+
+    println("Results match : ", A == B == C)
 end
 
 end
diff --git a/examples/threadsjobscript.jl b/examples/threadsjobscript.jl
new file mode 100644
index 0000000..cb80b62
--- /dev/null
+++ b/examples/threadsjobscript.jl
@@ -0,0 +1,6 @@
+using ClusterManagers
+job_file_loc = mktempdir(@__DIR__)
+addprocs_slurm(2, exeflags=["-t 28", "--startup=no"], job_file_loc = job_file_loc)
+using Distributed
+@everywhere include(joinpath(@__DIR__, "threads.jl"))
+ThreadsTiming.compare_with_serial()
diff --git a/examples/threadsjobscript.slurm b/examples/threadsjobscript.slurm
new file mode 100644
index 0000000..1d0b55e
--- /dev/null
+++ b/examples/threadsjobscript.slurm
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+#SBATCH -n 2
+#SBATCH --nodes 2
+#SBATCH --ntasks-per-node 1
+#SBATCH --cpus-per-task 28
+#SBATCH --job-name=threadstest
+#SBATCH --time=00:05:00
+#SBATCH -e threadstest.err
+#SBATCH -o threadstest.out
+
+juliaexe=$SCRATCH/julia/julia-1.7.0-rc2/bin/julia
+$juliaexe -t 28 --startup=no threadsjobscript.jl
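
Supplementary note (not part of the patch): the column-splitting expression `ninds_local = d + (x <= r)` used in `initialize(x, n)` above gives each of the first `r` workers one extra column, so the `n` columns are shared as evenly as possible. The sketch below checks this locally; the pool size `addprocs(4)` and `n = 10` are purely illustrative, whereas the cluster scripts in the patch launch workers via Slurm or MPI instead.

```julia
# Minimal local sketch of the load-splitting logic from initialize(x, n).
using Distributed
addprocs(4)            # small local pool for illustration only

n = 10                 # total number of columns to distribute
ncols = map(1:nworkers()) do x
    d, r = divrem(n, nworkers())
    d + (x <= r)       # same expression as ninds_local in initialize(x, n)
end

@assert sum(ncols) == n    # every column is assigned to exactly one worker
println(ncols)             # prints [3, 3, 2, 2] for 4 workers and n = 10
```

Concatenating the per-worker blocks with `hcat`, as `mapreduce_parallel` does, therefore reproduces an array with `n` columns in total.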