Skip to content

Commit

Permalink
Merge pull request #12 from psrenergy/gb/return-results-in-pmap
Browse files Browse the repository at this point in the history
Return results in all processes in pmap
  • Loading branch information
guilhermebodin authored Nov 11, 2024
2 parents dc7a9f7 + 59daa8d commit c6b8e99
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "JobQueueMPI"
uuid = "32d208e1-246e-420c-b6ff-18b71b410923"
authors = ["pedroripper <[email protected]>"]
version = "0.1.0"
version = "0.1.1"

[deps]
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
[codecov-img]: https://codecov.io/gh/psrenergy/JobQueueMPI.jl/coverage.svg?branch=master
[codecov-url]: https://codecov.io/gh/psrenergy/JobQueueMPI.jl?branch=master

| **Build Status** | **Coverage** |
|:-----------------:|:-----------------:|
| **Build Status** | **Coverage** | **Documentation** |
|:-----------------:|:-----------------:|:-----------------:|
| [![Build Status][build-img]][build-url] | [![Codecov branch][codecov-img]][codecov-url] | [![](https://img.shields.io/badge/docs-latest-blue.svg)](https://psrenergy.github.io/JobQueueMPI.jl/dev/) |


Expand Down
4 changes: 2 additions & 2 deletions docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ makedocs(;
authors = "psrenergy",
pages = [
"Home" => "index.md",
]
],
)

deploydocs(;
repo = "github.com/psrenergy/JobQueueMPI.jl.git",
push_preview = true,
)
)
94 changes: 94 additions & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,100 @@ JobQueueMPI.jl has the following components:
- `Controller`: The controller is responsible for managing the jobs and the workers. It keeps track of the jobs that have been sent and received and sends the jobs to the available workers.
- `Worker`: The worker is responsible for executing the jobs. It receives the jobs from the controller, executes them, and sends the results back to the controller.

Users can call functions to compute jobs in parallel in two ways:
- Building a function and using a `pmap` implementation that will put the jobs in the job queue and send them to the workers.
```julia
using JobQueueMPI

"""
    sum_100(value)

Return `value` shifted up by 100.
"""
sum_100(value) = value + 100

sum_100_answer = JobQueueMPI.pmap(sum_100, collect(1:10))
```
- Building the jobs and sending them to workers explicitly. There are examples of this structure in the test folder. This way is much more flexible than the first one, but it requires more code and knowledge about how MPI works.

```julia
using JobQueueMPI

# Job payload for the explicit controller/worker example: carries one input
# value together with its position in the original data vector.
mutable struct Message
    value::Int      # input value; the worker mutates this into the computed result
    vector_idx::Int # index into the source/output vector this value belongs to
end

# True once the job queue is drained AND no dispatched job is still awaiting an answer.
all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller)

"""
    sum_100(message::Message)

Increment `message.value` by 100 in place and return the mutated message.
"""
function sum_100(message::Message)
    message.value = message.value + 100
    return message
end

"""
    update_data(new_data, message::Message)

Store `message.value` into `new_data` at the slot recorded in
`message.vector_idx`, and return the stored value.
"""
function update_data(new_data, message::Message)
    idx = message.vector_idx
    value = message.value
    # Assign and return explicitly instead of the original
    # `return new_data[idx] = value`, which relies on the (easily
    # misread) fact that an assignment expression evaluates to `value`.
    new_data[idx] = value
    return value
end

# Worker-side event loop for the example: runs only on worker processes.
function workers_loop()
    if JQM.is_worker_process()
        worker = JQM.Worker()
        while true
            # Block until the controller sends the next job.
            job = JQM.receive_job(worker)
            message = JQM.get_message(job)
            # The controller signals shutdown with a TerminationMessage sentinel
            # (presumably a comparable singleton — confirm against JQM internals).
            if message == JQM.TerminationMessage()
                break
            end
            return_message = sum_100(message)
            JQM.send_job_answer_to_controller(worker, return_message)
        end
        # Terminate the worker process outright once told to stop, so it
        # never runs any code following this call in the script.
        exit(0)
    end
end

# Driver for the example: distributes `data` over MPI workers and collects
# the transformed values on the controller process.
function job_queue(data)
    JQM.mpi_init()
    JQM.mpi_barrier()

    T = eltype(data)
    N = length(data)

    if JQM.is_controller_process()
        # Output buffer; each job answer fills the slot it was taken from.
        new_data = Array{T}(undef, N)

        controller = JQM.Controller(JQM.num_workers())

        # Enqueue one job per element, recording its index so the answer
        # can be written back in the original order.
        for i in eachindex(data)
            message = Message(data[i], i)
            JQM.add_job_to_queue!(controller, message)
        end

        # Keep dispatching queued jobs and draining answers until the queue
        # is empty and no job is still pending with a worker.
        while !all_jobs_done(controller)
            if !JQM.is_job_queue_empty(controller)
                JQM.send_jobs_to_any_available_workers(controller)
            end
            if JQM.any_pending_jobs(controller)
                # Poll for an answer; `nothing` means none has arrived yet.
                job_answer = JQM.check_for_job_answers(controller)
                if !isnothing(job_answer)
                    message = JQM.get_message(job_answer)
                    update_data(new_data, message)
                end
            end
        end

        JQM.send_termination_message()

        # NOTE(review): the controller returns here without reaching the
        # barrier/finalize below, and workers call exit(0) inside
        # workers_loop(), so the three statements after workers_loop()
        # appear unreachable — confirm the intended MPI shutdown sequence.
        return new_data
    end
    workers_loop()
    JQM.mpi_barrier()
    JQM.mpi_finalize()
    return nothing
end

data = collect(1:10)
new_data = job_queue(data)
```

