Tlearner joblib #1

Merged
merged 1 commit into from
Jun 14, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -10,6 +10,10 @@ Changelog
0.4.0 (2024-06-**)
------------------

* Implemented :meth:`metalearners.cross_fit_estimator.CrossFitEstimator.clone`.

* Added ``n_jobs_base_learners`` to :meth:`metalearners.metalearner.MetaLearner.fit`.

* Renamed :meth:`metalearners.explainer.Explainer.feature_importances`. Note this is
a breaking change.

1 change: 1 addition & 0 deletions conda.recipe/recipe.yaml
@@ -30,6 +30,7 @@ requirements:
- typing-extensions
- git_root
- shap
- joblib >= 1.2.0
tests:
- python:
imports:
3 changes: 2 additions & 1 deletion docs/parallelism.rst
@@ -30,7 +30,8 @@ We've discovered three potential levels for executing parallelism:
this would be one propensity model and an outcome model for each treatment variant.
This independence translates into another possibility for parallelism.

-This level of parallelism is not yet implemented in ``metalearners``
+To use parallelism at this level, one can use the ``n_jobs_base_learners`` parameter of the
+:py:meth:`~metalearners.metalearner.MetaLearner.fit` method of the metalearner.
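
For example, a minimal sketch of this usage, mirroring the set-up of ``tests/test_metalearner.py`` in this PR (the synthetic data, base-model factories and variant count are purely illustrative)::

    import numpy as np
    from sklearn.linear_model import LinearRegression, LogisticRegression

    from metalearners.tlearner import TLearner

    rng = np.random.default_rng(42)
    X = rng.standard_normal((1000, 10))
    y = rng.standard_normal(1000)
    w = rng.integers(0, 2, 1000)  # two treatment variants

    tlearner = TLearner(
        is_classification=False,
        n_variants=2,
        nuisance_model_factory=LinearRegression,
        treatment_model_factory=LinearRegression,
        propensity_model_factory=LogisticRegression,
        random_state=42,
    )
    # Fit one variant outcome model per treatment variant in parallel;
    # n_jobs_base_learners=-1 uses all available processors.
    tlearner.fit(X, y, w, n_jobs_base_learners=-1)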

Our experiments leveraging parallelism at various levels reveal that there is not a
'one-size-fits-all' setting; the optimal configuration varies significantly based on factors
2 changes: 2 additions & 0 deletions environment.yml
@@ -34,3 +34,5 @@ dependencies:
- glum
- shap
- matplotlib-base
# Same as sklearn
- joblib>=1.2.0
10 changes: 10 additions & 0 deletions metalearners/cross_fit_estimator.py
@@ -123,6 +123,16 @@ def _train_overall_estimator(
model = self.estimator_factory(**self.estimator_params)
return model.fit(X, y, **fit_params)

def clone(self) -> "CrossFitEstimator":
r"""Construct a new unfitted CrossFitEstimator with the same init parameters."""
return CrossFitEstimator(
n_folds=self.n_folds,
estimator_factory=self.estimator_factory,
estimator_params=self.estimator_params,
enable_overall=self.enable_overall,
random_state=self.random_state,
)

def fit(
self,
X: Matrix,
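
As a quick illustration of the new ``clone`` method above, a minimal sketch (the constructor arguments follow the five parameters forwarded in ``clone``; ``LinearRegression`` is an arbitrary choice of factory):

from sklearn.linear_model import LinearRegression

from metalearners.cross_fit_estimator import CrossFitEstimator

cfe = CrossFitEstimator(
    n_folds=5,
    estimator_factory=LinearRegression,
    estimator_params={},
    enable_overall=True,
    random_state=42,
)
# clone() yields a fresh, never-fitted CrossFitEstimator with identical init
# parameters, leaving any state on the original untouched.
fresh = cfe.clone()
assert fresh is not cfe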
1 change: 1 addition & 0 deletions metalearners/drlearner.py
@@ -85,6 +85,7 @@ def fit(
n_jobs_cross_fitting: int | None = None,
fit_params: dict | None = None,
synchronize_cross_fitting: bool = True,
n_jobs_base_learners: int | None = None,
) -> Self:
self._validate_treatment(w)
self._validate_outcome(y)
161 changes: 159 additions & 2 deletions metalearners/metalearner.py
@@ -4,6 +4,7 @@
from abc import ABC, abstractmethod
from collections.abc import Callable, Collection
from copy import deepcopy
from dataclasses import dataclass
from typing import TypedDict

import numpy as np
@@ -140,6 +141,47 @@ class _ModelSpecifications(TypedDict):
predict_method: Callable[["MetaLearner"], PredictMethod]


@dataclass(frozen=True)
class _ParallelJoblibSpecification:
r"""Specification parameters for a joblib delayed call."""

model_kind: str
model_ord: int
X: Matrix
y: Vector
fit_params: dict | None
n_jobs_cross_fitting: int | None
cross_fit_estimator: CrossFitEstimator
cv: SplitIndices | None


@dataclass(frozen=True)
class _ParallelJoblibResult:
r"""Result of a parallel joblib delayed call."""

model_kind: str
model_ord: int
cross_fit_estimator: CrossFitEstimator


def _fit_cross_fit_estimator_joblib(
parallel_joblib_job: _ParallelJoblibSpecification,
) -> _ParallelJoblibResult:
r"""Helper function to call from a delayed ``joblib`` object to fit a
:class:`~metaleaners.cross_fit_estimator.CrossFitEstimator` in parallel."""
return _ParallelJoblibResult(
model_kind=parallel_joblib_job.model_kind,
model_ord=parallel_joblib_job.model_ord,
cross_fit_estimator=parallel_joblib_job.cross_fit_estimator.fit(
X=parallel_joblib_job.X,
y=parallel_joblib_job.y,
fit_params=parallel_joblib_job.fit_params,
n_jobs_cross_fitting=parallel_joblib_job.n_jobs_cross_fitting,
cv=parallel_joblib_job.cv,
),
)
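
# A hedged, self-contained sketch of how the dataclasses and helper above
# compose (import locations and constructor arguments are inferred from this
# diff; the data, fold count and n_jobs values are illustrative only):
import numpy as np
from joblib import Parallel, delayed
from sklearn.linear_model import LinearRegression

from metalearners.cross_fit_estimator import CrossFitEstimator
from metalearners.metalearner import (
    VARIANT_OUTCOME_MODEL,
    _ParallelJoblibSpecification,
    _fit_cross_fit_estimator_joblib,
)

rng = np.random.default_rng(0)
specifications = [
    _ParallelJoblibSpecification(
        model_kind=VARIANT_OUTCOME_MODEL,
        model_ord=model_ord,
        X=rng.standard_normal((100, 3)),
        y=rng.standard_normal(100),
        fit_params=None,
        n_jobs_cross_fitting=None,
        cross_fit_estimator=CrossFitEstimator(
            n_folds=5,
            estimator_factory=LinearRegression,
            estimator_params={},
            enable_overall=True,
            random_state=0,
        ),
        cv=None,
    )
    for model_ord in range(2)
]
# Each delayed call fits one CrossFitEstimator; joblib fans the work out and
# returns one _ParallelJoblibResult per (model_kind, model_ord) pair.
results = Parallel(n_jobs=2)(
    delayed(_fit_cross_fit_estimator_joblib)(spec) for spec in specifications
)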


class MetaLearner(ABC):
r"""MetaLearner abstract class. All metalearner implementations should inherit from
it.
@@ -524,6 +566,62 @@ def fit_nuisance(
)
return self

def _nuisance_joblib_specifications(
self,
X: Matrix,
y: Vector,
model_kind: str,
model_ord: int,
fit_params: dict | None = None,
n_jobs_cross_fitting: int | None = None,
cv: SplitIndices | None = None,
) -> _ParallelJoblibSpecification | None:
r"""Create a :class:`metalearners.metalearner._ParallelJoblibSpecification` to
fit the corresponding nuisance model.

``y`` represents the objective of the given nuisance model, not necessarily the outcome of the experiment.
If pre-fitted models were passed at instantiation, these are never refitted.
"""
if model_kind in self._prefitted_nuisance_models:
return None
X_filtered = _filter_x_columns(X, self.feature_set[model_kind])

# clone() creates a new, never-fitted CrossFitEstimator. We could pass the
# object in self._nuisance_models[model_kind][model_ord] directly, but it
# could already have some state set. To avoid any issues we clone it.
return _ParallelJoblibSpecification(
cross_fit_estimator=self._nuisance_models[model_kind][model_ord].clone(),
model_kind=model_kind,
model_ord=model_ord,
X=X_filtered,
y=y,
fit_params=fit_params,
n_jobs_cross_fitting=n_jobs_cross_fitting,
cv=cv,
)

def _assign_joblib_nuisance_results(
self, joblib_results: list[_ParallelJoblibResult]
) -> None:
r"""Collect the ``joblib`` results and assign the fitted
:class:`~metalearners.cross_fit_estimator.CrossFitEstimator` s."""
for result in joblib_results:
if result.model_kind not in self._nuisance_models:
raise ValueError(
f"{result.model_kind} is not a nuisance model for "
"{self.__class__.__name__}"
)
if result.model_ord >= (
cardinality := len(self._nuisance_models[result.model_kind])
):
raise ValueError(
f"{result.model_kind} has cardinality {cardinality} and "
f"model_ord is {result.model_ord}"
)
self._nuisance_models[result.model_kind][
result.model_ord
] = result.cross_fit_estimator

def fit_treatment(
self,
X: Matrix,
@@ -548,6 +646,60 @@ def fit_treatment(
)
return self

def _treatment_joblib_specifications(
self,
X: Matrix,
y: Vector,
model_kind: str,
model_ord: int,
fit_params: dict | None = None,
n_jobs_cross_fitting: int | None = None,
cv: SplitIndices | None = None,
) -> _ParallelJoblibSpecification:
r"""Create a :class:`metalearners.metalearner._ParallelJoblibSpecification` to
fit the corresponding treatment model.

``y`` represents the objective of the given treatment model, not necessarily the outcome of the experiment.
If pre-fitted models were passed at instantiation, these are never refitted.
"""
X_filtered = _filter_x_columns(X, self.feature_set[model_kind])

# clone() creates a new, never-fitted CrossFitEstimator. We could pass the
# object in self._treatment_models[model_kind][model_ord] directly, but it
# could already have some state set. To avoid any issues we clone it.
return _ParallelJoblibSpecification(
cross_fit_estimator=self._treatment_models[model_kind][model_ord].clone(),
model_kind=model_kind,
model_ord=model_ord,
X=X_filtered,
y=y,
fit_params=fit_params,
n_jobs_cross_fitting=n_jobs_cross_fitting,
cv=cv,
)

def _assign_joblib_treatment_results(
self, joblib_results: list[_ParallelJoblibResult]
) -> None:
r"""Collect the ``joblib`` results and assign the fitted
:class:`~metalearners.cross_fit_estimator.CrossFitEstimator` s."""
for result in joblib_results:
if result.model_kind not in self._treatment_models:
raise ValueError(
f"{result.model_kind} is not a treatment model for "
"{self.__class__.__name__}"
)
if result.model_ord >= (
cardinality := len(self._treatment_models[result.model_kind])
):
raise ValueError(
f"{result.model_kind} has cardinality {cardinality} and "
f"model_ord is {result.model_ord}"
)
self._treatment_models[result.model_kind][
result.model_ord
] = result.cross_fit_estimator

@abstractmethod
def fit(
self,
@@ -557,13 +709,18 @@ def fit(
n_jobs_cross_fitting: int | None = None,
fit_params: dict | None = None,
synchronize_cross_fitting: bool = True,
n_jobs_base_learners: int | None = None,
) -> Self:
"""Fit all models of the MetaLearner.

If pre-fitted models were passed at instantiation, these are never refitted.

-        ``n_jobs_cross_fitting`` will be used at the cross-fitting level. For more information about
-        parallelism check :ref:`parallelism`
+        ``n_jobs_cross_fitting`` will be used at the cross-fitting level and
+        ``n_jobs_base_learners`` will be used at the stage level. ``None`` means 1 unless in a
+        `joblib.parallel_backend <https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_backend.html#joblib.parallel_backend>`_
+        context. ``-1`` means using all processors.
+        For more information about parallelism, check :ref:`parallelism`.
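
As a hedged illustration of the ``None`` semantics above (the ``TLearner`` construction mirrors ``tests/test_metalearner.py`` from this PR; the threading backend and job count are arbitrary choices)::

    import numpy as np
    from joblib import parallel_backend
    from sklearn.linear_model import LinearRegression, LogisticRegression

    from metalearners.tlearner import TLearner

    rng = np.random.default_rng(0)
    X = rng.standard_normal((500, 5))
    y = rng.standard_normal(500)
    w = rng.integers(0, 2, 500)

    tlearner = TLearner(
        is_classification=False,
        n_variants=2,
        nuisance_model_factory=LinearRegression,
        treatment_model_factory=LinearRegression,
        propensity_model_factory=LogisticRegression,
        random_state=0,
    )
    # With n_jobs_base_learners=None, the surrounding joblib context decides
    # the degree of parallelism: here, up to 4 threads.
    with parallel_backend("threading", n_jobs=4):
        tlearner.fit(X, y, w, n_jobs_base_learners=None)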


``fit_params`` is an optional ``dict`` to be forwarded to base estimator ``fit`` calls. It supports
two usage patterns:
1 change: 1 addition & 0 deletions metalearners/rlearner.py
@@ -158,6 +158,7 @@ def fit(
n_jobs_cross_fitting: int | None = None,
fit_params: dict | None = None,
synchronize_cross_fitting: bool = True,
n_jobs_base_learners: int | None = None,
epsilon: float = _EPSILON,
) -> Self:

1 change: 1 addition & 0 deletions metalearners/slearner.py
@@ -103,6 +103,7 @@ def fit(
n_jobs_cross_fitting: int | None = None,
fit_params: dict | None = None,
synchronize_cross_fitting: bool = True,
n_jobs_base_learners: int | None = None,
) -> Self:
self._validate_treatment(w)
self._validate_outcome(y)
31 changes: 23 additions & 8 deletions metalearners/tlearner.py
@@ -3,6 +3,7 @@


import numpy as np
from joblib import Parallel, delayed
from sklearn.metrics import log_loss, root_mean_squared_error
from typing_extensions import Self

@@ -14,7 +15,9 @@
VARIANT_OUTCOME_MODEL,
MetaLearner,
_ConditionalAverageOutcomeMetaLearner,
_fit_cross_fit_estimator_joblib,
_ModelSpecifications,
_ParallelJoblibSpecification,
)


@@ -54,6 +57,7 @@ def fit(
n_jobs_cross_fitting: int | None = None,
fit_params: dict | None = None,
synchronize_cross_fitting: bool = True,
n_jobs_base_learners: int | None = None,
) -> Self:
self._validate_treatment(w)
self._validate_outcome(y)
@@ -65,17 +69,28 @@

qualified_fit_params = self._qualified_fit_params(fit_params)

-        # TODO: Consider multiprocessing
+        nuisance_jobs: list[_ParallelJoblibSpecification | None] = []
         for treatment_variant in range(self.n_variants):
-            self.fit_nuisance(
-                X=index_matrix(X, self._treatment_variants_indices[treatment_variant]),
-                y=y[self._treatment_variants_indices[treatment_variant]],
-                model_kind=VARIANT_OUTCOME_MODEL,
-                model_ord=treatment_variant,
-                n_jobs_cross_fitting=n_jobs_cross_fitting,
-                fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
+            nuisance_jobs.append(
+                self._nuisance_joblib_specifications(
+                    X=index_matrix(
+                        X, self._treatment_variants_indices[treatment_variant]
+                    ),
+                    y=y[self._treatment_variants_indices[treatment_variant]],
+                    model_kind=VARIANT_OUTCOME_MODEL,
+                    model_ord=treatment_variant,
+                    n_jobs_cross_fitting=n_jobs_cross_fitting,
+                    fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
+                )
             )
+
+        parallel = Parallel(n_jobs=n_jobs_base_learners)
+        results = parallel(
+            delayed(_fit_cross_fit_estimator_joblib)(job)
+            for job in nuisance_jobs
+            if job is not None
+        )
+        self._assign_joblib_nuisance_results(results)
return self

def predict(
1 change: 1 addition & 0 deletions metalearners/xlearner.py
@@ -77,6 +77,7 @@ def fit(
n_jobs_cross_fitting: int | None = None,
fit_params: dict | None = None,
synchronize_cross_fitting: bool = True,
n_jobs_base_learners: int | None = None,
) -> Self:
self._validate_treatment(w)
self._validate_outcome(y)
37 changes: 37 additions & 0 deletions tests/test_metalearner.py
@@ -76,6 +76,7 @@ def fit(
n_jobs_cross_fitting: int | None = None,
fit_params: dict | None = None,
synchronize_cross_fitting: bool = True,
n_jobs_base_learners: int | None = None,
):
for model_kind in self.__class__.nuisance_model_specifications():
for model_ord in range(
@@ -983,3 +984,39 @@ def test_validate_n_folds_synchronize(n_folds, success):
else:
with pytest.raises(ValueError, match="synchronization"):
_validate_n_folds_synchronize(n_folds)


@pytest.mark.parametrize(
"implementation",
[TLearner],
)
def test_n_jobs_base_learners(implementation, rng):
n_variants = 5
X = rng.standard_normal((1000, 10))
y = rng.standard_normal(1000)
w = rng.integers(0, n_variants, 1000)

ml = implementation(
is_classification=False,
n_variants=n_variants,
nuisance_model_factory=LinearRegression,
treatment_model_factory=LinearRegression,
propensity_model_factory=LogisticRegression,
random_state=_SEED,
)

ml.fit(X, y, w, n_jobs_base_learners=None)

ml_2 = implementation(
is_classification=False,
n_variants=n_variants,
nuisance_model_factory=LinearRegression,
treatment_model_factory=LinearRegression,
propensity_model_factory=LogisticRegression,
random_state=_SEED,
)

ml_2.fit(X, y, w, n_jobs_base_learners=-1)

np.testing.assert_allclose(ml.predict(X, False), ml_2.predict(X, False))
np.testing.assert_allclose(ml.predict(X, True), ml_2.predict(X, True))