Skip to content

Commit

Permalink
X, R, DR learners joblib
Browse files Browse the repository at this point in the history
  • Loading branch information
FrancescMartiEscofetQC committed Jun 14, 2024
1 parent dfa95d8 commit b62ad9d
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 82 deletions.
71 changes: 48 additions & 23 deletions metalearners/drlearner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# # SPDX-License-Identifier: BSD-3-Clause

import numpy as np
from joblib import Parallel, delayed
from typing_extensions import Self

from metalearners._typing import Matrix, OosMethod, Vector
Expand All @@ -22,7 +23,9 @@
VARIANT_OUTCOME_MODEL,
MetaLearner,
_ConditionalAverageOutcomeMetaLearner,
_fit_cross_fit_estimator_joblib,
_ModelSpecifications,
_ParallelJoblibSpecification,
)

_EPSILON = 1e-09
Expand Down Expand Up @@ -102,27 +105,43 @@ def fit(
else:
cv_split_indices = None

# TODO: Consider multiprocessing
nuisance_jobs: list[_ParallelJoblibSpecification | None] = []
for treatment_variant in range(self.n_variants):
self.fit_nuisance(
X=index_matrix(X, self._treatment_variants_indices[treatment_variant]),
y=y[self._treatment_variants_indices[treatment_variant]],
model_kind=VARIANT_OUTCOME_MODEL,
model_ord=treatment_variant,
nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=index_matrix(
X, self._treatment_variants_indices[treatment_variant]
),
y=y[self._treatment_variants_indices[treatment_variant]],
model_kind=VARIANT_OUTCOME_MODEL,
model_ord=treatment_variant,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
)
)

nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
cv=cv_split_indices,
)
)

self.fit_nuisance(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
cv=cv_split_indices,
parallel = Parallel(n_jobs=n_jobs_base_learners)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job)
for job in nuisance_jobs
if job is not None
)

self._assign_joblib_nuisance_results(results)

treatment_jobs: list[_ParallelJoblibSpecification] = []
for treatment_variant in range(1, self.n_variants):
pseudo_outcomes = self._pseudo_outcome(
X=X,
Expand All @@ -131,15 +150,21 @@ def fit(
treatment_variant=treatment_variant,
)

self.fit_treatment(
X=X,
y=pseudo_outcomes,
model_kind=TREATMENT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL],
cv=cv_split_indices,
treatment_jobs.append(
self._treatment_joblib_specifications(
X=X,
y=pseudo_outcomes,
model_kind=TREATMENT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL],
cv=cv_split_indices,
)
)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs
)
self._assign_joblib_treatment_results(results)
return self

def predict(
Expand Down
72 changes: 48 additions & 24 deletions metalearners/rlearner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# # SPDX-License-Identifier: BSD-3-Clause

import numpy as np
from joblib import Parallel, delayed
from sklearn.metrics import log_loss, root_mean_squared_error
from typing_extensions import Self

Expand All @@ -23,7 +24,9 @@
TREATMENT,
TREATMENT_MODEL,
MetaLearner,
_fit_cross_fit_estimator_joblib,
_ModelSpecifications,
_ParallelJoblibSpecification,
)

OUTCOME_MODEL = "outcome_model"
Expand Down Expand Up @@ -175,25 +178,40 @@ def fit(
else:
cv_split_indices = None

self.fit_nuisance(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
cv=cv_split_indices,
nuisance_jobs: list[_ParallelJoblibSpecification | None] = []

nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
cv=cv_split_indices,
)
)
self.fit_nuisance(
X=X,
y=y,
model_kind=OUTCOME_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][OUTCOME_MODEL],
cv=cv_split_indices,
nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=X,
y=y,
model_kind=OUTCOME_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][OUTCOME_MODEL],
cv=cv_split_indices,
)
)

parallel = Parallel(n_jobs=n_jobs_base_learners)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job)
for job in nuisance_jobs
if job is not None
)
self._assign_joblib_nuisance_results(results)

treatment_jobs: list[_ParallelJoblibSpecification] = []
for treatment_variant in range(1, self.n_variants):

is_treatment = w == treatment_variant
Expand All @@ -213,15 +231,21 @@ def fit(

X_filtered = index_matrix(X, mask)

self.fit_treatment(
X=X_filtered,
y=pseudo_outcomes,
model_kind=TREATMENT_MODEL,
model_ord=treatment_variant - 1,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL]
| {_SAMPLE_WEIGHT: weights},
n_jobs_cross_fitting=n_jobs_cross_fitting,
treatment_jobs.append(
self._treatment_joblib_specifications(
X=X_filtered,
y=pseudo_outcomes,
model_kind=TREATMENT_MODEL,
model_ord=treatment_variant - 1,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL]
| {_SAMPLE_WEIGHT: weights},
n_jobs_cross_fitting=n_jobs_cross_fitting,
)
)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs
)
self._assign_joblib_treatment_results(results)
return self

def predict(
Expand Down
92 changes: 60 additions & 32 deletions metalearners/xlearner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# # SPDX-License-Identifier: BSD-3-Clause

import numpy as np
from joblib import Parallel, delayed
from typing_extensions import Self

from metalearners._typing import Matrix, OosMethod, Vector
Expand All @@ -20,7 +21,9 @@
VARIANT_OUTCOME_MODEL,
MetaLearner,
_ConditionalAverageOutcomeMetaLearner,
_fit_cross_fit_estimator_joblib,
_ModelSpecifications,
_ParallelJoblibSpecification,
)