## API

```@docs
Expand Down
26 changes: 23 additions & 3 deletions src/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ function _p_map_workers_loop(f, data_defined_in_process)
end

"""
pmap(f, jobs, data_defined_in_process = nothing)
pmap(
f::Function,
jobs::Vector,
data_defined_in_process = nothing;
        return_result_in_all_processes::Bool = true
)
Parallel map function that works with MPI. If the function is called in parallel, it will
distribute the jobs to the workers and collect the results. If the function is called in
Expand All @@ -28,10 +33,17 @@ serial, it will just map the function to the jobs.
The function `f` should take one argument, which is the message to be processed. If `data_defined_in_process`
is not `nothing`, the function `f` should take two arguments, the first one being `data_defined_in_process`.
The `return_result_in_all_processes` keyword argument (default `true`) broadcasts the result to all
processes, so every process returns the answers in the same order as the jobs were given. If set to
`false`, only the controller process returns the answers; the workers return `nothing`.
"""
function pmap(f::Function, jobs::Vector, data_defined_in_process = nothing)
function pmap(
f::Function,
jobs::Vector,
data_defined_in_process = nothing;
return_result_in_all_processes::Bool = true,
)
result = Vector{Any}(undef, length(jobs))
if is_running_in_parallel()
mpi_barrier()
Expand All @@ -55,11 +67,19 @@ function pmap(f::Function, jobs::Vector, data_defined_in_process = nothing)
end
send_termination_message()
mpi_barrier()
if return_result_in_all_processes
result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD)
mpi_barrier()
end
return result
else
_p_map_workers_loop(f, data_defined_in_process)
mpi_barrier()
return nothing
if return_result_in_all_processes
result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD)
mpi_barrier()
end
return result
end
else
for (i, job) in enumerate(jobs)
Expand Down
10 changes: 4 additions & 6 deletions test/test_pmap_mpi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ end
sum_100_answer = JQM.pmap(sum_100, collect(1:10))
divisors_answer = JQM.pmap(get_divisors, collect(1:10))

if JQM.is_controller_process()
@testset "pmap MPI" begin
@test sum_100_answer == [101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
@test divisors_answer ==
[[1], [1, 2], [1, 3], [1, 2, 4], [1, 5], [1, 2, 3, 6], [1, 7], [1, 2, 4, 8], [1, 3, 9], [1, 2, 5, 10]]
end
@testset "pmap MPI" begin
@test sum_100_answer == [101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
@test divisors_answer ==
[[1], [1, 2], [1, 3], [1, 2, 4], [1, 5], [1, 2, 3, 6], [1, 7], [1, 2, 4, 8], [1, 3, 9], [1, 2, 5, 10]]
end

JQM.mpi_finalize()
9 changes: 4 additions & 5 deletions test/test_pmap_mpi_optim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@ JQM.mpi_init()
N = 5
data = collect(1:N)
function g(x, i, data)
return i * (x[i] - 2 * i) ^ 2 + data[i]
return i * (x[i] - 2 * i)^2 + data[i]
end
x0 = zeros(N)
list_i = collect(1:N)
fake_input = Int[] # ignored


let
is_done = false
if JQM.is_controller_process()
ret = optimize(x0, NelderMead()) do x
MPI.bcast(is_done, MPI.COMM_WORLD)
g_x = JQM.pmap((v)->g(v[1], v[2], data), [(x, i) for i in list_i])
g_x = JQM.pmap((v) -> g(v[1], v[2], data), [(x, i) for i in list_i])
return sum(g_x)
end
# tell workers to stop calling pmap
Expand All @@ -42,11 +41,11 @@ let
if is_done
break
end
JQM.pmap((v)->g(v[1], v[2], data), fake_input)
JQM.pmap((v) -> g(v[1], v[2], data), fake_input)
end
end
end

JQM.mpi_barrier()

JQM.mpi_finalize()
JQM.mpi_finalize()

2 comments on commit c6b8e99

@guilhermebodin
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/119199

Tip: Release Notes

Did you know you can add release notes too? Just add markdown formatted text underneath the comment after the text
"Release notes:" and it will be added to the registry PR, and if TagBot is installed it will also be added to the
release that TagBot creates. i.e.

@JuliaRegistrator register

Release notes:

## Breaking changes

- blah

To add them here just re-invoke and the PR will be updated.

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.1.1 -m "<description of version>" c6b8e991a893ad4e3ea85319d905afe5e19c37a3
git push origin v0.1.1

Please sign in to comment.