Distributed TensorFlow On Shifu
*The TensorFlow logo is copyright of the TensorFlow Org.
TensorFlow is widely used, has a very active community, and many new Deep Learning features are under development by Google. Many state-of-the-art machine learning models are implemented in it, and most modelers in the Deep Learning area are familiar with it. Shifu defines a complete ML pipeline, but that pipeline did not support TensorFlow.
By supporting TensorFlow in the Shifu pipeline, users can seamlessly leverage both the powerful existing features of the Shifu pipeline and the rich modeling methods of TensorFlow. Even inside the Shifu pipeline, the Python-based model graph defined in TensorFlow is still exposed to users in a Python script for customization, which shows the power and flexibility of this integration. Besides that, native distributed TensorFlow is supported on Hadoop YARN, which means users can still leverage their Hadoop clusters to run TensorFlow model training without any new Python runtime dependency or new cluster dependency.
- Download Shifu code from https://github.com/ShifuML/shifu/tree/develop
- Run 'mvn install' under the Shifu project. In this step, make sure shifu.jar is installed into the local Maven repository successfully
- Download shifu-tensorflow from https://github.com/ShifuML/shifu-tensorflow/tree/master
- Run './package-shifu.sh' under shifu-tensorflow
- You will then get a "shifu--SNAPSHOT-hdp-yarn.tar.gz", which is Shifu with TensorFlow
- This is a bigger package that includes the existing Shifu features plus distributed TensorFlow support on Hadoop
Running TensorFlow in Shifu is very easy. The only change from Shifu NN is in ModelConfig.json: set "train"->"algorithm" to "TensorFlow".
Compared with Shifu native NN, more parameters can be tuned in the train#params part if 'TensorFlow' is selected in train#algorithm:
"train" : {
"baggingNum" : 1,
"baggingWithReplacement" : true,
"baggingSampleRate" : 1.0,
"validSetRate" : 0.2,
"numTrainEpochs" : 200,
"isContinuous" : true,
"algorithm" : "TensorFlow",
"params" : {
"NumHiddenLayers" : 1,
"ActivationFunc" : [ "tanh" ],
"NumHiddenNodes" : [ 50 ],
"LearningRate" : 0.002,
"MiniBatchs" : 128,
"RegularizedConstant" : 0.01,
"Propagation": 'Adam'
}
},
- train::baggingNum: bagging is not supported for TensorFlow yet (work in progress), so there is no need to set baggingNum, baggingWithReplacement or baggingSampleRate for now;
- train::numTrainEpochs: number of training epochs; this is the current stop condition inside TensorFlow training;
- train::isContinuous: whether to train continuously; setting it to true is helpful for occasional failures, since a restarted job will resume from the failure point;
- train::params::LearningRate: the learning rate does not behave like native Shifu NN; a much smaller value such as 0.002 is recommended;
- train::params::MiniBatchs: mini-batch size, 128 by default;
- train::params::RegularizedConstant: L2 regularization value in TensorFlow training, 0.01 by default;
- train::params::Propagation: optimizer used in TensorFlow training; the default 'Adam' maps to AdamOptimizer, and 'B' (GradientDescentOptimizer) or 'AdaGrad' (AdagradOptimizer) can also be specified (a rough mapping sketch is given after this list).
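As a minimal illustration only (not Shifu's actual training code), the Propagation string could be resolved to a TensorFlow 1.x optimizer roughly like this; the function name and mapping below are assumptions:

```python
# Illustrative sketch: mapping a Propagation value to a TF 1.x optimizer.
# build_optimizer is a hypothetical helper, not part of Shifu's scripts.
import tensorflow as tf

def build_optimizer(propagation, learning_rate):
    if propagation == 'Adam':
        return tf.train.AdamOptimizer(learning_rate=learning_rate)
    if propagation == 'B':
        return tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
    if propagation == 'AdaGrad':
        return tf.train.AdagradOptimizer(learning_rate=learning_rate)
    raise ValueError('Unsupported Propagation: %s' % propagation)

optimizer = build_optimizer('Adam', 0.002)
```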
Basically, worker/ps timeouts and worker/ps memory can be changed in SHIFU_HOME/conf/global-default.xml.
- shifu.task.executor.jvm.opts: Java heap memory of each worker's JVM; this does not mean the memory that TensorFlow can use.
- shifu.ps.memory: memory of each parameter server
- shifu.ps.vcores: CPU cores of each parameter server
- shifu.ps.instances: number of parameter servers
- shifu.worker.memory: memory of each worker
- shifu.worker.vcores: CPU cores of each worker
- shifu.worker.instances: number of workers. We DO NOT recommend users change it; Shifu will set it automatically based on the number of input files.
- If you want to use more workers, please split the input data files into smaller pieces with: "shifu norm -shuffle -Dshifu.norm.shuffle.size=(NUMBER OF WORKERS YOU WANT TO USE)"
Basic DNN features are supported well through the Shifu ModelConfig.json API, while some other Deep Learning features like CNN/LSTM/WDL/DeepFM cannot be supported well through that API.
Keras model definition is supported, and it is flexible to change the Keras model in this Python file: SHIFU_HOME/scripts/distributed_tf_keras.py.
So that the script can be customized per model training, you can copy distributed_tf_keras.py into your modelset folder; this keeps the edited Keras model independent for each modelset.
- Override the Keras model in the get_model function: https://github.com/ShifuML/shifu/blob/develop/src/main/python/distributed_tf_keras.py#L43-L73
- Please do not change the 'Inputs' and 'predictions' layers; they are linked with the input and output;
- 'loss' currently only supports a string-typed loss definition; for optimizers, only 'adam', 'adagrad' and 'sgd' are supported (a minimal sketch of a customized model is shown below).
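For illustration only (not the actual contents of distributed_tf_keras.py), a customized Keras model respecting the constraints above might look like the following; the function signature, layer names and input shape are assumptions, so align them with the script in your Shifu version:

```python
# Hypothetical sketch of a customized Keras model for distributed_tf_keras.py.
# Signature and layer names are assumptions; check the real get_model before editing.
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout

def get_model(input_feature_count):
    # Input layer; keep the name expected by the surrounding script.
    inputs = Input(shape=(input_feature_count,), name='inputs')
    hidden = Dense(64, activation='tanh')(inputs)
    hidden = Dropout(0.2)(hidden)
    hidden = Dense(32, activation='relu')(hidden)
    # Output layer; keep the name expected by the surrounding script.
    predictions = Dense(1, activation='sigmoid', name='predictions')(hidden)
    model = Model(inputs=inputs, outputs=predictions)
    # Only string losses and 'adam'/'adagrad'/'sgd' optimizers are supported.
    model.compile(optimizer='adam', loss='binary_crossentropy')
    return model
```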
From the user's perspective, it is also flexible to change the TensorFlow graph definition in this Python file: SHIFU_HOME/scripts/distributed_tf_tensor.py:
- Change the parameter 'train#params#TF_Type' in ModelConfig.json to 'tensor' (the default is 'keras')
- Update this line from True to False: https://github.com/ShifuML/shifu/blob/develop/src/main/python/distributed_tf_tensor.py#L30
- Overwrite your own graph here: https://github.com/ShifuML/shifu/blob/develop/src/main/python/distributed_tf_tensor.py#L57-L95 (a rough sketch of a custom graph is given below)
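As a rough sketch only (not the script's actual contents), a small custom graph written with the raw TensorFlow 1.x API could look like this; the tensor names, shapes and feature count are assumptions:

```python
# Hypothetical hand-written TF 1.x graph for distributed_tf_tensor.py.
# Placeholder/tensor names and shapes are assumptions; align them with the script you edit.
import tensorflow as tf

feature_count = 30  # number of normalized input features (example value)

inputs = tf.placeholder(tf.float32, shape=[None, feature_count], name='inputs')
labels = tf.placeholder(tf.float32, shape=[None, 1], name='labels')

hidden = tf.layers.dense(inputs, 50, activation=tf.nn.tanh)
logits = tf.layers.dense(hidden, 1)
predictions = tf.nn.sigmoid(logits, name='predictions')

loss = tf.losses.sigmoid_cross_entropy(multi_class_labels=labels, logits=logits)
train_op = tf.train.AdamOptimizer(learning_rate=0.002).minimize(loss)
```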
Or change it directly if you are familiar with the raw TensorFlow API and the distributed TensorFlow API. Reference URLs:
It uses PS (parameter servers) to distribute a big graph into multiple pieces in order to mitigate network traffic load. The PS accumulates gradients and updates model weights. Each worker pulls the newest model, computes gradients, and sends them to the PS.
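As a rough sketch of the underlying pattern (not Shifu's actual code), a TensorFlow 1.x ps/worker cluster is typically wired up like this; the host names, ports and task counts below are made up:

```python
# Illustrative TF 1.x ps/worker wiring; addresses and counts are assumptions.
import tensorflow as tf

cluster = tf.train.ClusterSpec({
    'ps': ['ps0.example.com:2222'],
    'worker': ['worker0.example.com:2222', 'worker1.example.com:2222'],
})

# Each container starts a server for its own role and index.
server = tf.train.Server(cluster, job_name='worker', task_index=0)

# Variables are placed on the ps tasks; ops run on this worker.
with tf.device(tf.train.replica_device_setter(
        worker_device='/job:worker/task:0', cluster=cluster)):
    global_step = tf.train.get_or_create_global_step()
    # ... build the model graph and train_op here ...
```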
We prepare a backup pool: when a worker dies, a container from the backup pool is used as its replacement.
We compared this with the original Shifu NN model on the same dataset. Model recall and precision are the same, which means our solution is theoretically correct.
As a YARN application, to check or debug it, all logs from the TensorFlow Python processes are redirected to the container stderr and stdout log files. Please check them in the YARN web UI, or collect all logs with this command after the job finishes or fails:
yarn logs -applicationId <app_id> > logs
Please note: if your ps count is 4, the first container is the application master, the 2nd to 5th containers run the ps jobs, and the remaining containers are workers.