Skip to content

Commit

Permalink
Merge branch 'pending_workers'
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Oct 10, 2023
2 parents f5e5faf + 1adc5af commit 5cc3840
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
6 changes: 5 additions & 1 deletion R/pool.r
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Pool = R6::R6Class("Pool",

add = function(qsys, n, ...) {
self$workers = qsys$new(addr=private$addr, master=private$master, n_jobs=n, ...)
private$master$add_pending_workers(n)
},

env = function(...) {
Expand Down Expand Up @@ -127,7 +128,10 @@ Pool = R6::R6Class("Pool",
),

active = list(
workers_total = function() self$workers$n(),
workers_total = function() {
ls_w = private$master$list_workers()
length(ls_w$worker) + ls_w$pending
},
workers_running = function() length(private$master$list_workers()$worker),
reusable = function() private$reuse
),
Expand Down
1 change: 1 addition & 0 deletions src/CMQMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ RCPP_MODULE(cmq_master) {
.method("add_env", &CMQMaster::add_env)
.method("add_pkg", &CMQMaster::add_pkg)
.method("list_env", &CMQMaster::list_env)
.method("add_pending_workers", &CMQMaster::add_pending_workers)
.method("list_workers", &CMQMaster::list_workers)
;
}
14 changes: 11 additions & 3 deletions src/CMQMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class CMQMaster {
}

SEXP recv(int timeout=-1) {
// if (peers.size() == 0)
// Rf_error("Trying to receive data without workers");
if (peers.size() + pending_workers <= 0)
Rf_error("Trying to receive data without workers");

int data_offset;
std::vector<zmq::message_t> msgs;
Expand Down Expand Up @@ -156,6 +156,10 @@ class CMQMaster {
Rcpp::_["size"] = Rcpp::wrap(sizes));
}

void add_pending_workers(int n) {
pending_workers += n;
}

Rcpp::List list_workers() {
std::vector<std::string> names;
names.reserve(peers.size());
Expand All @@ -172,7 +176,8 @@ class CMQMaster {
Rcpp::_["worker"] = Rcpp::wrap(names),
Rcpp::_["status"] = Rcpp::wrap(status),
Rcpp::_["time"] = wtime,
Rcpp::_["mem"] = mem
Rcpp::_["mem"] = mem,
Rcpp::_["pending"] = pending_workers
);
}

Expand All @@ -188,6 +193,7 @@ class CMQMaster {

zmq::context_t *ctx {nullptr};
int has_proxy {0};
int pending_workers {0};
zmq::socket_t sock;
std::string cur;
std::unordered_map<std::string, worker_t> peers;
Expand Down Expand Up @@ -238,7 +244,9 @@ class CMQMaster {
++cur_i;

cur = msgs[cur_i].to_string();
int prev_size = peers.size();
auto &w = peers[cur];
pending_workers -= peers.size() - prev_size;
w.call = R_NilValue;
if (cur_i == 1)
w.via = msgs[0].to_string();
Expand Down
7 changes: 7 additions & 0 deletions tests/testthat/test-2-worker.r
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ context("worker usage")
test_that("timeouts are triggered correctly", {
m = methods::new(CMQMaster)
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
expect_error(m$recv(0L))
m$close(0L)

Expand All @@ -15,6 +16,7 @@ test_that("worker evaluation", {
m = methods::new(CMQMaster)
w = methods::new(CMQWorker, m$context())
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
w$connect(addr, 0L)

m$recv(0L)
Expand All @@ -33,6 +35,7 @@ test_that("export variable to worker", {
m = methods::new(CMQMaster)
w = methods::new(CMQWorker, m$context())
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
w$connect(addr, 0L)

m$add_env("x", 3)
Expand All @@ -58,6 +61,7 @@ test_that("load package on worker", {
m = methods::new(CMQMaster)
w = methods::new(CMQWorker, m$context())
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
w$connect(addr, 0L)

m$add_pkg("parallel")
Expand All @@ -80,6 +84,7 @@ test_that("errors are sent back to master", {
m = methods::new(CMQMaster)
w = methods::new(CMQWorker, m$context())
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
w$connect(addr, 0L)

m$recv(0L)
Expand All @@ -100,6 +105,7 @@ test_that("worker R API", {

m = methods::new(CMQMaster)
addr = m$listen("tcp://127.0.0.1:*")
m$add_pending_workers(1L)
# addr = m$listen("inproc://endpoint") # mailbox.cpp assertion error

p = parallel::mcparallel(worker(addr))
Expand All @@ -120,6 +126,7 @@ test_that("communication with two workers", {

m = methods::new(CMQMaster)
addr = m$listen("tcp://127.0.0.1:*")
m$add_pending_workers(2L)
w1 = parallel::mcparallel(worker(addr))
w2 = parallel::mcparallel(worker(addr))

Expand Down

0 comments on commit 5cc3840

Please sign in to comment.