-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathrunParallel.r
116 lines (112 loc) · 5 KB
/
runParallel.r
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
## Given a function 'onecore' that runs the needed set of simulations on
## one CPU core, and given a total number of repetitions 'reps', determines
## the number of available cores and by default uses one less than that
## number (but never fewer than one core).
## reps is divided as evenly as possible over these cores, and batches
## are run on the cores using the 'parallel' package 'mclapply' function.
## The current per-core repetition number is continually updated in
## your system's temporary directory (/tmp for Linux, TEMP for Windows)
## in a file named progressX.log where X is the core number.
## The random number seed is set for each core and is equal to
## the scalar 'seed' + core number - 1. The default seed is a random
## number between 0 and 10000 but it's best if the user provides the
## seed so the simulation is reproducible.
## The total run time is computed and printed.
## onecore must create a named list of all the results created during
## that one simulation batch. Elements of this list must be data frames,
## vectors, matrices, or arrays. Upon completion of all batches,
## all the results are rbind'd and saved in a single list.
##
## onecore must have an argument 'reps' that will tell the function
## how many simulations to run for one batch, another argument 'showprogress'
## which is a function to be called inside onecore to write to the
## progress file for the current core and repetition, and an argument 'core'
## which informs 'onecore' which sequential core number (batch number) it is
## processing.
## When calling 'showprogress' inside 'onecore', the arguments, in order,
## must be the integer value of the repetition to be noted, the number of reps,
## 'core', an optional 4th argument 'other' that can contain a single
## character string to add to the output, and an optional 5th argument 'pr'.
## You can set 'pr=FALSE' to suppress printing and have 'showprogress'
## return the file name for holding progress information if you want to
## customize printing.
##
## If any of the objects appearing as list elements produced by onecore
## are multi-dimensional arrays, you must specify an integer value for
## 'along'. This specifies to the 'abind' package 'abind' function
## the dimension along which to bind the arrays. For example, if the
## first dimension of the array corresponds to repetitions, you would
## specify along=1. All arrays present must use the same 'along' unless
## 'along' is a named vector and the names match elements of the
## simulation result object.
## Set `simplify=FALSE` if you don't want the result simplified if
## onecore produces only one list element. The default returns the
## first (and only) list element rather than the list if there is only one
## element.
##
## Usage:
## require(Hmisc)
## getRs('runParallel.r')
## runParallel: run a simulation in batches on several cores with
## parallel::mclapply and stack the per-batch results.
##
## Arguments
##   onecore   function(reps, showprogress, core) that runs one batch and
##             returns a named list of data frames, vectors, matrices or
##             arrays
##   reps      total number of repetitions, split as evenly as possible
##             over the batches
##   seed      base seed; batch i calls set.seed(seed + i - 1)
##   cores     number of batches/cores requested; no more than 'reps'
##             batches are started so that no batch is asked for 0 reps
##   simplify  if TRUE and onecore returns a one-element list, return that
##             element instead of the list
##   along     dimension passed to abind::abind for array elements; either
##             a single value or a vector named by result element
##
## Value: a list with one combined object per element returned by onecore
##   (or the single object itself, see 'simplify').
## Side effects: prints the elapsed run time; progress files are written to
##   dirname(tempdir()) whenever onecore calls showprogress with pr=TRUE.
## Errors: stops if any batch fails or returns nothing, if an element is
##   of an unsupported type, or if an array is returned without 'along'.
runParallel <- function(onecore, reps, seed=round(runif(1, 0, 10000)),
                        cores=max(1, parallel::detectCores() - 1),
                        simplify=TRUE, along) {
  ## missing() only works in the frame that owns the argument, so record
  ## the answer here for use inside the nested combiner below
  haveAlong <- ! missing(along)
  ## Evaluate the (possibly random) default seed once, in this process.
  ## Left as a lazy promise it would be evaluated separately inside each
  ## forked worker, which can give the workers different base seeds.
  force(seed)
  ## parallel::detectCores() may return NA when the count is unknown
  if(length(cores) == 1 && is.na(cores)) cores <- 1
  ## Never start more batches than there are repetitions
  cores <- max(1, min(cores, reps))

  progressDir <- dirname(tempdir())
  stime <- Sys.time()

  ## Function to divide n things as evenly as possible into m groups
  ## See https://math.stackexchange.com/questions/199690
  evenly <- function(n, m) {
    a <- floor(n / m)
    r <- n %% m
    w <- c(rep(a + 1, r), rep(a, m - r))
    if(sum(w) != n) stop('program logic error')
    w
  }
  repsc <- evenly(reps, cores)

  ## Handed to onecore: writes 'i of reps' (optionally prefixed by 'other')
  ## to the progress file of the given core, overwriting earlier content,
  ## and invisibly returns the file name.  pr=FALSE only returns the name.
  showprogress <- function(i=0, reps=0, core, other='', pr=TRUE) {
    file <- paste0(progressDir, '/progress', core, '.log')
    if(other != '') other <- paste0(other, ' ')
    if(pr) cat(other, i, ' of ', reps, '\n', sep='', file=file)
    invisible(file)
  }

  ## One batch: seed the generator for this core, then run the simulations
  ff <- function(i) {
    set.seed(seed + i - 1)
    onecore(reps=repsc[i], showprogress=showprogress, core=i)
  }
  ## mc.set.seed=FALSE so the set.seed call in ff alone determines the stream
  v <- parallel::mclapply(seq_len(cores), ff, mc.cores=cores,
                          mc.set.seed=FALSE)

  ## A batch that raised an error comes back as a 'try-error' object
  failed <- vapply(v, inherits, logical(1), what='try-error')
  if(any(failed)) {
    msgs <- vapply(v[failed], function(x) {
      cond <- attr(x, 'condition')
      if(inherits(cond, 'condition'))
        paste(conditionMessage(cond), collapse=' ')
      else trimws(as.character(x)[1])
    }, character(1))
    stop(paste0('core ', which(failed), ': ', msgs, collapse='; '))
  }
  ## A NULL result means the batch produced nothing to combine
  empty <- vapply(v, is.null, logical(1))
  if(any(empty))
    stop(paste('no result returned by core(s)',
               paste(which(empty), collapse=', ')))

  etime <- Sys.time()
  cat('\nRun time:', format(etime - stime), 'using', cores, 'cores\n')

  ## Separately for each element of each list in v, stack the results so
  ## the user can treat them as if from a single run
  v1 <- v[[1]]
  m  <- length(v1)    # number of elements in a per-core list

  ## Combine element j across all batches; the type of the first batch's
  ## element decides how the pieces are bound together
  combine <- function(j) {
    parts <- lapply(v, function(x) x[[j]])
    first <- parts[[1]]
    if(is.matrix(first)) return(do.call('rbind', parts))
    if(is.list(first))   return(data.table::rbindlist(parts))
    if(is.array(first)) {
      if(! haveAlong)
        stop(paste('list element', j,
                   'of result returned by onecore is an array so',
                   "'along' must be specified"))
      if(! requireNamespace('abind', quietly=TRUE))
        stop('package abind is required to combine arrays')
      al <- if(length(along) == 1) along else along[names(v1)[j]]
      if(length(al) != 1 || is.na(al))
        stop(paste("'along' has no entry for list element", j,
                   'of result returned by onecore'))
      return(abind::abind(parts, along=al))
    }
    if(! is.atomic(first))
      stop(paste('list element', j,
                 ' of result returned by onecore is not data.frame, matrix, array, or vector'))
    unlist(parts)   # vectors
  }

  R <- lapply(seq_len(m), combine)
  names(R) <- names(v1)
  if(simplify && m == 1) R[[1]] else R
}