Skip to content

Commit

Permalink
Add pending jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroripper committed Mar 26, 2024
1 parent 92cbfda commit 7bd8ecc
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 60 deletions.
50 changes: 14 additions & 36 deletions src/controller.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ mutable struct Controller
debug_mode::Bool
worker_status::Vector{WorkerStatus}
job_queue::Vector{Job}
jobs_sent::Int
jobs_received::Int
pending_jobs::Vector{JobRequest}
function Controller(n_workers::Int; debug_mode::Bool = false)
return new(n_workers, debug_mode, fill(WORKER_AVAILABLE, n_workers), Any[], 0, 0)
return new(n_workers, debug_mode, fill(WORKER_AVAILABLE, n_workers), Vector{Job}(), Vector{JobRequest}())
end
end

Expand All @@ -17,27 +16,17 @@ struct TerminationMessage end
_is_worker_available(controller::Controller, worker::Int) =
controller.worker_status[worker] == WORKER_AVAILABLE

is_job_queue_empty(controller::Controller) = isempty(controller.job_queue)
any_pending_jobs(controller::Controller) = !isempty(controller.pending_jobs)

function _pick_job_to_send!(controller::Controller)
if _has_jobs_to_send(controller)
if !is_job_queue_empty(controller)
return popfirst!(controller.job_queue)
else
error("Controller does not have any jobs to send.")
end
end

function _has_jobs_to_send(controller::Controller)
return !isempty(controller.job_queue)
end

function _pick_available_worker(controller::Controller)
for i in 1:controller.n_workers
if _is_worker_available(controller, i)
return i
end
end
return error("No available workers. You should check with any_available_workers() first.")
end

function _pick_available_workers(controller::Controller)
available_workers = []
for i in 1:controller.n_workers
Expand All @@ -48,32 +37,21 @@ function _pick_available_workers(controller::Controller)
return available_workers
end

function _any_available_workers(controller::Controller)
for i in 1:controller.n_workers
if _is_worker_available(controller, i)
return true
end
end
return false
end

function add_job_to_queue!(controller::Controller, message::Any)
return push!(controller.job_queue, Job(message))
end

function send_jobs_to_any_available_workers(controller::Controller)
available_workers = _pick_available_workers(controller)
requests = Vector{JobRequest}()
for worker in available_workers
if _has_jobs_to_send(controller)
if !is_job_queue_empty(controller)
job = _pick_job_to_send!(controller)
controller.jobs_sent += 1
controller.worker_status[worker] = WORKER_BUSY
request = MPI.isend(job, _mpi_comm(); dest = worker, tag = worker + 32)
push!(requests, JobRequest(worker, request))
push!(controller.pending_jobs, JobRequest(worker, request))
end
end
return requests
return nothing
end

function send_termination_message(controller::Controller)
Expand All @@ -87,12 +65,12 @@ function send_termination_message(controller::Controller)
end

function check_for_workers_job(controller::Controller)
for worker in 1:controller.n_workers
has_job = MPI.Iprobe(_mpi_comm(); source = worker, tag = worker + 32)
for j_i in eachindex(controller.pending_jobs)
has_job = MPI.Iprobe(_mpi_comm(); source = controller.pending_jobs[j_i].worker, tag = controller.pending_jobs[j_i].worker + 32)
if has_job
job = MPI.recv(_mpi_comm(); source = worker, tag = worker + 32)
controller.jobs_received += 1
controller.worker_status[worker] = WORKER_AVAILABLE
job = MPI.recv(_mpi_comm(); source = controller.pending_jobs[j_i].worker, tag = controller.pending_jobs[j_i].worker + 32)
controller.worker_status[controller.pending_jobs[j_i].worker] = WORKER_AVAILABLE
deleteat!(controller.pending_jobs, j_i)
return job
end
end
Expand Down
17 changes: 5 additions & 12 deletions test/test1.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ mutable struct Message
vector_idx::Int
end

has_messages_to_send(sent_messages, total_messages) = sent_messages < total_messages
has_messages_to_receive(delivered_messages, total_messages) = delivered_messages < total_messages
job_queue_done(sent_messages, delivered_messages, total_messages) =
sent_messages == total_messages && delivered_messages == total_messages
all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller)

function sum_100(message::Message)
message.value += 100
Expand Down Expand Up @@ -48,8 +45,6 @@ function job_queue(data)

if JQM.is_controller_process() # I am root
new_data = Array{T}(undef, N)
sent_messages = 0
delivered_messages = 0

controller = Controller(JQM.num_workers())

Expand All @@ -58,17 +53,15 @@ function job_queue(data)
JQM.add_job_to_queue!(controller, message)
end

while !job_queue_done(sent_messages, delivered_messages, N)
if has_messages_to_send(sent_messages, N)
requests = JQM.send_jobs_to_any_available_workers(controller)
sent_messages += length(requests)
while !all_jobs_done(controller)
if !JQM.is_job_queue_empty(controller)
JQM.send_jobs_to_any_available_workers(controller)
end
if has_messages_to_receive(delivered_messages, N)
if JQM.any_pending_jobs(controller)
job_answer = JQM.check_for_workers_job(controller)
if !isnothing(job_answer)
message = JQM.get_message(job_answer)
update_data(new_data, message)
delivered_messages += 1
end
end
end
Expand Down
17 changes: 5 additions & 12 deletions test/test2.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ mutable struct WorkerMessage
vector_idx::Int
end

has_messages_to_send(sent_messages, total_messages) = sent_messages < total_messages
has_messages_to_receive(delivered_messages, total_messages) = delivered_messages < total_messages
job_queue_done(sent_messages, delivered_messages, total_messages) =
sent_messages == total_messages && delivered_messages == total_messages
all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller)

function get_divisors(message::ControllerMessage)
number = message.value
Expand Down Expand Up @@ -60,8 +57,6 @@ function divisors(data)

if JQM.is_controller_process() # I am root
new_data = Array{Array{Int}}(undef, N)
sent_messages = 0
delivered_messages = 0

controller = Controller(JQM.num_workers())

Expand All @@ -70,17 +65,15 @@ function divisors(data)
JQM.add_job_to_queue!(controller, message)
end

while !job_queue_done(sent_messages, delivered_messages, N)
if has_messages_to_send(sent_messages, N)
requests = JQM.send_jobs_to_any_available_workers(controller)
sent_messages += length(requests)
while !all_jobs_done(controller)
if !JQM.is_job_queue_empty(controller)
JQM.send_jobs_to_any_available_workers(controller)
end
if has_messages_to_receive(delivered_messages, N)
if JQM.any_pending_jobs(controller)
job_answer = JQM.check_for_workers_job(controller)
if !isnothing(job_answer)
message = JQM.get_message(job_answer)
update_data(new_data, message)
delivered_messages += 1
end
end
end
Expand Down

0 comments on commit 7bd8ecc

Please sign in to comment.