Improved multi-node multi-GPU random forests. (dmlc#4238)
* Improved multi-node multi-GPU random forests.

- removed rabit::Broadcast() from each invocation of column sampling
- instead, the PRNG seed is now synced once, when a ColumnSampler() object is constructed (see the sketch below)
- this makes non-trivial column sampling significantly faster in the distributed case
- refactored distributed GPU tests
- added distributed random forest tests
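A minimal sketch of the pattern described above, using an illustrative class name and std::mt19937 in place of xgboost's GlobalRandomEngine (the rabit include path is an assumption); the actual change is in src/common/random.h below.

// Illustrative only: broadcast one PRNG seed at construction time, then let every
// rank draw identical permutations locally, instead of rabit::Broadcast()-ing each
// sampled feature list from rank 0.
#include <rabit/rabit.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

class SeedSyncedSampler {
 public:
  SeedSyncedSampler() {
    std::uint32_t seed = std::random_device{}();  // after the broadcast, all ranks hold rank 0's value
    rabit::Broadcast(&seed, sizeof(seed), 0);     // one broadcast per sampler, not per sample
    rng_.seed(seed);
  }

  // Returns the same n features on every rank, because rng_ was seeded identically.
  std::vector<int> Sample(std::vector<int> features, std::size_t n) {
    std::shuffle(features.begin(), features.end(), rng_);
    features.resize(std::min(n, features.size()));
    std::sort(features.begin(), features.end());
    return features;
  }

 private:
  std::mt19937 rng_;
};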
canonizer authored and RAMitchell committed Mar 12, 2019
1 parent 99a714b commit b833b64
Showing 9 changed files with 162 additions and 216 deletions.
21 changes: 16 additions & 5 deletions src/common/random.h
@@ -89,9 +89,10 @@ class ColumnSampler {
   float colsample_bylevel_{1.0f};
   float colsample_bytree_{1.0f};
   float colsample_bynode_{1.0f};
+  GlobalRandomEngine rng_;
 
   std::shared_ptr<std::vector<int>> ColSample
-  (std::shared_ptr<std::vector<int>> p_features, float colsample) const {
+  (std::shared_ptr<std::vector<int>> p_features, float colsample) {
     if (colsample == 1.0f) return p_features;
     const auto& features = *p_features;
     CHECK_GT(features.size(), 0);
@@ -100,17 +101,24 @@ class ColumnSampler {
     auto& new_features = *p_new_features;
     new_features.resize(features.size());
     std::copy(features.begin(), features.end(), new_features.begin());
-    std::shuffle(new_features.begin(), new_features.end(), common::GlobalRandom());
+    std::shuffle(new_features.begin(), new_features.end(), rng_);
     new_features.resize(n);
     std::sort(new_features.begin(), new_features.end());
 
-    // ensure that new_features are the same across ranks
-    rabit::Broadcast(&new_features, 0);
-
     return p_new_features;
   }
 
  public:
+  /**
+   * \brief Column sampler constructor.
+   * \note This constructor synchronizes the RNG seed across processes.
+   */
+  ColumnSampler() {
+    uint32_t seed = common::GlobalRandom()();
+    rabit::Broadcast(&seed, sizeof(seed), 0);
+    rng_.seed(seed);
+  }
+
   /**
    * \brief Initialise this object before use.
    *
@@ -153,6 +161,9 @@ class ColumnSampler {
    * \return The sampled feature set.
    * \note If colsample_bynode_ < 1.0, this method creates a new feature set each time it
    * is called. Therefore, it should be called only once per node.
+   * \note With distributed xgboost, this function must be called exactly once for the
+   * construction of each tree node, and must be called the same number of times in each
+   * process and with the same parameters to return the same feature set across processes.
    */
   std::shared_ptr<std::vector<int>> GetFeatureSet(int depth) {
     if (colsample_bylevel_ == 1.0f && colsample_bynode_ == 1.0f) {
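To illustrate the call contract documented in the new \note above, a hypothetical usage sketch (the helper GrowLevel, the node list, and the include path are assumptions, not part of this commit): every rank constructs the sampler, which syncs the seed, then calls GetFeatureSet() exactly once per node and in the same node order, so all ranks obtain identical feature sets without per-node communication.

// Hypothetical call pattern only; not code from this commit.
#include <memory>
#include <vector>

#include "../common/random.h"  // assumed relative include path for ColumnSampler

void GrowLevel(xgboost::common::ColumnSampler* sampler,
               const std::vector<int>& nids_at_depth, int depth) {
  for (int nid : nids_at_depth) {  // identical node order on every rank
    // Exactly one GetFeatureSet() call per node, with the same argument everywhere.
    std::shared_ptr<std::vector<int>> feats = sampler->GetFeatureSet(depth);
    // ... evaluate candidate splits for nid restricted to *feats ...
    (void)nid;
    (void)feats;
  }
}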
4 changes: 2 additions & 2 deletions tests/ci_build/test_mgpu.sh
@@ -7,5 +7,5 @@ cd ..
 pytest -v -s --fulltrace -m "(not slow) and mgpu" tests/python-gpu
 ./testxgboost --gtest_filter=*.MGPU_*
 
-cd tests/distributed-gpu
-./runtests-gpu.sh
+cd tests/distributed
+./runtests-gpu.sh
19 changes: 0 additions & 19 deletions tests/distributed-gpu/runtests-gpu.sh

This file was deleted.

51 changes: 0 additions & 51 deletions tests/distributed-gpu/test_gpu_basic_1x4.py

This file was deleted.

51 changes: 0 additions & 51 deletions tests/distributed-gpu/test_gpu_basic_2x2.py

This file was deleted.

34 changes: 0 additions & 34 deletions tests/distributed-gpu/test_gpu_basic_4x1.py

This file was deleted.

54 changes: 0 additions & 54 deletions tests/distributed-gpu/test_gpu_basic_asym.py

This file was deleted.

113 changes: 113 additions & 0 deletions tests/distributed/distributed_gpu.py
@@ -0,0 +1,113 @@
"""Distributed GPU tests."""
import sys
import time
import xgboost as xgb


def run_test(name, params_fun):
"""Runs a distributed GPU test."""
    # Always call this before using the distributed (rabit) module.
xgb.rabit.init()
rank = xgb.rabit.get_rank()
world = xgb.rabit.get_world_size()

    # Load the data; the files are automatically sharded across workers in distributed mode.
dtrain = xgb.DMatrix('../../demo/data/agaricus.txt.train')
dtest = xgb.DMatrix('../../demo/data/agaricus.txt.test')

params, n_rounds = params_fun(rank)

    # Specify validation sets to watch performance.
watchlist = [(dtest, 'eval'), (dtrain, 'train')]

    # Run training; all the features of the training API are available.
    # Currently, this script only supports calling train() once, for fault-recovery purposes.
bst = xgb.train(params, dtrain, n_rounds, watchlist, early_stopping_rounds=2)

    # Have each worker dump its model as text.
model_name = "test.model.%s.%d" % (name, rank)
bst.dump_model(model_name, with_stats=True)
time.sleep(2)
xgb.rabit.tracker_print("Finished training\n")

    # Rank 0 verifies that every worker produced an identical model dump.
    if rank == 0:
for i in range(0, world):
model_name_root = "test.model.%s.%d" % (name, i)
for j in range(0, world):
if i == j:
continue
with open(model_name_root, 'r') as model_root:
contents_root = model_root.read()
model_name_rank = "test.model.%s.%d" % (name, j)
with open(model_name_rank, 'r') as model_rank:
contents_rank = model_rank.read()
if contents_root != contents_rank:
raise Exception(
('Worker models diverged: test.model.%s.%d '
'differs from test.model.%s.%d') % (name, i, name, j))

xgb.rabit.finalize()


base_params = {
'tree_method': 'gpu_hist',
'max_depth': 2,
'eta': 1,
'verbosity': 0,
'objective': 'binary:logistic'
}
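# The params_* functions below each return (params, n_boosting_rounds) for one
# worker/GPU topology; gpu_id is the first device used by the calling rank.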


def params_basic_1x4(rank):
return dict(base_params, **{
'n_gpus': 1,
'gpu_id': rank,
}), 20


def params_basic_2x2(rank):
return dict(base_params, **{
'n_gpus': 2,
'gpu_id': 2*rank,
}), 20


def params_basic_4x1(rank):
return dict(base_params, **{
'n_gpus': 4,
'gpu_id': rank,
}), 20


def params_basic_asym(rank):
return dict(base_params, **{
'n_gpus': 1 if rank == 0 else 3,
'gpu_id': rank,
}), 20


rf_update_params = {
'subsample': 0.5,
'colsample_bynode': 0.5
}
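# wrap_rf() turns a boosted-tree configuration into a random-forest one: all the
# trees (num_parallel_tree = the former number of boosting rounds) are grown in a
# single round, with row subsampling and per-node column subsampling.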


def wrap_rf(params_fun):
def wrapped_params_fun(rank):
params, n_estimators = params_fun(rank)
rf_params = dict(rf_update_params, num_parallel_tree=n_estimators)
return dict(params, **rf_params), 1
return wrapped_params_fun


params_rf_1x4 = wrap_rf(params_basic_1x4)

params_rf_2x2 = wrap_rf(params_basic_2x2)

params_rf_4x1 = wrap_rf(params_basic_4x1)

params_rf_asym = wrap_rf(params_basic_asym)


test_name = sys.argv[1]
run_test(test_name, globals()['params_%s' % test_name])