-
Notifications
You must be signed in to change notification settings - Fork 25
/
hello_balance.R
62 lines (51 loc) · 2.31 KB
/
hello_balance.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# Load pbdMPI without its startup chatter; it supplies the SPMD helpers
# (comm.rank, comm.size, barrier, allreduce, comm.cat, finalize) used below.
suppressMessages(library(pbdMPI))
## Short node name: the `hostname` output cut at its first "."
hostname_full <- system("hostname", intern = TRUE)
host <- unlist(strsplit(hostname_full, split = "[.]"))[1]
rank <- comm.rank()  # MPI rank of this R process
size <- comm.size()  # number of MPI ranks (R sessions) in the job
## Placeholder per-task worker for the mclapply/lapply demo; replace the body
## with real work. It sleeps to simulate work, then returns the id of the
## process that ran it, so the report shows which forked process ran each task.
##   x:     task index passed in by (mc)lapply (unused by this placeholder).
##   delay: seconds to sleep per task (default 1, the original fixed value).
## Returns: the integer process id from Sys.getpid().
mc.function <- function(x, delay = 1) {
  Sys.sleep(delay)  # replace with your function for mclapply cores here
  Sys.getpid()      # returns process id
}
## Compute how many cores each R session (MPI rank) on this node may use.
## Count the ranks that share this node by comparing host names across all
## ranks. allgather() is collective, so every rank calls it unconditionally
## (calling it on only some ranks would hang the job).
all_hosts <- unlist(allgather(host))
ranks_by_host <- sum(all_hosts == host)
## Prefer OpenMPI's own count of ranks on this node. If the variable is unset
## or unparsable (e.g. a non-OpenMPI launcher), as.numeric() yields NA, which
## would make every count below NA; fall back to the host-name count instead.
ranks_on_my_node <- suppressWarnings(
  as.numeric(Sys.getenv("OMPI_COMM_WORLD_LOCAL_SIZE"))
)
if (is.na(ranks_on_my_node) || ranks_on_my_node < 1) {
  ranks_on_my_node <- ranks_by_host
}
cores_on_my_node <- parallel::detectCores()
if (is.na(cores_on_my_node)) {
  cores_on_my_node <- 1  # detectCores() returns NA when it cannot tell
}
## Give each rank at least one core so mclapply() always receives
## mc.cores >= 1, even when ranks outnumber cores (node oversubscribed).
cores_per_R <- max(1, floor(cores_on_my_node / ranks_on_my_node))
cores_total <- allreduce(cores_per_R)  # adds up over ranks
## Run mclapply on the allocated cores to demonstrate forked pids.
## The barrier() before the timed region lines the ranks up at the start;
## the one inside makes each rank's elapsed time include waiting for the
## other ranks to finish.
barrier()
mc_time <- system.time({
  my_mcpids <- parallel::mclapply(seq_len(cores_per_R), mc.function,
                                  mc.cores = cores_per_R)
  barrier()
})
## Combine the per-task pids from mclapply into one space-separated string.
my_mcpids <- paste(unlist(my_mcpids), collapse = " ")
## Run lapply (serially, in this process) with the same function. The trailing
## barrier() mirrors the mclapply timing so both measurements cover the same
## synchronization and can be compared like for like.
l_time <- system.time({
  my_pids <- lapply(seq_len(cores_per_R), mc.function)
  barrier()
})
##
## The same cores are shared with OpenBLAS (see the flexiblas package)
## and with other OpenMP-enabled code running outside mclapply.
## If BLAS functions are called inside mclapply, they compete for the
## same cores: avoid this or manage it appropriately!
## Now report what happened and where.
msg <- paste0("Hello from rank ", rank, " on node ", host, " claiming ",
              cores_per_R, " cores. (", ranks_on_my_node,
              " Rs on ", cores_on_my_node, " cores).\n",
              " pid: ", my_mcpids, "\n")
comm.cat(msg, quiet = TRUE, all.rank = TRUE)
comm.cat("Total R sessions:", size, "\n", quiet = TRUE)
comm.cat("Total cores:", cores_total, "\n", quiet = TRUE)
comm.cat("\nNotes: cores on node obtained by: detectCores {parallel}\n",
         " ranks (R sessions) per node: OMPI_COMM_WORLD_LOCAL_SIZE\n",
         " pid to core map changes frequently during mclapply\n",
         quiet = TRUE)
comm.cat("\nTime lapply should be about", cores_per_R, "x time mclapply\n",
         quiet = TRUE)
## [["elapsed"]] picks the wall-clock component of system.time()'s result.
comm.cat(" mclapply time on each of the", size, "ranks:",
         mc_time[["elapsed"]], "\n", quiet = TRUE)
comm.cat(" lapply time on each of the", size, "ranks:",
         l_time[["elapsed"]], "\n\n", quiet = TRUE)
finalize()