Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement parallelization #159

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions R/normalize.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
#'
#' @return normalized data of the same class as \code{population}.
#'
#' NOTE: Experimental - keep track of feature columns changed via following edit. If dispersion is zero, i.e., one or more columns in stratum (control wells) have identical values across replicates ( = no variation), set such dispersion value to 0.000001 instead of 0. Doing so will replace NaNs in scaled data frame to 0 unless control well values in variable columns or test well columns have significant deviation from that in control wells.
#'
#' @importFrom magrittr %>%
#' @importFrom magrittr %<>%
#' @importFrom rlang :=
#' @importFrom stats cor mad median sd setNames
#' @importFrom future plan multisession
#' @importFrom furrr future_map
#'
#' @examples
#' suppressMessages(suppressWarnings(library(magrittr)))
Expand All @@ -32,7 +36,7 @@
#' cytominer::normalize(population, variables, strata, sample, operation = "standardize")
#' @export
normalize <- function(population, variables, strata, sample,
operation = "standardize", ...) {
operation = "standardize", nthreads = 4, ...) {
scale <- function(data, location, dispersion, variables) {
if (is.data.frame(data)) {
futile.logger::flog.debug(paste0(
Expand All @@ -42,9 +46,9 @@ normalize <- function(population, variables, strata, sample,
))

dplyr::bind_cols(
data %>% dplyr::select(-variables),
data %>% dplyr::select(! any_of(variables)),
data %>%
dplyr::select(variables) %>%
dplyr::select(all_of(variables)) %>%
as.matrix() %>%
base::scale(
center = as.matrix(location),
Expand Down Expand Up @@ -104,14 +108,25 @@ normalize <- function(population, variables, strata, sample,
# TOD: Below, change to select(across(all_of(strata)))
groups <-
sample %>%
dplyr::select(strata) %>%
dplyr::select(all_of(strata)) %>%
dplyr::distinct() %>%
dplyr::collect()

# enable parallel map fn
futile.logger::flog.info(
glue::glue("Starting future_map parallel function: Using { nthreads } workers.")
)
futile.logger::flog.info(
glue::glue("Processing { nrow(groups) } groups")
)
future::plan(future::multisession, workers = nthreads)

Reduce(
dplyr::union_all,
Map(
f = function(group) {
furrr::future_map(
.x = split(x = groups, f = seq(nrow(groups))),
.f = function(group) {
futile.logger::flog.info(glue::glue("Start normalize: { group }"))
futile.logger::flog.debug(group)
futile.logger::flog.debug("\tstratum")
stratum <-
Expand All @@ -131,10 +146,18 @@ normalize <- function(population, variables, strata, sample,
# TODO: Migrate to `dplyr::across` once this issue is fixed
# https://github.com/tidyverse/dbplyr/issues/480#issuecomment-811814636

## NOTE: Experimental - keep track of feature columns changed via following edit.
## if dispersion is zero, i.e., one or more columns in stratum (control wells)
## have identical values across replicates ( = no variation), set such
## dispersion value to 0.000001 instead of 0. Doing so will replace NaNs
## in scaled data frame to 0 unless control well values in variable columns or
## test well columns have significant deviation from that in control wells.

futile.logger::flog.debug("\tdispersion")
dispersion <-
stratum %>%
dplyr::summarise_at(.funs = dispersion, .vars = variables) %>%
dplyr::mutate(across(everything(), ~ if_else(. == 0, 0.000001, .))) %>%
dplyr::collect()

futile.logger::flog.debug("\tscale")
Expand All @@ -143,10 +166,10 @@ normalize <- function(population, variables, strata, sample,
dplyr::inner_join(y = group, by = names(group), copy = TRUE) %>%
scale(location, dispersion, variables)
futile.logger::flog.debug("\tscaled")
futile.logger::flog.info(glue::glue("End normalize: { group }"))

scaled
},
split(x = groups, f = seq(nrow(groups)))
}, .progress = TRUE
)
)
}