Skip to content

Commit

Permalink
track call references
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Dec 13, 2023
1 parent 80c3f9c commit 9e39305
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
2 changes: 1 addition & 1 deletion R/pool.r
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Pool = R6::R6Class("Pool",
send = function(cmd, ...) {
pcall = quote(substitute(cmd))
cmd = as.expression(do.call(substitute, list(eval(pcall), env=list(...))))
private$master$send(cmd)
invisible(private$master$send(cmd))
},
send_shutdown = function() {
private$master$send_shutdown()
Expand Down
7 changes: 6 additions & 1 deletion src/CMQMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class CMQMaster {
return msg2r(std::move(msgs[data_offset]), true);
}

void send(SEXP cmd) {
int send(SEXP cmd) {
if (peers.find(cur) == peers.end())
Rcpp::stop("Trying to send to worker that does not exist");
auto &w = peers[cur];
Expand Down Expand Up @@ -159,7 +159,9 @@ class CMQMaster {
mp.push_back(r2msg(Rcpp::wrap(proxy_add_env)));

w.call = cmd;
w.call_ref = ++call_counter;
mp.send(sock);
return w.call_ref;
}
void send_shutdown() {
if (peers.find(cur) == peers.end())
Expand Down Expand Up @@ -266,6 +268,7 @@ class CMQMaster {
return Rcpp::List::create(
Rcpp::_["worker"] = os.str(),
Rcpp::_["status"] = Rcpp::wrap(wlife_t2str(w.status)),
Rcpp::_["call_ref"] = w.call_ref,
Rcpp::_["calls"] = w.n_calls,
Rcpp::_["time"] = w.time,
Rcpp::_["mem"] = w.mem
Expand All @@ -281,11 +284,13 @@ class CMQMaster {
wlife_t status;
std::string via;
int n_calls {-1};
int call_ref {-1};
};

zmq::context_t *ctx {nullptr};
bool is_cleaned_up {false};
int pending_workers {0};
int call_counter {-1};
zmq::socket_t sock;
std::string cur;
std::unordered_map<std::string, worker_t> peers;
Expand Down
16 changes: 16 additions & 0 deletions tests/testthat/test-4-pool.r
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ test_that("calculations are really done on the worker", {
w$cleanup()
})

test_that("call references are matched properly", {
skip_on_os("windows")
w = workers(2, qsys_id="multicore")
expect_null(w$recv(5000L))

r1 = w$send({Sys.sleep(1); 1})
expect_null(w$recv(1000L))
r2 = w$send(2)
expect_equal(w$recv(500L), 2)
expect_equal(w$current()$call_ref, r2)
w$send_shutdown()
expect_equal(w$recv(2000L), 1)
expect_equal(w$current()$call_ref, r1)
w$cleanup()
})

test_that("multiprocess", {
skip("https://github.com/r-lib/processx/issues/236")

Expand Down
9 changes: 5 additions & 4 deletions vignettes/technicaldocs.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,21 @@ to all workers automatically in a greedy fashion.
### Main event loop

Putting the above together in an event loop, we get what is essentially
implemented in `master`. Note that your `expression` to evaluate needs to track
its origin (this may change in the future).
implemented in `master`. `w$send` invisibly returns an identifier to track
which call was submitted, and `w$current()` matches the same to `w$recv()`.

```{r eval=FALSE}
w = workers(3)
on.exit(w$cleanup())
w$env(...)
while (we have new work to send || jobs pending) {
res = w$recv()
res = w$recv() # the result of the call, or NULL for a new worker
w$current()$call_ref # matches answer to request, -1 otherwise
# handle result
if (more work)
w$send(expression, ...)
call_ref = w$send(expression, ...) # call_ref tracks request identity
else
w$send_shutdown()
}
Expand Down

0 comments on commit 9e39305

Please sign in to comment.