From 43b979ee4baa758bc33b2674d67c5da0d35950b9 Mon Sep 17 00:00:00 2001 From: Samir Amin Date: Wed, 8 May 2024 10:06:04 -0400 Subject: [PATCH] Implement parallelization Suggest an improvement: parallelize the per-group scaling using the furrr::future_map function. On my end, the resulting scaled data frame is identical with or without parallel mode. Also, replace a dispersion value of 0 with 0.000001 to avoid the NaN values that result from dividing by zero. ``` dispersion <- stratum %>% dplyr::summarise_at(.funs = dispersion, .vars = variables) %>% dplyr::mutate(across(everything(), ~ if_else(. == 0, 0.000001, .))) %>% dplyr::collect() ``` --- R/normalize.R | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/R/normalize.R b/R/normalize.R index 56514dd..497807a 100644 --- a/R/normalize.R +++ b/R/normalize.R @@ -11,10 +11,14 @@ #' #' @return normalized data of the same class as \code{population}. #' +#' NOTE: Experimental - keep track of feature columns changed via following edit. If dispersion is zero, i.e., one or more columns in stratum (control wells) have identical values across replicates ( = no variation), set such dispersion value to 0.000001 instead of 0. Doing so will replace NaNs in scaled data frame to 0 unless control well values in variable columns or test well columns have significant deviation from that in control wells. +#' #' @importFrom magrittr %>% #' @importFrom magrittr %<>% #' @importFrom rlang := #' @importFrom stats cor mad median sd setNames +#' @importFrom future plan multisession +#' @importFrom furrr future_map #' #' @examples #' suppressMessages(suppressWarnings(library(magrittr))) @@ -32,7 +36,7 @@ #' cytominer::normalize(population, variables, strata, sample, operation = "standardize") #' @export normalize <- function(population, variables, strata, sample, - operation = "standardize", ...) { + operation = "standardize", nthreads = 4, ...) 
{ scale <- function(data, location, dispersion, variables) { if (is.data.frame(data)) { futile.logger::flog.debug(paste0( @@ -42,9 +46,9 @@ normalize <- function(population, variables, strata, sample, )) dplyr::bind_cols( - data %>% dplyr::select(-variables), + data %>% dplyr::select(! any_of(variables)), data %>% - dplyr::select(variables) %>% + dplyr::select(all_of(variables)) %>% as.matrix() %>% base::scale( center = as.matrix(location), @@ -104,14 +108,25 @@ normalize <- function(population, variables, strata, sample, # TOD: Below, change to select(across(all_of(strata))) groups <- sample %>% - dplyr::select(strata) %>% + dplyr::select(all_of(strata)) %>% dplyr::distinct() %>% dplyr::collect() + # enable parallel map fn + futile.logger::flog.info( + glue::glue("Starting future_map parallel function: Using { nthreads } workers.") + ) + futile.logger::flog.info( + glue::glue("Processing { nrow(groups) } groups") + ) + future::plan(future::multisession, workers = nthreads) + Reduce( dplyr::union_all, - Map( - f = function(group) { + furrr::future_map( + .x = split(x = groups, f = seq(nrow(groups))), + .f = function(group) { + futile.logger::flog.info(glue::glue("Start normalize: { group }")) futile.logger::flog.debug(group) futile.logger::flog.debug("\tstratum") stratum <- @@ -131,10 +146,18 @@ normalize <- function(population, variables, strata, sample, # TODO: Migrate to `dplyr::across` once this issue is fixed # https://github.com/tidyverse/dbplyr/issues/480#issuecomment-811814636 + ## NOTE: Experimental - keep track of feature columns changed via following edit. + ## if dispersion is zero, i.e., one or more columns in stratum (control wells) + ## have identical values across replicates ( = no variation), set such + ## dispersion value to 0.000001 instead of 0. Doing so will replace NaNs + ## in scaled data frame to 0 unless control well values in variable columns or + ## test well columns have significant deviation from that in control wells. 
+ futile.logger::flog.debug("\tdispersion") dispersion <- stratum %>% dplyr::summarise_at(.funs = dispersion, .vars = variables) %>% + dplyr::mutate(across(everything(), ~ if_else(. == 0, 0.000001, .))) %>% dplyr::collect() futile.logger::flog.debug("\tscale") @@ -143,10 +166,10 @@ normalize <- function(population, variables, strata, sample, dplyr::inner_join(y = group, by = names(group), copy = TRUE) %>% scale(location, dispersion, variables) futile.logger::flog.debug("\tscaled") + futile.logger::flog.info(glue::glue("End normalize: { group }")) scaled - }, - split(x = groups, f = seq(nrow(groups))) + }, .progress = TRUE ) ) }