Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

17 eliminate openmp dependency and use tbb #18

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import(RcppProgress)
import(purrr)
importFrom(Rcpp,sourceCpp)
importFrom(RcppParallel,RcppParallelLibs)
importFrom(RcppParallel,setThreadOptions)
importFrom(ggplot2,aes)
importFrom(ggplot2,coord_cartesian)
importFrom(ggplot2,geom_point)
Expand Down
12 changes: 5 additions & 7 deletions R/mcmc.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#' Sample from the target distribution using MCMC
#'
#' @importFrom RcppParallel setThreadOptions
#'
#' @export
#'
#' @param data Data to be used in MCMC, as generated by the `load_*_data` functions
Expand Down Expand Up @@ -38,19 +40,15 @@
#' the latent genotypes at each step of the MCMC. WARNING: This will increase
#' the size of the output object significantly.
#' @param num_chains Total number of chains to run, possibly simultaneously
#' @param num_cores Total OMP parallel threads to use to run chains.
#' num_cores * pt_num_threads should not exceed the number of cores available
#' @param num_cores Total parallel cores to use to run algorithm.
#' num_cores * num_chains should not exceed the number of cores available
#' on your system.
#' @param pt_chains Total number of chains to run with parallel tempering or a
#' vector containing the temperatures that should be used for parallel tempering.
#' @param pt_grad Power to raise parallel tempering chains to. A value of 1
#' results in evenly distributed temperatures between \[0,1\], below 1 will bias
#' towards 1 and above 1 will bias towards 0. Only used if pt_chains is a
#' single value (i.e. not a vector).
#' @param pt_num_threads Total number of OMP parallel threads to be used to
#' process parallel tempered chains
#' num_cores * pt_num_threads should not exceed the number of cores available
#' on your system.
#' @param adapt_temp Logical indicating whether or not to adapt the parallel
#' tempering temperatures. If TRUE, the temperatures will be adapted during the
#' `burnin` period, starting after `pre_adapt_steps` steps. The adaptation will
Expand Down Expand Up @@ -87,11 +85,11 @@ run_mcmc <-
num_cores = 1,
pt_chains = 1,
pt_grad = 1,
pt_num_threads = 1,
adapt_temp = TRUE,
pre_adapt_steps = 25,
temp_adapt_steps = 25,
max_initialization_tries = 10000) {
RcppParallel::setThreadOptions(numThreads = num_cores)
start_time <- Sys.time()
args <- as.list(environment())
mcmc_args <- as.list(environment())
Expand Down
4 changes: 2 additions & 2 deletions data-raw/mcmc_results.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ simulated_data <- moire::simulate_data(
burnin <- 1e3
num_samples <- 1e3
pt_chains <- seq(1, 0, length.out = 80)
pt_num_threads <- 20 # number of threads to use for parallel tempering
num_cores <- parallelly::availableCores() - 1 # number of threads to use for parallel processing

mcmc_results <- moire::run_mcmc(
simulated_data,
verbose = TRUE, burnin = burnin, samples_per_chain = num_samples,
pt_chains = pt_chains, pt_num_threads = pt_num_threads,
pt_chains = pt_chains, num_cores = num_cores,
adapt_temp = TRUE
)

Expand Down
Binary file modified data/mcmc_results.rda
Binary file not shown.
Binary file modified data/simulated_data.rda
Binary file not shown.
10 changes: 2 additions & 8 deletions man/run_mcmc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions src/Makevars
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,4 @@ ifdef ENABLE_PROFILER
PKG_CXXFLAGS+=-DENABLE_PROFILER
endif

PKG_CXXFLAGS+=$(SHLIB_OPENMP_CXXFLAGS)
PKG_LIBS+=$(SHLIB_OPENMP_CXXFLAGS)
PKG_LIBS+=$(shell ${R_HOME}/bin/Rscript -e "RcppParallel::RcppParallelLibs()")
5 changes: 0 additions & 5 deletions src/chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,6 @@ float Chain::calc_new_likelihood()
genotyping_llik_new.end(), 0.0f);
#else
float sum = 0.0f;
#pragma omp simd reduction(+:sum)
for (int i = 0; i < genotyping_llik_new.size(); ++i) {
sum += genotyping_llik_new[i];
}
Expand All @@ -991,19 +990,15 @@ float Chain::calc_new_prior()
mean_coi_hyper_prior_new;
#else
float sum = 0.0f;
#pragma omp simd reduction(+:sum)
for (int i = 0; i < eps_neg_prior_new.size(); ++i) {
sum += eps_neg_prior_new[i];
}
#pragma omp simd reduction(+:sum)
for (int i = 0; i < eps_pos_prior_new.size(); ++i) {
sum += eps_pos_prior_new[i];
}
#pragma omp simd reduction(+:sum)
for (int i = 0; i < relatedness_prior_new.size(); ++i) {
sum += relatedness_prior_new[i];
}
#pragma omp simd reduction(+:sum)
for (int i = 0; i < coi_prior_new.size(); ++i) {
sum += coi_prior_new[i];
}
Expand Down
139 changes: 66 additions & 73 deletions src/mcmc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,7 @@

#include <Rcpp.h>

#ifdef _OPENMP
#include <omp.h>
#else
#define omp_get_num_threads() 1
#define omp_get_thread_num() 0
#define omp_get_max_threads() 1
#define omp_get_thread_limit() 1
#define omp_get_num_procs() 1
#define omp_set_nested(a)
#define omp_set_num_threads(a)
#define omp_get_wtime() 0
#endif
#include <tbb/parallel_for.h>

MCMC::MCMC(GenotypingData genotyping_data, Parameters params)
: genotyping_data(genotyping_data), params(params)
Expand All @@ -35,34 +24,33 @@ MCMC::MCMC(GenotypingData genotyping_data, Parameters params)
swap_barriers.resize(params.pt_chains.size() - 1, 0.0);
swap_indices.resize(params.pt_chains.size(), 0);

omp_set_num_threads(params.pt_num_threads);


chains.resize(params.pt_chains.size());
std::vector<bool> any_ill_conditioned(params.pt_chains.size(), false);
temp_gradient = params.pt_chains;
#pragma omp parallel for
for (size_t i = 0; i < params.pt_chains.size(); i++)
tbb::parallel_for(tbb::blocked_range<size_t>(0, params.pt_chains.size()), [&](const tbb::blocked_range<size_t>& range)
{
if (std::any_of(any_ill_conditioned.begin(), any_ill_conditioned.end(), [](bool v) { return v; }))
for (size_t i = range.begin(); i < range.end(); ++i)
{
continue;
}
if (std::any_of(any_ill_conditioned.begin(), any_ill_conditioned.end(), [](bool v) { return v; }))
{
continue;
}
float temp = params.pt_chains[i];
chains[i] = Chain(genotyping_data, params, temp);

bool ill_conditioned = std::isnan(chains[i].get_llik());
int max_tries = params.max_initialization_tries;

while (ill_conditioned and max_tries > 0)
{
chains[i].initialize_parameters();
max_tries--;
ill_conditioned = std::isnan(chains[i].get_llik());
}
while (ill_conditioned and max_tries > 0)
{
chains[i].initialize_parameters();
max_tries--;
ill_conditioned = std::isnan(chains[i].get_llik());
}

any_ill_conditioned[i] = ill_conditioned;
}
any_ill_conditioned[i] = ill_conditioned;
}
});