CONTROL_EFFECT_MODEL = "control_effect_model"
Expand Down Expand Up @@ -98,51 +101,76 @@ def fit(
cv_split_indices = None
cvs.append(cv_split_indices)

# TODO: Consider multiprocessing
nuisance_jobs: list[_ParallelJoblibSpecification | None] = []
for treatment_variant in range(self.n_variants):
self.fit_nuisance(
X=index_matrix(X, self._treatment_variants_indices[treatment_variant]),
y=y[self._treatment_variants_indices[treatment_variant]],
model_kind=VARIANT_OUTCOME_MODEL,
model_ord=treatment_variant,
nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=index_matrix(
X, self._treatment_variants_indices[treatment_variant]
),
y=y[self._treatment_variants_indices[treatment_variant]],
model_kind=VARIANT_OUTCOME_MODEL,
model_ord=treatment_variant,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
cv=cvs[treatment_variant],
)
)

nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
cv=cvs[treatment_variant],
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
)
)

self.fit_nuisance(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
parallel = Parallel(n_jobs=n_jobs_base_learners)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job)
for job in nuisance_jobs
if job is not None
)
self._assign_joblib_nuisance_results(results)

treatment_jobs: list[_ParallelJoblibSpecification] = []
for treatment_variant in range(1, self.n_variants):
imputed_te_control, imputed_te_treatment = self._pseudo_outcome(
X, y, w, treatment_variant
)

self.fit_treatment(
X=index_matrix(X, self._treatment_variants_indices[treatment_variant]),
y=imputed_te_treatment,
model_kind=TREATMENT_EFFECT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_EFFECT_MODEL],
cv=cvs[treatment_variant],
treatment_jobs.append(
self._treatment_joblib_specifications(
X=index_matrix(
X, self._treatment_variants_indices[treatment_variant]
),
y=imputed_te_treatment,
model_kind=TREATMENT_EFFECT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_EFFECT_MODEL],
cv=cvs[treatment_variant],
)
)
self.fit_treatment(
X=index_matrix(X, self._treatment_variants_indices[0]),
y=imputed_te_control,
model_kind=CONTROL_EFFECT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][CONTROL_EFFECT_MODEL],
cv=cvs[0],

treatment_jobs.append(
self._treatment_joblib_specifications(
X=index_matrix(X, self._treatment_variants_indices[0]),
y=imputed_te_control,
model_kind=CONTROL_EFFECT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][CONTROL_EFFECT_MODEL],
cv=cvs[0],
)
)

results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs
)
self._assign_joblib_treatment_results(results)
return self

def predict(
Expand Down
8 changes: 6 additions & 2 deletions tests/test_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def test_learner_synthetic(
observed_outcomes_train,
treatment_train,
synchronize_cross_fitting=True,
n_jobs_base_learners=-1,
)

# In sample CATEs
Expand Down Expand Up @@ -236,6 +237,7 @@ def test_learner_synthetic_oos_ate(metalearner, treatment_kind, request):
observed_outcomes_train,
treatment_train,
synchronize_cross_fitting=True,
n_jobs_base_learners=-1,
)
for oos_method in _OOS_WHITELIST:
cate_estimates = learner.predict(
Expand Down Expand Up @@ -427,12 +429,14 @@ def test_x_t_conditional_average_outcomes(outcome_kind, is_oos, request):
observed_outcomes_train,
treatment_train,
synchronize_cross_fitting=False,
n_jobs_base_learners=-1,
)
xlearner.fit(
covariates_train,
observed_outcomes_train,
treatment_train,
synchronize_cross_fitting=False,
n_jobs_base_learners=-1,
)

if not is_oos:
Expand Down Expand Up @@ -707,7 +711,7 @@ def test_model_reusage(outcome_kind, request):
n_variants=len(np.unique(treatment)),
nuisance_model_params=nuisance_learner_params,
)
tlearner.fit(covariates, observed_outcomes, treatment)
tlearner.fit(covariates, observed_outcomes, treatment, n_jobs_base_learners=-1)
xlearner = XLearner(
is_classification=is_classification,
n_variants=len(np.unique(treatment)),
Expand All @@ -731,7 +735,7 @@ def test_model_reusage(outcome_kind, request):
tlearner_pred_before_refitting = tlearner.predict_conditional_average_outcomes(
covariates, False
)
xlearner.fit(covariates, observed_outcomes, treatment)
xlearner.fit(covariates, observed_outcomes, treatment, n_jobs_base_learners=-1)
np.testing.assert_allclose(
tlearner.predict_conditional_average_outcomes(covariates, False),
tlearner_pred_before_refitting,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_metalearner.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ def test_validate_n_folds_synchronize(n_folds, success):

@pytest.mark.parametrize(
"implementation",
[TLearner],
[TLearner, XLearner, RLearner, DRLearner],
)
def test_n_jobs_base_learners(implementation, rng):
n_variants = 5
Expand Down

0 comments on commit b62ad9d

Please sign in to comment.