
Merge pull request #2 from owkin/thibault/debug-docker
debug: mode docker
jeandut authored Mar 27, 2023
2 parents 0d5cbe6 + a797955 commit 4c58812
Showing 7 changed files with 239 additions and 252 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -129,3 +129,6 @@ dmypy.json

# Pyre type checker
.pyre/

# other files
.DS_Store
Empty file added fedeca/__init__.py
Empty file.
99 changes: 87 additions & 12 deletions fedeca/algorithms/torch_webdisco_algo.py
@@ -1,9 +1,6 @@
import abc
import math
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple

import numpy as np
import torch
@@ -24,6 +21,7 @@

from substrafl.schemas import StrategyName
from fedeca.utils.tensor_utils import build_X_y
from fedeca.utils.moments_utils import compute_uncentered_moment


class TorchWebDiscoAlgo(TorchAlgo):
@@ -63,7 +61,7 @@ def __init__(
standardize_data: bool = True,
tol: float = 1e-16,
penalizer: float = 0.0,
l1_ratio: float = 0.,
l1_ratio: float = 0.0,
with_batch_norm_parameters: bool = False,
use_gpu: bool = True,
*args,
@@ -151,7 +149,11 @@ def compute_updates(

global_survival_statistics = shared_state["global_survival_statistics"]
distinct_event_times = global_survival_statistics["distinct_event_times"]
X, y = build_X_y(self, datasamples, shared_state["moments"],)
X, y = build_X_y(
self,
datasamples,
shared_state["moments"],
)

# Need pytorch conversion to be fed to model
X = torch.from_numpy(X)
@@ -175,9 +177,7 @@
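# Per-distinct-event-time accumulators used by WebDisco: the sums over the
# at-risk set Rt of exp(beta^T x), of exp(beta^T x) * x, and of
# exp(beta^T x) * x x^T (``expbetaTx_stable`` and ``factor`` come from the
# numerically stabilized exponentials computed above).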
risk_phi.append(factor * expbetaTx_stable[Rt].sum(axis=(0, 1)))
common_block = np.multiply(expbetaTx_stable[Rt], X[Rt])
risk_phi_x.append(factor * common_block.sum(axis=0))
risk_phi_x_x.append(
factor * np.einsum("ij,ik->jk", common_block, X[Rt])
)
risk_phi_x_x.append(factor * np.einsum("ij,ik->jk", common_block, X[Rt]))
local_phi_stats = {}
local_phi_stats["risk_phi"] = risk_phi
local_phi_stats["risk_phi_x"] = risk_phi_x
@@ -189,6 +189,84 @@
"moments": shared_state["moments"],
}

@remote_data
def local_uncentered_moments(self, datasamples, shared_state=None):
"""Compute the local uncentered moments.
This method is transformed by the decorator to meet Substra API,
and is executed in the training nodes. See build_graph.
Parameters
----------
datasamples : pd.DataFrame
Dataframe returned by the opener.
shared_state : None, optional
Given by the aggregation node, here nothing, by default None.
Returns
-------
dict
Local results to be shared via shared_state to the aggregation node.
"""
del shared_state # unused
results = {f"moment{k}": compute_uncentered_moment(datasamples, k) for k in range(1, 3)}
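# Per-column count of non-missing numeric values, presumably used by the
# aggregation node to weight each center's moments.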
results["n_samples"] = datasamples.select_dtypes(include=np.number).count()
return results
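# Illustrative sketch (not code from this commit): pooling two centers' outputs
# of local_uncentered_moments into a global mean and variance, assuming each
# "moment{k}" holds the per-column average of x**k and that centers are
# weighted by their "n_samples".
import pandas as pd

center_a = {"moment1": pd.Series({"x": 1.0}),
            "moment2": pd.Series({"x": 2.0}),
            "n_samples": pd.Series({"x": 100})}
center_b = {"moment1": pd.Series({"x": 3.0}),
            "moment2": pd.Series({"x": 11.0}),
            "n_samples": pd.Series({"x": 300})}
n_tot = center_a["n_samples"] + center_b["n_samples"]
mean = (center_a["moment1"] * center_a["n_samples"]
        + center_b["moment1"] * center_b["n_samples"]) / n_tot
second = (center_a["moment2"] * center_a["n_samples"]
          + center_b["moment2"] * center_b["n_samples"]) / n_tot
var = second - mean ** 2  # Var[X] = E[X^2] - E[X]^2
print(mean["x"], var["x"])  # -> 2.5 2.5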

@remote_data
def _compute_local_survival_statistics(self, datasamples, shared_state):
"""Computes local statistics and Dt for all ts in the distinct event times.
Those statistics are useful for to compute the global statistics that
will be used throughout training.
The definition of :math:`\\mathcal{D}_t` (Dt) associated to the value t is the set
of indices of all the individuals that experience an event at time t.
More formally:
.. math::
\\mathcal{D}_{t} = \{ i \in [0, n] | e_i = 0, t_i = t\}
Parameters
----------
tokens_list : list
Normally a list of size one since we should use all samples in one batch.
Returns
-------
dict
Where we can find the following keys 'sum_features_on_events', 'distinct_event_times',
'number_events_by_time' and 'total_number_samples', where:
- "sum_features_on_events" contains the sum of the features
across samples for all the distinct event times of the given clients,
i.e. a single vector per time stamp
- "distinct_event_times": list of floating values containing the
unique times at which at least 1 death is registered in the
current dataset
- "number_events_by_time": number of events occurring at each distinct_event_times
- "total_number_samples": total number of samples
"""
X, y = build_X_y(self, datasamples, shared_state)

distinct_event_times = np.unique(y[y > 0]).tolist()

sum_features_on_events = np.zeros(X.shape[1:])
number_events_by_time = []

for t in distinct_event_times:
Dt = np.where(y == t)[0]
num_events = len(Dt)
sum_features_on_events += X[Dt, :].sum(axis=0)
number_events_by_time.append(num_events)

return {
"sum_features_on_events": sum_features_on_events,
"distinct_event_times": distinct_event_times,
"number_events_by_time": number_events_by_time,
"total_number_samples": X.shape[0],
"moments": shared_state,
}
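# Toy illustration (not code from this commit): with the convention used above
# that event times are the positive values of y (censored samples are excluded
# by the y > 0 mask), D_t is simply the set of row indices whose label equals t.
import numpy as np

y_toy = np.array([2.0, -1.5, 2.0, 3.0, -2.0])
for t in np.unique(y_toy[y_toy > 0]):  # distinct event times: 2.0 and 3.0
    Dt = np.where(y_toy == t)[0]
    print(t, Dt.tolist(), len(Dt))  # -> 2.0 [0, 2] 2, then 3.0 [3] 1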

@remote_data
def train(
self,
@@ -259,9 +337,7 @@ def elastic_net_penalty(beta, a):
self.server_state["current_weights"], alpha
)
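# Newton-Raphson update direction: solve (-hessian) @ delta = gradient for delta
# instead of explicitly inverting the Hessian; assume_a="pos" tells the solver
# to treat -hessian as symmetric positive definite.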

inv_h_dot_g_T = spsolve(
-hessian, gradient, assume_a="pos", check_finite=False
)
inv_h_dot_g_T = spsolve(-hessian, gradient, assume_a="pos", check_finite=False)

norm_delta = norm(inv_h_dot_g_T)

@@ -288,7 +364,6 @@ def elastic_net_penalty(beta, a):
self.server_state["converging"] = converging
self.server_state["success"] = success


def _local_predict(self, predict_dataset: torch.utils.data.Dataset, predictions_path):
"""Executes the following operations:
* Create the torch dataloader using the batch size given at the ``__init__`` of the class
64 changes: 41 additions & 23 deletions fedeca/scripts/fl_iptw.py
@@ -8,6 +8,7 @@
import torch
from torch import nn
from fedeca.utils.survival_utils import generate_survival_data

# from substrafl.index_generator import NpIndexGenerator
from substrafl.algorithms.pytorch import TorchNewtonRaphsonAlgo
from substrafl.strategies import NewtonRaphson
@@ -39,8 +40,8 @@
# Other backend_types are:
# "docker" mode where computations run locally in docker containers
# "remote" mode where computations run remotely (you need to have a deployed platform for that)
client_0 = Client(backend_type="subprocess")
client_1 = Client(backend_type="subprocess")
client_0 = Client(backend_type="docker")
client_1 = Client(backend_type="docker")
# To run in remote mode you have to also use the function `Client.login(username, password)`
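# Hedged sketch (not part of this commit), assuming the Client constructor
# accepts a platform URL; the address and credentials below are placeholders:
# client_remote = Client(backend_type="remote", url="https://org-1.example.org")
# client_remote.login(username="org-1-user", password="change-me")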

clients = {
@@ -63,9 +64,9 @@
os.makedirs(data_path / "center1", exist_ok=True)


df0, _ = generate_survival_data(na_proportion=0., ncategorical=5, ndim=10, seed=seed, n_samples=200)
df0, _ = generate_survival_data(na_proportion=0.0, ncategorical=5, ndim=10, seed=seed, n_samples=200)
df0["treated"] = 1
df1, _ = generate_survival_data(na_proportion=0., ncategorical=5, ndim=10, seed=seed)
df1, _ = generate_survival_data(na_proportion=0.0, ncategorical=5, ndim=10, seed=seed)
df1["treated"] = 0

df0.to_csv(data_path / "center0" / "data.csv", index=False)
@@ -77,7 +78,6 @@
datasample_keys = {}

for i, org_id in enumerate(DATA_PROVIDER_ORGS_ID):

client = clients[org_id]

permissions_dataset = Permissions(public=False, authorized_ids=[ALGO_ORG_ID])
@@ -109,7 +109,11 @@

# The Dependency object is instantiated in order to install the right libraries in
# the Python environment of each organization.
metric_deps = Dependency(pypi_dependencies=["numpy==1.23.1", "scikit-learn==1.1.1"])
metric_deps = Dependency(
pypi_dependencies=["numpy==1.23.1", "scikit-learn==1.1.1"],
local_dependencies=[pathlib.Path.cwd().parent.parent],
editable_mode=True,
)
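# local_dependencies points at the local fedeca checkout (two directories up)
# so that it is installed in each organization's environment, here in editable mode.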


def accuracy(datasamples, predictions_path):
@@ -144,7 +148,9 @@ def forward(self, x, eval=False):


class TorchDataset(torch.utils.data.Dataset):
def __init__(self, datasamples, is_inference: bool, target_columns=["treated"], columns_to_drop=["T", "E"], dtype="float32"):
def __init__(
self, datasamples, is_inference: bool, target_columns=["treated"], columns_to_drop=["T", "E"], dtype="float32"
):
self.data = datasamples
self.is_inference = is_inference
self.target_columns = target_columns
@@ -175,7 +181,10 @@ def __init__(self):
)


strategy = NewtonRaphson(damping_factor=0.8)
strategy = NewtonRaphson(
damping_factor=0.8,
algo=MyAlgo(),
)
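# The algo instance is now passed to the strategy constructor (rather than to
# execute_experiment further below).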


aggregation_node = AggregationNode(ALGO_ORG_ID)
@@ -184,7 +193,6 @@ def __init__(self):
test_data_nodes = list()

for org_id in DATA_PROVIDER_ORGS_ID:

# Create the Train Data Node (or training task) and save it in a list
train_data_node = TrainDataNode(
organization_id=org_id,
@@ -204,20 +212,21 @@ def __init__(self):


# Test at the end of every round
my_eval_strategy = None#EvaluationStrategy(test_data_nodes=test_data_nodes, eval_frequency=1)
my_eval_strategy = None # EvaluationStrategy(test_data_nodes=test_data_nodes, eval_frequency=1)


# A round is defined by a local training step followed by an aggregation
# operation
NUM_ROUNDS = 0
client_to_dowload_from = DATA_PROVIDER_ORGS_ID[0]


# The Dependency object is instantiated in order to install the right
# libraries in the Python environment of each organization.
algo_deps = Dependency(pypi_dependencies=["numpy==1.23.1", "torch==1.11.0"])

compute_plan = execute_experiment(
client=clients[ALGO_ORG_ID],
algo=MyAlgo(),
strategy=strategy,
train_data_nodes=train_data_nodes,
evaluation_strategy=my_eval_strategy,
@@ -232,7 +241,6 @@ def __init__(self):
print(performances_df[["worker", "round_idx", "performance"]])


client_to_dowload_from = DATA_PROVIDER_ORGS_ID[0]
round_idx = None

algo_nr_files_folder = str(pathlib.Path.cwd() / "tmp" / "algo_nr_files")
@@ -269,8 +277,8 @@ def __init__(self):
print(accuracy_score(y[200:], y_pred[200:] > 0.5))


#assert np.allclose(LR.coef_, model.fc1.weight.detach().numpy(), atol=1e-1)
#assert np.allclose(LR.intercept_, model.fc1.bias.detach().numpy(), atol=1e-1)
# assert np.allclose(LR.coef_, model.fc1.weight.detach().numpy(), atol=1e-1)
# assert np.allclose(LR.intercept_, model.fc1.bias.detach().numpy(), atol=1e-1)


# Ok now that we have the propensity model downloaded we need to use it in the next compute plan
@@ -310,14 +318,17 @@ def __init__(self):
)


strategy = WebDisco(standardize_data=False)
strategy = WebDisco(
standardize_data=False,
algo=MyAlgo(),
)


def c_index(datasamples, predictions_path):
times_true = datasamples["T"]
events = datasamples["E"]
y_pred = np.load(predictions_path)
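# The scores are negated because concordance_index expects higher values to
# mean longer survival, whereas the model outputs a risk score.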
c_index = lifelines.utils.concordance_index(times_true, - y_pred, events)
c_index = lifelines.utils.concordance_index(times_true, -y_pred, events)
return c_index


@@ -339,10 +350,14 @@ def c_index(datasamples, predictions_path):

# The Dependency object is instantiated in order to install the right
# libraries in the Python environment of each organization.
algo_deps = Dependency(pypi_dependencies=["numpy==1.23.1", "torch==1.11.0"])

algo_deps = Dependency(
pypi_dependencies=["numpy==1.23.1", "torch==1.11.0"],
local_dependencies=[pathlib.Path.cwd().parent.parent],
editable_mode=True,
)
compute_plan = execute_experiment(
client=clients[ALGO_ORG_ID],
algo=MyAlgo(),
strategy=strategy,
train_data_nodes=train_data_nodes,
evaluation_strategy=my_eval_strategy,
@@ -352,7 +367,9 @@ def c_index(datasamples, predictions_path):
dependencies=algo_deps,
)
tasks = clients[ALGO_ORG_ID].list_task()
import pdb; pdb.set_trace()
import pdb

pdb.set_trace()
performances_df = pd.DataFrame(client.get_performances(compute_plan.key).dict())
print("\nPerformance Table: \n")
print(performances_df[["worker", "round_idx", "performance"]])
@@ -369,7 +386,7 @@ def c_index(datasamples, predictions_path):
print(model.fc1.weight)


class MockSelf():
class MockSelf:
def __init__(self):
self._target_cols = ["T", "E"]
self._duration_col = "T"
@@ -386,16 +403,17 @@ def __init__(self):
# Need pytorch conversion to be fed to model
X = torch.from_numpy(X)
print("C-index:")
print(lifelines.utils.concordance_index(df["T"], - model(X).detach().numpy(), df["E"]))
print(lifelines.utils.concordance_index(df["T"], -model(X).detach().numpy(), df["E"]))

from lifelines import CoxPHFitter

cph = CoxPHFitter()
df = df.drop(columns=["time_multiplier"])
cph.fit(df, 'T', 'E')
cph.fit(df, "T", "E")
cph.print_summary()
print("C-index:")

model.fc1.weight.data = torch.from_numpy(cph.params_.to_numpy() * df.drop(columns=["E", "T"]).std().to_numpy())
print("=====")
print(model.fc1.weight)
print(lifelines.utils.concordance_index(df["T"], - model(X).detach().numpy(), df["E"]))
print(lifelines.utils.concordance_index(df["T"], -model(X).detach().numpy(), df["E"]))