From b62ad9de05299f1d5091d51c24ab1b5f921c8358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francesc=20Mart=C3=AD=20Escofet?= <154450563+FrancescMartiEscofetQC@users.noreply.github.com> Date: Fri, 14 Jun 2024 12:48:56 +0200 Subject: [PATCH] X, R, DR learners joblib --- metalearners/drlearner.py | 71 ++++++++++++++++++++---------- metalearners/rlearner.py | 72 ++++++++++++++++++++---------- metalearners/xlearner.py | 92 +++++++++++++++++++++++++-------------- tests/test_learner.py | 8 +++- tests/test_metalearner.py | 2 +- 5 files changed, 163 insertions(+), 82 deletions(-) diff --git a/metalearners/drlearner.py b/metalearners/drlearner.py index 1d24d89..ea9b2f1 100644 --- a/metalearners/drlearner.py +++ b/metalearners/drlearner.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: BSD-3-Clause import numpy as np +from joblib import Parallel, delayed from typing_extensions import Self from metalearners._typing import Matrix, OosMethod, Vector @@ -22,7 +23,9 @@ VARIANT_OUTCOME_MODEL, MetaLearner, _ConditionalAverageOutcomeMetaLearner, + _fit_cross_fit_estimator_joblib, _ModelSpecifications, + _ParallelJoblibSpecification, ) _EPSILON = 1e-09 @@ -102,27 +105,43 @@ def fit( else: cv_split_indices = None - # TODO: Consider multiprocessing + nuisance_jobs: list[_ParallelJoblibSpecification | None] = [] for treatment_variant in range(self.n_variants): - self.fit_nuisance( - X=index_matrix(X, self._treatment_variants_indices[treatment_variant]), - y=y[self._treatment_variants_indices[treatment_variant]], - model_kind=VARIANT_OUTCOME_MODEL, - model_ord=treatment_variant, + nuisance_jobs.append( + self._nuisance_joblib_specifications( + X=index_matrix( + X, self._treatment_variants_indices[treatment_variant] + ), + y=y[self._treatment_variants_indices[treatment_variant]], + model_kind=VARIANT_OUTCOME_MODEL, + model_ord=treatment_variant, + n_jobs_cross_fitting=n_jobs_cross_fitting, + fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL], + ) + ) + + nuisance_jobs.append( + self._nuisance_joblib_specifications( + X=X, + y=w, + model_kind=PROPENSITY_MODEL, + model_ord=0, n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL], + fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL], + cv=cv_split_indices, ) + ) - self.fit_nuisance( - X=X, - y=w, - model_kind=PROPENSITY_MODEL, - model_ord=0, - n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL], - cv=cv_split_indices, + parallel = Parallel(n_jobs=n_jobs_base_learners) + results = parallel( + delayed(_fit_cross_fit_estimator_joblib)(job) + for job in nuisance_jobs + if job is not None ) + self._assign_joblib_nuisance_results(results) + + treatment_jobs: list[_ParallelJoblibSpecification] = [] for treatment_variant in range(1, self.n_variants): pseudo_outcomes = self._pseudo_outcome( X=X, @@ -131,15 +150,21 @@ def fit( treatment_variant=treatment_variant, ) - self.fit_treatment( - X=X, - y=pseudo_outcomes, - model_kind=TREATMENT_MODEL, - model_ord=treatment_variant - 1, - n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL], - cv=cv_split_indices, + treatment_jobs.append( + self._treatment_joblib_specifications( + X=X, + y=pseudo_outcomes, + model_kind=TREATMENT_MODEL, + model_ord=treatment_variant - 1, + n_jobs_cross_fitting=n_jobs_cross_fitting, + fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL], + cv=cv_split_indices, + ) ) + results = parallel( + delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs + ) + self._assign_joblib_treatment_results(results) return self def predict( diff --git a/metalearners/rlearner.py b/metalearners/rlearner.py index bf39caa..6a09847 100644 --- a/metalearners/rlearner.py +++ b/metalearners/rlearner.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: BSD-3-Clause import numpy as np +from joblib import Parallel, delayed from sklearn.metrics import log_loss, root_mean_squared_error from typing_extensions import Self @@ -23,7 +24,9 @@ TREATMENT, TREATMENT_MODEL, MetaLearner, + _fit_cross_fit_estimator_joblib, _ModelSpecifications, + _ParallelJoblibSpecification, ) OUTCOME_MODEL = "outcome_model" @@ -175,25 +178,40 @@ def fit( else: cv_split_indices = None - self.fit_nuisance( - X=X, - y=w, - model_kind=PROPENSITY_MODEL, - model_ord=0, - n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL], - cv=cv_split_indices, + nuisance_jobs: list[_ParallelJoblibSpecification | None] = [] + + nuisance_jobs.append( + self._nuisance_joblib_specifications( + X=X, + y=w, + model_kind=PROPENSITY_MODEL, + model_ord=0, + n_jobs_cross_fitting=n_jobs_cross_fitting, + fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL], + cv=cv_split_indices, + ) ) - self.fit_nuisance( - X=X, - y=y, - model_kind=OUTCOME_MODEL, - model_ord=0, - n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[NUISANCE][OUTCOME_MODEL], - cv=cv_split_indices, + nuisance_jobs.append( + self._nuisance_joblib_specifications( + X=X, + y=y, + model_kind=OUTCOME_MODEL, + model_ord=0, + n_jobs_cross_fitting=n_jobs_cross_fitting, + fit_params=qualified_fit_params[NUISANCE][OUTCOME_MODEL], + cv=cv_split_indices, + ) + ) + + parallel = Parallel(n_jobs=n_jobs_base_learners) + results = parallel( + delayed(_fit_cross_fit_estimator_joblib)(job) + for job in nuisance_jobs + if job is not None ) + self._assign_joblib_nuisance_results(results) + treatment_jobs: list[_ParallelJoblibSpecification] = [] for treatment_variant in range(1, self.n_variants): is_treatment = w == treatment_variant @@ -213,15 +231,21 @@ def fit( X_filtered = index_matrix(X, mask) - self.fit_treatment( - X=X_filtered, - y=pseudo_outcomes, - model_kind=TREATMENT_MODEL, - model_ord=treatment_variant - 1, - fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL] - | {_SAMPLE_WEIGHT: weights}, - n_jobs_cross_fitting=n_jobs_cross_fitting, + treatment_jobs.append( + self._treatment_joblib_specifications( + X=X_filtered, + y=pseudo_outcomes, + model_kind=TREATMENT_MODEL, + model_ord=treatment_variant - 1, + fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL] + | {_SAMPLE_WEIGHT: weights}, + n_jobs_cross_fitting=n_jobs_cross_fitting, + ) ) + results = parallel( + delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs + ) + self._assign_joblib_treatment_results(results) return self def predict( diff --git a/metalearners/xlearner.py b/metalearners/xlearner.py index 64d059f..729899c 100644 --- a/metalearners/xlearner.py +++ b/metalearners/xlearner.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: BSD-3-Clause import numpy as np +from joblib import Parallel, delayed from typing_extensions import Self from metalearners._typing import Matrix, OosMethod, Vector @@ -20,7 +21,9 @@ VARIANT_OUTCOME_MODEL, MetaLearner, _ConditionalAverageOutcomeMetaLearner, + _fit_cross_fit_estimator_joblib, _ModelSpecifications, + _ParallelJoblibSpecification, ) CONTROL_EFFECT_MODEL = "control_effect_model" @@ -98,51 +101,76 @@ def fit( cv_split_indices = None cvs.append(cv_split_indices) - # TODO: Consider multiprocessing + nuisance_jobs: list[_ParallelJoblibSpecification | None] = [] for treatment_variant in range(self.n_variants): - self.fit_nuisance( - X=index_matrix(X, self._treatment_variants_indices[treatment_variant]), - y=y[self._treatment_variants_indices[treatment_variant]], - model_kind=VARIANT_OUTCOME_MODEL, - model_ord=treatment_variant, + nuisance_jobs.append( + self._nuisance_joblib_specifications( + X=index_matrix( + X, self._treatment_variants_indices[treatment_variant] + ), + y=y[self._treatment_variants_indices[treatment_variant]], + model_kind=VARIANT_OUTCOME_MODEL, + model_ord=treatment_variant, + n_jobs_cross_fitting=n_jobs_cross_fitting, + fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL], + cv=cvs[treatment_variant], + ) + ) + + nuisance_jobs.append( + self._nuisance_joblib_specifications( + X=X, + y=w, + model_kind=PROPENSITY_MODEL, + model_ord=0, n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL], - cv=cvs[treatment_variant], + fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL], ) + ) - self.fit_nuisance( - X=X, - y=w, - model_kind=PROPENSITY_MODEL, - model_ord=0, - n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL], + parallel = Parallel(n_jobs=n_jobs_base_learners) + results = parallel( + delayed(_fit_cross_fit_estimator_joblib)(job) + for job in nuisance_jobs + if job is not None ) + self._assign_joblib_nuisance_results(results) + treatment_jobs: list[_ParallelJoblibSpecification] = [] for treatment_variant in range(1, self.n_variants): imputed_te_control, imputed_te_treatment = self._pseudo_outcome( X, y, w, treatment_variant ) - - self.fit_treatment( - X=index_matrix(X, self._treatment_variants_indices[treatment_variant]), - y=imputed_te_treatment, - model_kind=TREATMENT_EFFECT_MODEL, - model_ord=treatment_variant - 1, - n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[TREATMENT][TREATMENT_EFFECT_MODEL], - cv=cvs[treatment_variant], + treatment_jobs.append( + self._treatment_joblib_specifications( + X=index_matrix( + X, self._treatment_variants_indices[treatment_variant] + ), + y=imputed_te_treatment, + model_kind=TREATMENT_EFFECT_MODEL, + model_ord=treatment_variant - 1, + n_jobs_cross_fitting=n_jobs_cross_fitting, + fit_params=qualified_fit_params[TREATMENT][TREATMENT_EFFECT_MODEL], + cv=cvs[treatment_variant], + ) ) - self.fit_treatment( - X=index_matrix(X, self._treatment_variants_indices[0]), - y=imputed_te_control, - model_kind=CONTROL_EFFECT_MODEL, - model_ord=treatment_variant - 1, - n_jobs_cross_fitting=n_jobs_cross_fitting, - fit_params=qualified_fit_params[TREATMENT][CONTROL_EFFECT_MODEL], - cv=cvs[0], + + treatment_jobs.append( + self._treatment_joblib_specifications( + X=index_matrix(X, self._treatment_variants_indices[0]), + y=imputed_te_control, + model_kind=CONTROL_EFFECT_MODEL, + model_ord=treatment_variant - 1, + n_jobs_cross_fitting=n_jobs_cross_fitting, + fit_params=qualified_fit_params[TREATMENT][CONTROL_EFFECT_MODEL], + cv=cvs[0], + ) ) + results = parallel( + delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs + ) + self._assign_joblib_treatment_results(results) return self def predict( diff --git a/tests/test_learner.py b/tests/test_learner.py index e76018e..daf1a3e 100644 --- a/tests/test_learner.py +++ b/tests/test_learner.py @@ -141,6 +141,7 @@ def test_learner_synthetic( observed_outcomes_train, treatment_train, synchronize_cross_fitting=True, + n_jobs_base_learners=-1, ) # In sample CATEs @@ -236,6 +237,7 @@ def test_learner_synthetic_oos_ate(metalearner, treatment_kind, request): observed_outcomes_train, treatment_train, synchronize_cross_fitting=True, + n_jobs_base_learners=-1, ) for oos_method in _OOS_WHITELIST: cate_estimates = learner.predict( @@ -427,12 +429,14 @@ def test_x_t_conditional_average_outcomes(outcome_kind, is_oos, request): observed_outcomes_train, treatment_train, synchronize_cross_fitting=False, + n_jobs_base_learners=-1, ) xlearner.fit( covariates_train, observed_outcomes_train, treatment_train, synchronize_cross_fitting=False, + n_jobs_base_learners=-1, ) if not is_oos: @@ -707,7 +711,7 @@ def test_model_reusage(outcome_kind, request): n_variants=len(np.unique(treatment)), nuisance_model_params=nuisance_learner_params, ) - tlearner.fit(covariates, observed_outcomes, treatment) + tlearner.fit(covariates, observed_outcomes, treatment, n_jobs_base_learners=-1) xlearner = XLearner( is_classification=is_classification, n_variants=len(np.unique(treatment)), @@ -731,7 +735,7 @@ def test_model_reusage(outcome_kind, request): tlearner_pred_before_refitting = tlearner.predict_conditional_average_outcomes( covariates, False ) - xlearner.fit(covariates, observed_outcomes, treatment) + xlearner.fit(covariates, observed_outcomes, treatment, n_jobs_base_learners=-1) np.testing.assert_allclose( tlearner.predict_conditional_average_outcomes(covariates, False), tlearner_pred_before_refitting, diff --git a/tests/test_metalearner.py b/tests/test_metalearner.py index 9de1af3..d0d8956 100644 --- a/tests/test_metalearner.py +++ b/tests/test_metalearner.py @@ -988,7 +988,7 @@ def test_validate_n_folds_synchronize(n_folds, success): @pytest.mark.parametrize( "implementation", - [TLearner], + [TLearner, XLearner, RLearner, DRLearner], ) def test_n_jobs_base_learners(implementation, rng): n_variants = 5