Skip to content

Commit

Permalink
Merge branch 'main' into cfe_classes_
Browse files: browse the repository at this point in the history
  • Loading branch information
FrancescMartiEscofetQC committed Jun 14, 2024
2 parents 642cb2e + 8efba91 commit 32c721d
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 129 deletions.
19 changes: 13 additions & 6 deletions metalearners/_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# # Copyright (c) QuantCo 2024-2024
# # SPDX-License-Identifier: BSD-3-Clause

import operator
from collections.abc import Callable
from inspect import signature
from operator import le, lt
Expand Down Expand Up @@ -66,14 +65,22 @@ def validate_all_vectors_same_index(*args: Vector) -> None:


def validate_number_positive(
value: int | float, name: str, strict: bool = False
value: int | float, name: str, strict: bool = True
) -> None:
"""Validates that a number is positive.
If ``strict = True`` then it validates that the number is strictly positive.
"""
if strict:
comparison = operator.lt
if value <= 0:
raise ValueError(
f"{name} was expected to be strictly positive but was {value}."
)
else:
comparison = operator.le
if comparison(value, 0):
raise ValueError(f"{name} was expected to be positive but was {value}.")
if value < 0:
raise ValueError(
f"{name} was expected to be positive or zero but was {value}."
)


def check_propensity_score(
Expand Down
2 changes: 1 addition & 1 deletion metalearners/cross_fit_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _validate_data_match_prior_split(
) -> None:
"""Validate whether the previous test_indices and the passed data are based on the
same number of observations."""
validate_number_positive(n_observations, "n_observations", strict=False)
validate_number_positive(n_observations, "n_observations", strict=True)
if test_indices is None:
return
expected_n_observations = sum(len(x) for x in test_indices)
Expand Down
71 changes: 48 additions & 23 deletions metalearners/drlearner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# # SPDX-License-Identifier: BSD-3-Clause

import numpy as np
from joblib import Parallel, delayed
from typing_extensions import Self

from metalearners._typing import Matrix, OosMethod, Vector
Expand All @@ -22,7 +23,9 @@
VARIANT_OUTCOME_MODEL,
MetaLearner,
_ConditionalAverageOutcomeMetaLearner,
_fit_cross_fit_estimator_joblib,
_ModelSpecifications,
_ParallelJoblibSpecification,
)

_EPSILON = 1e-09
Expand Down Expand Up @@ -102,27 +105,43 @@ def fit(
else:
cv_split_indices = None

# TODO: Consider multiprocessing
nuisance_jobs: list[_ParallelJoblibSpecification | None] = []
for treatment_variant in range(self.n_variants):
self.fit_nuisance(
X=index_matrix(X, self._treatment_variants_indices[treatment_variant]),
y=y[self._treatment_variants_indices[treatment_variant]],
model_kind=VARIANT_OUTCOME_MODEL,
model_ord=treatment_variant,
nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=index_matrix(
X, self._treatment_variants_indices[treatment_variant]
),
y=y[self._treatment_variants_indices[treatment_variant]],
model_kind=VARIANT_OUTCOME_MODEL,
model_ord=treatment_variant,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
)
)

nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
cv=cv_split_indices,
)
)

self.fit_nuisance(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
cv=cv_split_indices,
parallel = Parallel(n_jobs=n_jobs_base_learners)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job)
for job in nuisance_jobs
if job is not None
)

self._assign_joblib_nuisance_results(results)

treatment_jobs: list[_ParallelJoblibSpecification] = []
for treatment_variant in range(1, self.n_variants):
pseudo_outcomes = self._pseudo_outcome(
X=X,
Expand All @@ -131,15 +150,21 @@ def fit(
treatment_variant=treatment_variant,
)

self.fit_treatment(
X=X,
y=pseudo_outcomes,
model_kind=TREATMENT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL],
cv=cv_split_indices,
treatment_jobs.append(
self._treatment_joblib_specifications(
X=X,
y=pseudo_outcomes,
model_kind=TREATMENT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL],
cv=cv_split_indices,
)
)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs
)
self._assign_joblib_treatment_results(results)
return self

def predict(
Expand Down
72 changes: 48 additions & 24 deletions metalearners/rlearner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# # SPDX-License-Identifier: BSD-3-Clause

import numpy as np
from joblib import Parallel, delayed
from sklearn.metrics import log_loss, root_mean_squared_error
from typing_extensions import Self

Expand All @@ -23,7 +24,9 @@
TREATMENT,
TREATMENT_MODEL,
MetaLearner,
_fit_cross_fit_estimator_joblib,
_ModelSpecifications,
_ParallelJoblibSpecification,
)

OUTCOME_MODEL = "outcome_model"
Expand Down Expand Up @@ -175,25 +178,40 @@ def fit(
else:
cv_split_indices = None

self.fit_nuisance(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
cv=cv_split_indices,
nuisance_jobs: list[_ParallelJoblibSpecification | None] = []

nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
cv=cv_split_indices,
)
)
self.fit_nuisance(
X=X,
y=y,
model_kind=OUTCOME_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][OUTCOME_MODEL],
cv=cv_split_indices,
nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=X,
y=y,
model_kind=OUTCOME_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][OUTCOME_MODEL],
cv=cv_split_indices,
)
)