if (std::any_of(any_ill_conditioned.begin(), any_ill_conditioned.end(), [](bool v) { return v; }))
{
Expand All @@ -74,31 +62,32 @@ MCMC::MCMC(GenotypingData genotyping_data, Parameters params)

void MCMC::burnin(int step)
{
#pragma omp parallel for
for (auto &chain : chains)
tbb::parallel_for(tbb::blocked_range<size_t>(0, chains.size()), [&](const tbb::blocked_range<size_t>& range)
{
chain.update_eps_neg(step);
chain.update_eps_pos(step);
chain.update_p(step);
chain.update_m(step);
if (params.allow_relatedness)
for (size_t i = range.begin(); i < range.end(); ++i)
{
chain.update_r(step);
// chain.update_m_r(step);
chain.update_eff_coi(step);
auto& chain = chains[i];
chain.update_eps_neg(step);
chain.update_eps_pos(step);
chain.update_p(step);
chain.update_m(step);
if (params.allow_relatedness)
{
chain.update_r(step);
chain.update_eff_coi(step);
}
chain.update_samples(step);
chain.update_mean_coi(step);
}
chain.update_samples(step);
chain.update_mean_coi(step);
}
});
llik_burnin.push_back(get_llik());
prior_burnin.push_back(get_prior());
posterior_burnin.push_back(get_posterior());

if (chains.size() > 1)
{
swap_chains(step, true);
if (num_swaps % params.temp_adapt_steps == 0 and
step > params.pre_adapt_steps and params.adapt_temp)
if (num_swaps % params.temp_adapt_steps == 0 and step > params.pre_adapt_steps and params.adapt_temp)
{
adapt_temp();
}
Expand Down Expand Up @@ -223,47 +212,51 @@ void MCMC::adapt_temp()

void MCMC::sample(int step)
{
#pragma omp parallel for
for (auto &chain : chains)
tbb::parallel_for(tbb::blocked_range<size_t>(0, chains.size()), [&](const tbb::blocked_range<size_t>& range)
{
chain.update_eps_neg(params.burnin + step);
chain.update_eps_pos(params.burnin + step);
chain.update_p(params.burnin + step);
chain.update_m(params.burnin + step);

if (params.allow_relatedness)
for (size_t i = range.begin(); i < range.end(); ++i)
{
chain.update_r(params.burnin + step);
chain.update_eff_coi(params.burnin + step);
}
chain.update_samples(params.burnin + step);
chain.update_mean_coi(params.burnin + step);
auto& chain = chains[i];
chain.update_eps_neg(params.burnin + step);
chain.update_eps_pos(params.burnin + step);
chain.update_p(params.burnin + step);
chain.update_m(params.burnin + step);

if ((params.thin == 0 or step % params.thin == 0) and
(chain.get_hot() or chains.size() == 1))
{
for (size_t ii = 0; ii < genotyping_data.num_loci; ++ii)
if (params.allow_relatedness)
{
p_store[ii].push_back(chain.p[ii]);
chain.update_r(params.burnin + step);
chain.update_eff_coi(params.burnin + step);
}
chain.update_samples(params.burnin + step);
chain.update_mean_coi(params.burnin + step);

for (size_t jj = 0; jj < genotyping_data.num_samples; ++jj)
if ((params.thin == 0 || step % params.thin == 0) &&
(chain.get_hot() || chains.size() == 1))
{
m_store[jj].push_back(chain.m[jj]);
eps_neg_store[jj].push_back(chain.eps_neg[jj]);
eps_pos_store[jj].push_back(chain.eps_pos[jj]);
r_store[jj].push_back(chain.r[jj]);

if (params.record_latent_genotypes) {
for (size_t kk = 0; kk < genotyping_data.num_loci; ++kk)
{
latent_genotypes_store[jj][kk].push_back(chain.latent_genotypes_new[kk][jj]);
for (size_t ii = 0; ii < genotyping_data.num_loci; ++ii)
{
p_store[ii].push_back(chain.p[ii]);
}

for (size_t jj = 0; jj < genotyping_data.num_samples; ++jj)
{
m_store[jj].push_back(chain.m[jj]);
eps_neg_store[jj].push_back(chain.eps_neg[jj]);
eps_pos_store[jj].push_back(chain.eps_pos[jj]);
r_store[jj].push_back(chain.r[jj]);

if (params.record_latent_genotypes) {
for (size_t kk = 0; kk < genotyping_data.num_loci; ++kk)
{
latent_genotypes_store[jj][kk].push_back(chain.latent_genotypes_new[kk][jj]);
}
}
}
mean_coi_store.push_back(chain.mean_coi);
}
mean_coi_store.push_back(chain.mean_coi);
}
}
});

llik_sample.push_back(get_llik());
prior_sample.push_back(get_prior());
posterior_sample.push_back(get_posterior());
Expand Down
1 change: 0 additions & 1 deletion src/mcmc_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ inline float logSumExp(const std::vector<float> &x)
}

float sum = 0;
#pragma omp simd reduction(+ : sum)
for (size_t i = 0; i < x.size(); ++i)
{
sum += std::exp(x[i] - max_el);
Expand Down
1 change: 0 additions & 1 deletion src/parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Parameters::Parameters(const Rcpp::List &args)
burnin = UtilFunctions::r_to_int(args["burnin"]);
samples = UtilFunctions::r_to_int(args["samples"]);
pt_chains = UtilFunctions::r_to_vector_float(args["pt_chains"]);
pt_num_threads = UtilFunctions::r_to_int(args["pt_num_threads"]);
adapt_temp = UtilFunctions::r_to_bool(args["adapt_temp"]);
pre_adapt_steps = UtilFunctions::r_to_int(args["pre_adapt_steps"]);
temp_adapt_steps = UtilFunctions::r_to_int(args["temp_adapt_steps"]);
Expand Down
1 change: 0 additions & 1 deletion src/parameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ class Parameters
int burnin;
int samples;
std::vector<float> pt_chains;
int pt_num_threads;
bool adapt_temp;
int pre_adapt_steps;
int temp_adapt_steps;
Expand Down
7 changes: 5 additions & 2 deletions vignettes/mcmc_demo.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ simulated_data <- moire::simulate_data(
```

```{r load_simulated_data, eval=TRUE, include=FALSE}
# actual simulated data is contained in the package as part of the build process
simulated_data <- moire::simulated_data
```

Expand All @@ -63,17 +64,19 @@ Now that we have our data, let's go ahead and run the MCMC. Here we run 20 paral
burnin <- 1e3
num_samples <- 1e3
pt_chains <- seq(1, 0, length.out = 80)
pt_num_threads <- 20 # number of threads to use for parallel tempering
num_cores <- parallelly::availableCores() - 1 # number of threads to use for parallel processing

mcmc_results <- moire::run_mcmc(
simulated_data,
verbose = TRUE, burnin = burnin, samples_per_chain = num_samples,
pt_chains = pt_chains, pt_num_threads = pt_num_threads,
pt_chains = pt_chains, num_cores = num_cores,
adapt_temp = TRUE
)
```

```{r run_mcmc_load, eval = TRUE, include = FALSE}
# actual MCMC results are contained in the package as part of the build process. Otherwise,
# building the vignette will take a long time.
mcmc_results <- moire::mcmc_results
```
After running the MCMC, we can access the draws from the posterior distribution and analyze.
Expand Down
Loading
Loading