
Commit 6f4265e

Merge pull request #26 from ExpediaInc/master

Support for two important Keras features: multiple inputs and customizable metrics.
Joeri Hermans authored May 25, 2017
2 parents 945bf2e + d89480b commit 6f4265e
Showing 5 changed files with 112 additions and 81 deletions.
18 changes: 9 additions & 9 deletions README.md
@@ -71,15 +71,15 @@ If you want to run the examples using Apache Spark 2.0.0 and higher. You will ne
This optimizer follows the traditional scheme of training a model, i.e., it uses sequential gradient updates to optimize the parameters. It does this by executing the training procedure on a single Spark executor.

```python
-SingleTrainer(model, features_col, label_col, batch_size, optimizer, loss)
+SingleTrainer(model, features_col, label_col, batch_size, optimizer, loss, metrics=["accuracy"])
```
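
As context for the new `metrics` argument, here is a minimal usage sketch (not part of this commit). It assumes a pre-existing Spark DataFrame `training_set` with "features" and "label" columns, that the keyword names match the signature above, and that trainers expose a `train(dataframe)` method returning a fitted Keras model:

```python
# Hedged sketch: `training_set` is an assumed, already-prepared Spark DataFrame.
from keras.models import Sequential
from keras.layers import Dense

from distkeras.trainers import SingleTrainer

# A small binary classifier over 10-dimensional feature vectors.
model = Sequential()
model.add(Dense(32, activation="relu", input_shape=(10,)))
model.add(Dense(1, activation="sigmoid"))

trainer = SingleTrainer(model, features_col="features", label_col="label",
                        batch_size=32, optimizer="adam", loss="binary_crossentropy",
                        metrics=["accuracy"])
trained_model = trainer.train(training_set)  # assumed to return a trained Keras model
```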

### ADAG (Currently Recommended)

A DOWNPOUR variant that achieves significantly better statistical performance while being less sensitive to hyperparameters. This optimizer was developed using insights gained while building this framework. Further research on parameter staleness is being conducted to improve it.

```python
-ADAG(keras_model, worker_optimizer, loss, num_workers=2, batch_size=32,
+ADAG(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
     features_col="features", label_col="label", num_epoch=1, communication_window=12)
```
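
Because customizable metrics are one of the two headline features of this commit, here is a short hedged sketch of tracking more than one metric; it assumes `metrics` accepts the same identifiers as Keras' `model.compile()` and that `keras_model` has already been built:

```python
# Hedged sketch: `keras_model` is an assumed, already-defined Keras model;
# the metric names are standard Keras identifiers.
from distkeras.trainers import ADAG

trainer = ADAG(keras_model, worker_optimizer="adam", loss="categorical_crossentropy",
               metrics=["accuracy", "top_k_categorical_accuracy"],
               num_workers=2, batch_size=32,
               features_col="features", label_col="label",
               num_epoch=1, communication_window=12)
```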

@@ -88,7 +88,7 @@ ADAG(keras_model, worker_optimizer, loss, num_workers=2, batch_size=32,
Dynamic SGD dynamically maintains a learning rate for every worker by incorporating parameter staleness. This optimization scheme was introduced in "Heterogeneity-aware Distributed Parameter Servers" at SIGMOD 2017 [[5]](http://net.pku.edu.cn/~cuibin/Papers/2017SIGMOD.pdf).

```python
-DynSGD(keras_model, worker_optimizer, loss, num_workers=2, batch_size=32,
+DynSGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
       features_col="features", label_col="label", num_epoch=1, communication_window=10)
```

@@ -100,7 +100,7 @@ In this section we show the asynchronous version of EASGD. Instead of waiting on


```python
-AEASGD(keras_model, worker_optimizer, loss, num_workers, batch_size, features_col,
+AEASGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size, features_col,
       label_col, num_epoch, communication_window, rho, learning_rate)
```

@@ -109,7 +109,7 @@ AEASGD(keras_model, worker_optimizer, loss, num_workers, batch_size, features_co
Asynchronous EAMSGD is a variant of asynchronous EASGD. It is based on Nesterov's momentum scheme, where the update of the local worker is modified to incorporate a momentum term [[2]](https://arxiv.org/pdf/1412.6651.pdf).

```python
-EAMSGD(keras_model, worker_optimizer, loss, num_workers, batch_size,
+EAMSGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size,
       features_col, label_col, num_epoch, communication_window, rho,
       learning_rate, momentum)
```
@@ -119,7 +119,7 @@ EAMSGD(keras_model, worker_optimizer, loss, num_workers, batch_size,
An asynchronous stochastic gradient descent procedure introduced by Dean et al. that supports a large number of model replicas and leverages adaptive learning rates. This implementation is based on the pseudocode provided in [[1]](http://papers.nips.cc/paper/4687-large-scale-distributed-deep-networks.pdf).

```python
-DOWNPOUR(keras_model, worker_optimizer, loss, num_workers, batch_size,
+DOWNPOUR(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size,
         features_col, label_col, num_epoch, learning_rate, communication_window)
```

@@ -128,7 +128,7 @@ DOWNPOUR(keras_model, worker_optimizer, loss, num_workers, batch_size,
In ensemble training, we train `n` models in parallel on the same dataset, while each individual model is trained sequentially using a Keras optimizer. After training, one can combine the models, for example by averaging their outputs; a short sketch of this step follows the signature below.

```python
-EnsembleTrainer(keras_model, worker_optimizer, loss, features_col,
+EnsembleTrainer(keras_model, worker_optimizer, loss, metrics=["accuracy"], features_col,
                label_col, batch_size, num_ensembles)
```
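
To make the "combine and average" step concrete, here is a hedged sketch (not from this commit) that assumes `EnsembleTrainer.train()` returns a list of fitted Keras models:

```python
import numpy as np

def average_ensemble_predictions(models, features):
    """Average the outputs of all ensemble members for one batch of input features."""
    member_outputs = [m.predict(features) for m in models]  # one output array per member
    return np.mean(member_outputs, axis=0)                  # element-wise mean over members
```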

@@ -137,7 +137,7 @@ EnsembleTrainer(keras_model, worker_optimizer, loss, features_col,
Model averaging is a data-parallel technique that averages the trainable parameters of the model replicas after every epoch.

```python
-AveragingTrainer(keras_model, worker_optimizer, loss, features_col,
+AveragingTrainer(keras_model, worker_optimizer, loss, metrics=["accuracy"], features_col,
                 label_col, num_epoch, batch_size, num_workers)
```

@@ -149,7 +149,7 @@ In order to submit a job to a remote cluster, you simply run the following code:

```python
# Define the distributed optimization procedure, and its parameters.
-trainer = ADAG(keras_model=mlp, worker_optimizer=optimizer_mlp, loss=loss_mlp, num_workers=20,
+trainer = ADAG(keras_model=mlp, worker_optimizer=optimizer_mlp, loss=loss_mlp, metrics=["accuracy"], num_workers=20,
               batch_size=32, communication_window=15, num_epoch=1,
               features_col="features_normalized_dense", label_col="label_encoded")

5 changes: 3 additions & 2 deletions distkeras/predictors.py
@@ -43,7 +43,8 @@ class ModelPredictor(Predictor):

    def __init__(self, keras_model, features_col="features", output_col="prediction"):
        super(ModelPredictor, self).__init__(keras_model)
-        self.features_column = features_col
+        assert isinstance(features_col, (str, list)), "'features_col' must be a string or a list of strings"
+        self.features_column = [features_col] if isinstance(features_col, str) else features_col
        self.output_column = output_col

    def _predict(self, iterator):
@@ -54,7 +55,7 @@ def _predict(self, iterator):
        """
        model = deserialize_keras_model(self.model)
        for row in iterator:
-            features = np.asarray([row[self.features_column]])
+            features = [np.asarray([row[c]]) for c in self.features_column]
            prediction = model.predict(features)
            dense_prediction = DenseVector(prediction[0])
            new_row = new_dataframe_row(row, self.output_column, dense_prediction)
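
The change above is what enables prediction with multi-input Keras models. A hedged usage sketch follows, where `dataset`, `model` and `multi_input_model` are assumed to exist, the column names are illustrative, and the public entry point is assumed to be `predict(dataframe)`:

```python
from distkeras.predictors import ModelPredictor

# Single-input model: a plain string column name keeps the previous behaviour.
predictor = ModelPredictor(keras_model=model, features_col="features",
                           output_col="prediction")

# Multi-input model: one Spark column per Keras input, in the model's input order.
multi_predictor = ModelPredictor(keras_model=multi_input_model,
                                 features_col=["image_features", "meta_features"],
                                 output_col="prediction")
predictions = multi_predictor.predict(dataset)  # assumed to return a DataFrame with the new "prediction" column
```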