parallel = Parallel(n_jobs=n_jobs_base_learners)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job)
for job in nuisance_jobs
if job is not None
)
self._assign_joblib_nuisance_results(results)

treatment_jobs: list[_ParallelJoblibSpecification] = []
for treatment_variant in range(1, self.n_variants):

is_treatment = w == treatment_variant
Expand All @@ -213,15 +231,21 @@ def fit(

X_filtered = index_matrix(X, mask)

self.fit_treatment(
X=X_filtered,
y=pseudo_outcomes,
model_kind=TREATMENT_MODEL,
model_ord=treatment_variant - 1,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL]
| {_SAMPLE_WEIGHT: weights},
n_jobs_cross_fitting=n_jobs_cross_fitting,
treatment_jobs.append(
self._treatment_joblib_specifications(
X=X_filtered,
y=pseudo_outcomes,
model_kind=TREATMENT_MODEL,
model_ord=treatment_variant - 1,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_MODEL]
| {_SAMPLE_WEIGHT: weights},
n_jobs_cross_fitting=n_jobs_cross_fitting,
)
)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs
)
self._assign_joblib_treatment_results(results)
return self

def predict(
Expand Down
92 changes: 60 additions & 32 deletions metalearners/xlearner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# # SPDX-License-Identifier: BSD-3-Clause

import numpy as np
from joblib import Parallel, delayed
from typing_extensions import Self

from metalearners._typing import Matrix, OosMethod, Vector
Expand All @@ -20,7 +21,9 @@
VARIANT_OUTCOME_MODEL,
MetaLearner,
_ConditionalAverageOutcomeMetaLearner,
_fit_cross_fit_estimator_joblib,
_ModelSpecifications,
_ParallelJoblibSpecification,
)

CONTROL_EFFECT_MODEL = "control_effect_model"
Expand Down Expand Up @@ -98,51 +101,76 @@ def fit(
cv_split_indices = None
cvs.append(cv_split_indices)

# TODO: Consider multiprocessing
nuisance_jobs: list[_ParallelJoblibSpecification | None] = []
for treatment_variant in range(self.n_variants):
self.fit_nuisance(
X=index_matrix(X, self._treatment_variants_indices[treatment_variant]),
y=y[self._treatment_variants_indices[treatment_variant]],
model_kind=VARIANT_OUTCOME_MODEL,
model_ord=treatment_variant,
nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=index_matrix(
X, self._treatment_variants_indices[treatment_variant]
),
y=y[self._treatment_variants_indices[treatment_variant]],
model_kind=VARIANT_OUTCOME_MODEL,
model_ord=treatment_variant,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
cv=cvs[treatment_variant],
)
)

nuisance_jobs.append(
self._nuisance_joblib_specifications(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][VARIANT_OUTCOME_MODEL],
cv=cvs[treatment_variant],
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
)
)

self.fit_nuisance(
X=X,
y=w,
model_kind=PROPENSITY_MODEL,
model_ord=0,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[NUISANCE][PROPENSITY_MODEL],
parallel = Parallel(n_jobs=n_jobs_base_learners)
results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job)
for job in nuisance_jobs
if job is not None
)
self._assign_joblib_nuisance_results(results)

treatment_jobs: list[_ParallelJoblibSpecification] = []
for treatment_variant in range(1, self.n_variants):
imputed_te_control, imputed_te_treatment = self._pseudo_outcome(
X, y, w, treatment_variant
)

self.fit_treatment(
X=index_matrix(X, self._treatment_variants_indices[treatment_variant]),
y=imputed_te_treatment,
model_kind=TREATMENT_EFFECT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_EFFECT_MODEL],
cv=cvs[treatment_variant],
treatment_jobs.append(
self._treatment_joblib_specifications(
X=index_matrix(
X, self._treatment_variants_indices[treatment_variant]
),
y=imputed_te_treatment,
model_kind=TREATMENT_EFFECT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][TREATMENT_EFFECT_MODEL],
cv=cvs[treatment_variant],
)
)
self.fit_treatment(
X=index_matrix(X, self._treatment_variants_indices[0]),
y=imputed_te_control,
model_kind=CONTROL_EFFECT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][CONTROL_EFFECT_MODEL],
cv=cvs[0],

treatment_jobs.append(
self._treatment_joblib_specifications(
X=index_matrix(X, self._treatment_variants_indices[0]),
y=imputed_te_control,
model_kind=CONTROL_EFFECT_MODEL,
model_ord=treatment_variant - 1,
n_jobs_cross_fitting=n_jobs_cross_fitting,
fit_params=qualified_fit_params[TREATMENT][CONTROL_EFFECT_MODEL],
cv=cvs[0],
)
)

results = parallel(
delayed(_fit_cross_fit_estimator_joblib)(job) for job in treatment_jobs
)
self._assign_joblib_treatment_results(results)
return self

def predict(
Expand Down
Loading

0 comments on commit 32c721d

Please sign in to comment.