diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 9e3d407..f5a59b2 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -10,6 +10,10 @@ Changelog
 0.4.0 (2024-06-**)
 ------------------
 
+* Implemented :meth:`metalearners.cross_fit_estimator.CrossFitEstimator.clone`.
+
+* Added ``n_jobs_base_learners`` to :meth:`metalearners.metalearner.MetaLearner.fit`.
+
 * Renamed :meth:`metalearners.explainer.Explainer.feature_importances`.
   Note this is a breaking change.
diff --git a/conda.recipe/recipe.yaml b/conda.recipe/recipe.yaml
index 088edf8..192234c 100644
--- a/conda.recipe/recipe.yaml
+++ b/conda.recipe/recipe.yaml
@@ -30,6 +30,7 @@ requirements:
     - typing-extensions
     - git_root
     - shap
+    - joblib >= 1.2.0
 tests:
   - python:
       imports:
diff --git a/docs/parallelism.rst b/docs/parallelism.rst
index 1875865..0e6b85a 100644
--- a/docs/parallelism.rst
+++ b/docs/parallelism.rst
@@ -30,7 +30,8 @@ We've discovered three potential levels for executing parallelism:
    this would be one propensity model and an outcome model for each treatment variant.
    This independence translates into another possibility for parallelism.
 
-   This level of parallelism is not yet implemented in ``metalearners``
+   To use parallelism at this level, one can use the ``n_jobs_base_learners`` parameter
+   of the :py:meth:`~metalearners.metalearner.MetaLearner.fit` method of the metalearner.
 
 Our experiments leveraging parallelism at various levels reveal that there is not a
 'one-size-fits-all' setting; the optimal configuration varies significantly based on factors
diff --git a/environment.yml b/environment.yml
index 670a3e3..98fd60c 100644
--- a/environment.yml
+++ b/environment.yml
@@ -34,3 +34,5 @@ dependencies:
   - glum
   - shap
   - matplotlib-base
+  # Same lower bound as scikit-learn
+  - joblib>=1.2.0
diff --git a/metalearners/cross_fit_estimator.py b/metalearners/cross_fit_estimator.py
index f9bd8e1..e26d898 100644
--- a/metalearners/cross_fit_estimator.py
+++ b/metalearners/cross_fit_estimator.py
@@ -123,6 +123,16 @@ def _train_overall_estimator(
         model = self.estimator_factory(**self.estimator_params)
         return model.fit(X, y, **fit_params)
 
+    def clone(self) -> "CrossFitEstimator":
+        r"""Construct a new, unfitted CrossFitEstimator with the same init parameters."""
+        return CrossFitEstimator(
+            n_folds=self.n_folds,
+            estimator_factory=self.estimator_factory,
+            estimator_params=self.estimator_params,
+            enable_overall=self.enable_overall,
+            random_state=self.random_state,
+        )
+
     def fit(
         self,
         X: Matrix,
diff --git a/metalearners/drlearner.py b/metalearners/drlearner.py
index 417d245..1d24d89 100644
--- a/metalearners/drlearner.py
+++ b/metalearners/drlearner.py
@@ -85,6 +85,7 @@ def fit(
         n_jobs_cross_fitting: int | None = None,
         fit_params: dict | None = None,
         synchronize_cross_fitting: bool = True,
+        n_jobs_base_learners: int | None = None,
     ) -> Self:
         self._validate_treatment(w)
         self._validate_outcome(y)
diff --git a/metalearners/metalearner.py b/metalearners/metalearner.py
index 4041c01..945c791 100644
--- a/metalearners/metalearner.py
+++ b/metalearners/metalearner.py
@@ -4,6 +4,7 @@
 from abc import ABC, abstractmethod
 from collections.abc import Callable, Collection
 from copy import deepcopy
+from dataclasses import dataclass
 from typing import TypedDict
 
 import numpy as np
@@ -140,6 +141,47 @@ class _ModelSpecifications(TypedDict):
     predict_method: Callable[["MetaLearner"], PredictMethod]
 
 
+@dataclass(frozen=True)
+class _ParallelJoblibSpecification:
+    r"""Specification parameters for a joblib delayed call."""
+
+    model_kind: str
+    model_ord: int
+    X: Matrix
+    y: Vector
+    fit_params: dict | None
+    n_jobs_cross_fitting: int | None
+    cross_fit_estimator: CrossFitEstimator
+    cv: SplitIndices | None
+
+
+@dataclass(frozen=True)
+class _ParallelJoblibResult:
+    r"""Result of a parallel joblib delayed call."""
+
+    model_kind: str
+    model_ord: int
+    cross_fit_estimator: CrossFitEstimator
+
+
+def _fit_cross_fit_estimator_joblib(
+    parallel_joblib_job: _ParallelJoblibSpecification,
+) -> _ParallelJoblibResult:
+    r"""Helper function to call from a delayed ``joblib`` object to fit a
+    :class:`~metalearners.cross_fit_estimator.CrossFitEstimator` in parallel."""
+    return _ParallelJoblibResult(
+        model_kind=parallel_joblib_job.model_kind,
+        model_ord=parallel_joblib_job.model_ord,
+        cross_fit_estimator=parallel_joblib_job.cross_fit_estimator.fit(
+            X=parallel_joblib_job.X,
+            y=parallel_joblib_job.y,
+            fit_params=parallel_joblib_job.fit_params,
+            n_jobs_cross_fitting=parallel_joblib_job.n_jobs_cross_fitting,
+            cv=parallel_joblib_job.cv,
+        ),
+    )
+
+
 class MetaLearner(ABC):
     r"""MetaLearner abstract class. All metalearner implementations should inherit from it.
@@ -524,6 +566,62 @@ def fit_nuisance(
         )
         return self
 
+    def _nuisance_joblib_specifications(
+        self,
+        X: Matrix,
+        y: Vector,
+        model_kind: str,
+        model_ord: int,
+        fit_params: dict | None = None,
+        n_jobs_cross_fitting: int | None = None,
+        cv: SplitIndices | None = None,
+    ) -> _ParallelJoblibSpecification | None:
+        r"""Create a :class:`metalearners.metalearner._ParallelJoblibSpecification` to
+        fit the corresponding nuisance model.
+
+        ``y`` represents the objective of the given nuisance model, not necessarily the outcome of the experiment.
+        If pre-fitted models were passed at instantiation, these are never refitted.
+        """
+        if model_kind in self._prefitted_nuisance_models:
+            return None
+        X_filtered = _filter_x_columns(X, self.feature_set[model_kind])
+
+        # clone() creates a new, never-fitted CrossFitEstimator. We could pass the
+        # object in self._nuisance_models[model_kind][model_ord] directly, but it
+        # could already have some state set. To avoid any issues we clone it.
+        return _ParallelJoblibSpecification(
+            cross_fit_estimator=self._nuisance_models[model_kind][model_ord].clone(),
+            model_kind=model_kind,
+            model_ord=model_ord,
+            X=X_filtered,
+            y=y,
+            fit_params=fit_params,
+            n_jobs_cross_fitting=n_jobs_cross_fitting,
+            cv=cv,
+        )
+
+    def _assign_joblib_nuisance_results(
+        self, joblib_results: list[_ParallelJoblibResult]
+    ) -> None:
+        r"""Collect the ``joblib`` results and assign the fitted
+        :class:`~metalearners.cross_fit_estimator.CrossFitEstimator` s."""
+        for result in joblib_results:
+            if result.model_kind not in self._nuisance_models:
+                raise ValueError(
+                    f"{result.model_kind} is not a nuisance model for "
+                    f"{self.__class__.__name__}"
+                )
+            if result.model_ord >= (
+                cardinality := len(self._nuisance_models[result.model_kind])
+            ):
+                raise ValueError(
+                    f"{result.model_kind} has cardinality {cardinality} and "
+                    f"model_ord is {result.model_ord}"
+                )
+            self._nuisance_models[result.model_kind][
+                result.model_ord
+            ] = result.cross_fit_estimator
+
     def fit_treatment(
         self,
         X: Matrix,
@@ -548,6 +646,60 @@ def fit_treatment(
         )
         return self
 
+    def _treatment_joblib_specifications(
+        self,
+        X: Matrix,
+        y: Vector,
+        model_kind: str,
+        model_ord: int,
+        fit_params: dict | None = None,
+        n_jobs_cross_fitting: int | None = None,
+        cv: SplitIndices | None = None,
+    ) -> _ParallelJoblibSpecification:
+        r"""Create a :class:`metalearners.metalearner._ParallelJoblibSpecification` to
+        fit the corresponding treatment model.
+
+        ``y`` represents the objective of the given treatment model, not necessarily the outcome of the experiment.
+        If pre-fitted models were passed at instantiation, these are never refitted.
+        """
+        X_filtered = _filter_x_columns(X, self.feature_set[model_kind])
+
+        # clone() creates a new, never-fitted CrossFitEstimator. We could pass the
+        # object in self._treatment_models[model_kind][model_ord] directly, but it
+        # could already have some state set. To avoid any issues we clone it.
+        return _ParallelJoblibSpecification(
+            cross_fit_estimator=self._treatment_models[model_kind][model_ord].clone(),
+            model_kind=model_kind,
+            model_ord=model_ord,
+            X=X_filtered,
+            y=y,
+            fit_params=fit_params,
+            n_jobs_cross_fitting=n_jobs_cross_fitting,
+            cv=cv,
+        )
+
+    def _assign_joblib_treatment_results(
+        self, joblib_results: list[_ParallelJoblibResult]
+    ) -> None:
+        r"""Collect the ``joblib`` results and assign the fitted
+        :class:`~metalearners.cross_fit_estimator.CrossFitEstimator` s."""
+        for result in joblib_results:
+            if result.model_kind not in self._treatment_models:
+                raise ValueError(
+                    f"{result.model_kind} is not a treatment model for "
+                    f"{self.__class__.__name__}"
+                )
+            if result.model_ord >= (
+                cardinality := len(self._treatment_models[result.model_kind])
+            ):
+                raise ValueError(
+                    f"{result.model_kind} has cardinality {cardinality} and "
+                    f"model_ord is {result.model_ord}"
+                )
+            self._treatment_models[result.model_kind][
+                result.model_ord
+            ] = result.cross_fit_estimator
+
     @abstractmethod
     def fit(
         self,
@@ -557,13 +709,18 @@ def fit(
         n_jobs_cross_fitting: int | None = None,
         fit_params: dict | None = None,
         synchronize_cross_fitting: bool = True,
+        n_jobs_base_learners: int | None = None,
     ) -> Self:
         """Fit all models of the MetaLearner.
 
         If pre-fitted models were passed at instantiation, these are never refitted.
 
-        ``n_jobs_cross_fitting`` will be used at the cross-fitting level. For more information about
-        parallelism check :ref:`parallelism`
+        ``n_jobs_cross_fitting`` will be used at the cross-fitting level and
+        ``n_jobs_base_learners`` will be used at the stage level. ``None`` means 1 unless in a
+        `joblib.parallel_backend <https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_backend.html>`_
+        context. ``-1`` means using all processors.
+        For more information about parallelism, check :ref:`parallelism`.
+
         ``fit_params`` is an optional ``dict`` to be forwarded to base estimator
         ``fit`` calls. It supports two usages patterns:
diff --git a/metalearners/rlearner.py b/metalearners/rlearner.py
index 68bebe4..bf39caa 100644
--- a/metalearners/rlearner.py
+++ b/metalearners/rlearner.py
@@ -158,6 +158,7 @@ def fit(
         n_jobs_cross_fitting: int | None = None,
         fit_params: dict | None = None,
         synchronize_cross_fitting: bool = True,
+        n_jobs_base_learners: int | None = None,
         epsilon: float = _EPSILON,
     ) -> Self:
 
diff --git a/metalearners/slearner.py b/metalearners/slearner.py
index f03684e..553b558 100644
--- a/metalearners/slearner.py
+++ b/metalearners/slearner.py
@@ -103,6 +103,7 @@ def fit(
         n_jobs_cross_fitting: int | None = None,
         fit_params: dict | None = None,
         synchronize_cross_fitting: bool = True,
+        n_jobs_base_learners: int | None = None,
     ) -> Self:
         self._validate_treatment(w)
         self._validate_outcome(y)
diff --git a/metalearners/tlearner.py b/metalearners/tlearner.py
index 1e39244..9380144 100644
--- a/metalearners/tlearner.py
+++ b/metalearners/tlearner.py
@@ -3,6 +3,7 @@
 
 import numpy as np
 
+from joblib import Parallel, delayed
 from sklearn.metrics import log_loss, root_mean_squared_error
 from typing_extensions import Self
 
@@ -14,7 +15,9 @@
     VARIANT_OUTCOME_MODEL,
     MetaLearner,
     _ConditionalAverageOutcomeMetaLearner,
+    _fit_cross_fit_estimator_joblib,
     _ModelSpecifications,
+    _ParallelJoblibSpecification,
 )
@@ -54,6 +57,7 @@ def fit(
         n_jobs_cross_fitting: int | None = None,
         fit_params: dict | None = None,
         synchronize_cross_fitting: bool = True,
+        n_jobs_base_learners: int | None = None,
     ) -> Self:
         self._validate_treatment(w)
         self._validate_outcome(y)
@@ -65,17 +69,28 @@
 
         qualified_fit_params = self._qualified_fit_params(fit_params)
 
-        # TODO: Consider multiprocessing
+        nuisance_jobs: list[_ParallelJoblibSpecification | None] = []
         for treatment_variant in range(self.n_variants):
-            self.fit_nuisance(
-                X=index_matrix(X, self._treatment_variants_indices[treatment_variant]),
-                y=y[self._treatment_variants_indices[treatment_variant]],
-                model_kind=VARIANT_OUTCOME_MODEL,
-                model_ord=treatment_variant,
-                n_jobs_cross_fitting=n_jobs_cross_fitting,
-                fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
+            nuisance_jobs.append(
+                self._nuisance_joblib_specifications(
+                    X=index_matrix(
+                        X, self._treatment_variants_indices[treatment_variant]
+                    ),
+                    y=y[self._treatment_variants_indices[treatment_variant]],
+                    model_kind=VARIANT_OUTCOME_MODEL,
+                    model_ord=treatment_variant,
+                    n_jobs_cross_fitting=n_jobs_cross_fitting,
+                    fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
+                )
             )
+        parallel = Parallel(n_jobs=n_jobs_base_learners)
+        results = parallel(
+            delayed(_fit_cross_fit_estimator_joblib)(job)
+            for job in nuisance_jobs
+            if job is not None
+        )
+        self._assign_joblib_nuisance_results(results)
         return self
 
     def predict(
diff --git a/metalearners/xlearner.py b/metalearners/xlearner.py
index 751d14c..64d059f 100644
--- a/metalearners/xlearner.py
+++ b/metalearners/xlearner.py
@@ -77,6 +77,7 @@ def fit(
         n_jobs_cross_fitting: int | None = None,
         fit_params: dict | None = None,
         synchronize_cross_fitting: bool = True,
+        n_jobs_base_learners: int | None = None,
     ) -> Self:
         self._validate_treatment(w)
         self._validate_outcome(y)
diff --git a/tests/test_metalearner.py b/tests/test_metalearner.py
index 1e41423..9de1af3 100644
--- a/tests/test_metalearner.py
+++ b/tests/test_metalearner.py
@@ -76,6 +76,7 @@ def fit(
         n_jobs_cross_fitting: int | None = None,
         fit_params: dict | None = None,
         synchronize_cross_fitting: bool = True,
+        n_jobs_base_learners: int | None = None,
     ):
         for model_kind in self.__class__.nuisance_model_specifications():
             for model_ord in range(
@@ -983,3 +984,39 @@ def test_validate_n_folds_synchronize(n_folds, success):
     else:
         with pytest.raises(ValueError, match="synchronization"):
             _validate_n_folds_synchronize(n_folds)
+
+
+@pytest.mark.parametrize(
+    "implementation",
+    [TLearner],
+)
+def test_n_jobs_base_learners(implementation, rng):
+    n_variants = 5
+    X = rng.standard_normal((1000, 10))
+    y = rng.standard_normal(1000)
+    w = rng.integers(0, n_variants, 1000)
+
+    ml = implementation(
+        is_classification=False,
+        n_variants=n_variants,
+        nuisance_model_factory=LinearRegression,
+        treatment_model_factory=LinearRegression,
+        propensity_model_factory=LogisticRegression,
+        random_state=_SEED,
+    )
+
+    ml.fit(X, y, w, n_jobs_base_learners=None)
+
+    ml_2 = implementation(
+        is_classification=False,
+        n_variants=n_variants,
+        nuisance_model_factory=LinearRegression,
+        treatment_model_factory=LinearRegression,
+        propensity_model_factory=LogisticRegression,
+        random_state=_SEED,
+    )
+
+    ml_2.fit(X, y, w, n_jobs_base_learners=-1)
+
+    np.testing.assert_allclose(ml.predict(X, False), ml_2.predict(X, False))
+    np.testing.assert_allclose(ml.predict(X, True), ml_2.predict(X, True))
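
For reviewers, here is a minimal end-to-end usage sketch of the new ``n_jobs_base_learners`` parameter. It mirrors ``test_n_jobs_base_learners`` above; the synthetic data, the seed, and the positional ``is_oos`` argument to ``predict`` are illustrative assumptions, not part of this diff:

# Usage sketch only: mirrors test_n_jobs_base_learners above. The data, seed,
# and the positional is_oos argument to predict are illustrative assumptions.
import numpy as np
from sklearn.linear_model import LinearRegression, LogisticRegression

from metalearners import TLearner

rng = np.random.default_rng(42)
n_variants = 3
X = rng.standard_normal((1000, 10))
y = rng.standard_normal(1000)
w = rng.integers(0, n_variants, 1000)

tlearner = TLearner(
    is_classification=False,
    n_variants=n_variants,
    nuisance_model_factory=LinearRegression,
    treatment_model_factory=LinearRegression,
    propensity_model_factory=LogisticRegression,
    random_state=42,
)

# n_jobs_base_learners=-1 fits the per-variant outcome models in parallel via
# joblib, while n_jobs_cross_fitting would additionally parallelize the folds
# within each CrossFitEstimator. None means 1 job unless running inside a
# joblib.parallel_backend context.
tlearner.fit(X, y, w, n_jobs_base_learners=-1)

# In-sample CATE estimates; the second argument is is_oos, as in the test above.
cate_estimates = tlearner.predict(X, False)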