Skip to content

Commit

Permalink
add memory reporting
Browse files: browse the repository at this point in the history
  • Loading branch information
mschubert committed Sep 10, 2023
1 parent fedb138 commit c86745a
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 12 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: clustermq
Title: Evaluate Function Calls on HPC Schedulers (LSF, SGE, SLURM, PBS/Torque)
Version: 0.8.917
Version: 0.8.918
Authors@R: c(
person('Michael', 'Schubert', email='[email protected]',
role = c('aut', 'cre', 'cph'),
Expand Down
14 changes: 6 additions & 8 deletions R/pool.r
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ Pool = R6::R6Class("Pool",
# List every worker known to the master together with its status, CPU
# times and memory usage.
#
# Returns a data.frame with one row per worker: `worker`, `status`, the
# first three fields of the worker's reported time, and `mem.used` /
# `mem.max` in bytes (NA for workers that have not reported memory yet).
list = function() {
    info = private$master$list_workers()
    # drop=FALSE keeps a one-row matrix when there is only a single worker,
    # so the time fields stay separate columns instead of becoming rows
    times = do.call(rbind, info$time)[, 1:3, drop=FALSE]
    # Each reported entry is the matrix returned by gc() on the worker
    # (rows Ncells, Vcells). Per ?gc, cons cells take 56 bytes on 64-bit
    # systems and vector cells take 8 bytes, so weight the cell counts
    # accordingly to obtain bytes. A worker that has not reported yet still
    # holds the empty-list default, for which NA is returned.
    mem = function(field) vapply(info$mem, function(m) {
        if (is.matrix(m)) sum(m[, field] * c(56, 8)) else NA_real_
    }, numeric(1))
    data.frame(worker=info$worker, status=info$status, times,
               mem.used=mem("used"), mem.max=mem("max used"))
},

add = function(qsys, n, ...) {
Expand Down Expand Up @@ -100,12 +102,8 @@ Pool = R6::R6Class("Pool",

times = private$master$list_workers()$time
times = times[sapply(times, length) != 0]
# max_mem = Reduce(max, lapply(private$worker_stats, function(w) w$mem))
max_mb = NA_character_
# if (length(max_mem) == 1) {
# class(max_mem) = "object_size"
# max_mb = format(max_mem + 2e8, units="auto") # ~ 200 Mb overhead
# }
max_mem = max(c(self$list()[["mem.max"]]+2e8, 0), na.rm=TRUE) # add 200 Mb
max_mem_str = format(structure(max_mem, class="object_size"), units="auto")

wt = Reduce(`+`, times) / length(times)
rt = proc.time() - private$timer
Expand All @@ -116,7 +114,7 @@ Pool = R6::R6Class("Pool",

fmt = "Master: [%s %.1f%% CPU]; Worker: [avg %.1f%% CPU, max %s]"
message(sprintf(fmt, rt3_str, 100*(rt[[1]]+rt[[2]])/rt[[3]],
100*(wt[[1]]+wt[[2]])/wt[[3]], max_mb))
100*(wt[[1]]+wt[[2]])/wt[[3]], max_mem_str))

invisible(TRUE)
},
Expand Down
8 changes: 6 additions & 2 deletions src/CMQMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,18 @@ class CMQMaster {
names.reserve(peers.size());
std::vector<int> status;
status.reserve(peers.size());
Rcpp::List wtime;
Rcpp::List wtime, mem;
for (const auto &kv: peers) {
names.push_back(kv.first);
status.push_back(kv.second.status);
wtime.push_back(kv.second.time);
mem.push_back(kv.second.mem);
}
return Rcpp::List::create(
Rcpp::_["worker"] = Rcpp::wrap(names),
Rcpp::_["status"] = Rcpp::wrap(status),
Rcpp::_["time"] = wtime
Rcpp::_["time"] = wtime,
Rcpp::_["mem"] = mem
);
}

Expand All @@ -179,6 +181,7 @@ class CMQMaster {
std::set<std::string> env;
SEXP call {R_NilValue};
SEXP time {Rcpp::List()};
SEXP mem {Rcpp::List()};
wlife_t status;
std::string via;
};
Expand Down Expand Up @@ -265,6 +268,7 @@ class CMQMaster {
}

w.time = msg2r(msgs[++cur_i], true);
w.mem = msg2r(msgs[++cur_i], true);
return ++cur_i;
}
};
4 changes: 3 additions & 1 deletion src/CMQProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class CMQProxy {
/// Send a proxy_cmd request to the master.
///
/// The request is one multipart message of four frames: an empty frame,
/// the wlife_t::proxy_cmd status code, the result of R's proc.time() and
/// the result of R's gc(). Every frame except the last carries sndmore;
/// sending an earlier frame with send_flags::none would end the message
/// early and split the request in two.
void proxy_request_cmd() {
    to_master.send(zmq::message_t(0), zmq::send_flags::sndmore);
    to_master.send(int2msg(wlife_t::proxy_cmd), zmq::send_flags::sndmore);
    to_master.send(r2msg(proc_time()), zmq::send_flags::sndmore);
    to_master.send(r2msg(gc()), zmq::send_flags::none);
}
SEXP proxy_receive_cmd() {
std::vector<zmq::message_t> msgs;
Expand Down Expand Up @@ -150,6 +151,7 @@ class CMQProxy {

private:
Rcpp::Function proc_time {"proc.time"};
Rcpp::Function gc {"gc"};
bool external_context {true};
zmq::context_t *ctx {nullptr};
zmq::socket_t to_master;
Expand Down
3 changes: 3 additions & 0 deletions src/CMQWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class CMQWorker {
check_send_ready(timeout);
sock.send(int2msg(wlife_t::active), zmq::send_flags::sndmore);
sock.send(r2msg(proc_time()), zmq::send_flags::sndmore);
sock.send(r2msg(gc()), zmq::send_flags::sndmore);
sock.send(r2msg(R_NilValue), zmq::send_flags::none);
} catch (zmq::error_t const &e) {
Rf_error(e.what());
Expand Down Expand Up @@ -94,6 +95,7 @@ class CMQWorker {
}
sock.send(int2msg(wlife_t::active), zmq::send_flags::sndmore);
sock.send(r2msg(proc_time()), zmq::send_flags::sndmore);
sock.send(r2msg(gc()), zmq::send_flags::sndmore);
sock.send(r2msg(eval), zmq::send_flags::none);
UNPROTECT(1);
return true;
Expand All @@ -107,6 +109,7 @@ class CMQWorker {
Rcpp::Environment env {1};
Rcpp::Function load_pkg {"library"};
Rcpp::Function proc_time {"proc.time"};
Rcpp::Function gc {"gc"};

void check_send_ready(int timeout=5000) {
auto pitems = std::vector<zmq::pollitem_t>(1);
Expand Down

0 comments on commit c86745a

Please sign in to comment.