From 7bd8ecc7d46a5a5c391f77a446160e44ceae583c Mon Sep 17 00:00:00 2001 From: pedroripper Date: Tue, 26 Mar 2024 13:38:39 -0300 Subject: [PATCH] Add pending jobs --- src/controller.jl | 50 +++++++++++++---------------------------------- test/test1.jl | 17 +++++----------- test/test2.jl | 17 +++++----------- 3 files changed, 24 insertions(+), 60 deletions(-) diff --git a/src/controller.jl b/src/controller.jl index 547b717..d181bce 100644 --- a/src/controller.jl +++ b/src/controller.jl @@ -5,10 +5,9 @@ mutable struct Controller debug_mode::Bool worker_status::Vector{WorkerStatus} job_queue::Vector{Job} - jobs_sent::Int - jobs_received::Int + pending_jobs::Vector{JobRequest} function Controller(n_workers::Int; debug_mode::Bool = false) - return new(n_workers, debug_mode, fill(WORKER_AVAILABLE, n_workers), Any[], 0, 0) + return new(n_workers, debug_mode, fill(WORKER_AVAILABLE, n_workers), Vector{Job}(), Vector{JobRequest}()) end end @@ -17,27 +16,17 @@ struct TerminationMessage end _is_worker_available(controller::Controller, worker::Int) = controller.worker_status[worker] == WORKER_AVAILABLE +is_job_queue_empty(controller::Controller) = isempty(controller.job_queue) +any_pending_jobs(controller::Controller) = !isempty(controller.pending_jobs) + function _pick_job_to_send!(controller::Controller) - if _has_jobs_to_send(controller) + if !is_job_queue_empty(controller) return popfirst!(controller.job_queue) else error("Controller does not have any jobs to send.") end end -function _has_jobs_to_send(controller::Controller) - return !isempty(controller.job_queue) -end - -function _pick_available_worker(controller::Controller) - for i in 1:controller.n_workers - if _is_worker_available(controller, i) - return i - end - end - return error("No available workers. You should check with any_available_workers() first.") -end - function _pick_available_workers(controller::Controller) available_workers = [] for i in 1:controller.n_workers @@ -48,32 +37,21 @@ function _pick_available_workers(controller::Controller) return available_workers end -function _any_available_workers(controller::Controller) - for i in 1:controller.n_workers - if _is_worker_available(controller, i) - return true - end - end - return false -end - function add_job_to_queue!(controller::Controller, message::Any) return push!(controller.job_queue, Job(message)) end function send_jobs_to_any_available_workers(controller::Controller) available_workers = _pick_available_workers(controller) - requests = Vector{JobRequest}() for worker in available_workers - if _has_jobs_to_send(controller) + if !is_job_queue_empty(controller) job = _pick_job_to_send!(controller) - controller.jobs_sent += 1 controller.worker_status[worker] = WORKER_BUSY request = MPI.isend(job, _mpi_comm(); dest = worker, tag = worker + 32) - push!(requests, JobRequest(worker, request)) + push!(controller.pending_jobs, JobRequest(worker, request)) end end - return requests + return nothing end function send_termination_message(controller::Controller) @@ -87,12 +65,12 @@ function send_termination_message(controller::Controller) end function check_for_workers_job(controller::Controller) - for worker in 1:controller.n_workers - has_job = MPI.Iprobe(_mpi_comm(); source = worker, tag = worker + 32) + for j_i in eachindex(controller.pending_jobs) + has_job = MPI.Iprobe(_mpi_comm(); source = controller.pending_jobs[j_i].worker, tag = controller.pending_jobs[j_i].worker + 32) if has_job - job = MPI.recv(_mpi_comm(); source = worker, tag = worker + 32) - controller.jobs_received += 1 - controller.worker_status[worker] = WORKER_AVAILABLE + job = MPI.recv(_mpi_comm(); source = controller.pending_jobs[j_i].worker, tag = controller.pending_jobs[j_i].worker + 32) + controller.worker_status[controller.pending_jobs[j_i].worker] = WORKER_AVAILABLE + deleteat!(controller.pending_jobs, j_i) return job end end diff --git a/test/test1.jl b/test/test1.jl index dce4e5e..e123abf 100644 --- a/test/test1.jl +++ b/test/test1.jl @@ -7,10 +7,7 @@ mutable struct Message vector_idx::Int end -has_messages_to_send(sent_messages, total_messages) = sent_messages < total_messages -has_messages_to_receive(delivered_messages, total_messages) = delivered_messages < total_messages -job_queue_done(sent_messages, delivered_messages, total_messages) = - sent_messages == total_messages && delivered_messages == total_messages +all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller) function sum_100(message::Message) message.value += 100 @@ -48,8 +45,6 @@ function job_queue(data) if JQM.is_controller_process() # I am root new_data = Array{T}(undef, N) - sent_messages = 0 - delivered_messages = 0 controller = Controller(JQM.num_workers()) @@ -58,17 +53,15 @@ function job_queue(data) JQM.add_job_to_queue!(controller, message) end - while !job_queue_done(sent_messages, delivered_messages, N) - if has_messages_to_send(sent_messages, N) - requests = JQM.send_jobs_to_any_available_workers(controller) - sent_messages += length(requests) + while !all_jobs_done(controller) + if !JQM.is_job_queue_empty(controller) + JQM.send_jobs_to_any_available_workers(controller) end - if has_messages_to_receive(delivered_messages, N) + if JQM.any_pending_jobs(controller) job_answer = JQM.check_for_workers_job(controller) if !isnothing(job_answer) message = JQM.get_message(job_answer) update_data(new_data, message) - delivered_messages += 1 end end end diff --git a/test/test2.jl b/test/test2.jl index b35b9f1..8f35412 100644 --- a/test/test2.jl +++ b/test/test2.jl @@ -12,10 +12,7 @@ mutable struct WorkerMessage vector_idx::Int end -has_messages_to_send(sent_messages, total_messages) = sent_messages < total_messages -has_messages_to_receive(delivered_messages, total_messages) = delivered_messages < total_messages -job_queue_done(sent_messages, delivered_messages, total_messages) = - sent_messages == total_messages && delivered_messages == total_messages +all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller) function get_divisors(message::ControllerMessage) number = message.value @@ -60,8 +57,6 @@ function divisors(data) if JQM.is_controller_process() # I am root new_data = Array{Array{Int}}(undef, N) - sent_messages = 0 - delivered_messages = 0 controller = Controller(JQM.num_workers()) @@ -70,17 +65,15 @@ function divisors(data) JQM.add_job_to_queue!(controller, message) end - while !job_queue_done(sent_messages, delivered_messages, N) - if has_messages_to_send(sent_messages, N) - requests = JQM.send_jobs_to_any_available_workers(controller) - sent_messages += length(requests) + while !all_jobs_done(controller) + if !JQM.is_job_queue_empty(controller) + JQM.send_jobs_to_any_available_workers(controller) end - if has_messages_to_receive(delivered_messages, N) + if JQM.any_pending_jobs(controller) job_answer = JQM.check_for_workers_job(controller) if !isnothing(job_answer) message = JQM.get_message(job_answer) update_data(new_data, message) - delivered_messages += 1 end end end