From 0579b56ca83f53274f2f18ec8ce694d8072dcf94 Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Wed, 19 Oct 2022 15:18:36 -0700 Subject: [PATCH 01/13] Fix OpenMP thread allocation in Linux --- include/LightGBM/utils/openmp_wrapper.h | 4 ++++ src/c_api.cpp | 6 ++---- src/io/sparse_bin.hpp | 5 ++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/include/LightGBM/utils/openmp_wrapper.h b/include/LightGBM/utils/openmp_wrapper.h index a337fc353b75..4de2d68e9701 100644 --- a/include/LightGBM/utils/openmp_wrapper.h +++ b/include/LightGBM/utils/openmp_wrapper.h @@ -117,6 +117,10 @@ class ThreadExceptionHelper { #define OMP_LOOP_EX_END() #define OMP_THROW_EX() +/** In an external multi-threaded environment, the above methods can return different values on + different threads (in Linux at least). To share allocations between threads, use a constant. **/ +#define MAX_THREAD_ALLOCATION 128 + #endif diff --git a/src/c_api.cpp b/src/c_api.cpp index 20633273134e..4af5c72dc30f 100644 --- a/src/c_api.cpp +++ b/src/c_api.cpp @@ -1073,7 +1073,6 @@ int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset, if (!data) { Log::Fatal("data cannot be null."); } - const int num_omp_threads = OMP_NUM_THREADS(); auto p_dataset = reinterpret_cast(dataset); auto get_row_fun = RowFunctionFromDenseMatric(data, nrow, ncol, data_type, 1); if (p_dataset->has_raw()) { @@ -1085,7 +1084,7 @@ int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset, for (int i = 0; i < nrow; ++i) { OMP_LOOP_EX_BEGIN(); // convert internal thread id to be unique based on external thread id - const int internal_tid = omp_get_thread_num() + (num_omp_threads * tid); + const int internal_tid = omp_get_thread_num() + (MAX_THREAD_ALLOCATION * tid); auto one_row = get_row_fun(i); p_dataset->PushOneRow(internal_tid, start_row + i, one_row); OMP_LOOP_EX_END(); @@ -1154,7 +1153,6 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset, if (!data) { Log::Fatal("data cannot be null."); } - const int num_omp_threads = OMP_NUM_THREADS(); auto p_dataset = reinterpret_cast(dataset); auto get_row_fun = RowFunctionFromCSR(indptr, indptr_type, indices, data, data_type, nindptr, nelem); int32_t nrow = static_cast(nindptr - 1); @@ -1166,7 +1164,7 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset, for (int i = 0; i < nrow; ++i) { OMP_LOOP_EX_BEGIN(); // convert internal thread id to be unique based on external thread id - const int internal_tid = omp_get_thread_num() + (num_omp_threads * tid); + const int internal_tid = omp_get_thread_num() + (MAX_THREAD_ALLOCATION * tid); auto one_row = get_row_fun(i); p_dataset->PushOneRow(internal_tid, static_cast(start_row + i), one_row); OMP_LOOP_EX_END(); diff --git a/src/io/sparse_bin.hpp b/src/io/sparse_bin.hpp index 24931fd4eff0..97fcee3dd412 100644 --- a/src/io/sparse_bin.hpp +++ b/src/io/sparse_bin.hpp @@ -82,9 +82,8 @@ class SparseBin : public Bin { ~SparseBin() {} void InitStreaming(uint32_t num_thread) override { - // Each thread needs its own push buffer, so allocate external num_thread times the number of OMP threads - int num_omp_threads = OMP_NUM_THREADS(); - push_buffers_.resize(num_omp_threads * num_thread); + // Each thread needs its own push buffer, so allocate external num_thread times the max number of OMP threads + push_buffers_.resize(MAX_THREAD_ALLOCATION * num_thread); }; void ReSize(data_size_t num_data) override { num_data_ = num_data; } From 8adfab2fb4d6022ea9609438d130787aae0f545c Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Wed, 19 Oct 2022 18:25:08 -0700 Subject: [PATCH 02/13] move constant out of ifdef --- include/LightGBM/utils/openmp_wrapper.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/LightGBM/utils/openmp_wrapper.h b/include/LightGBM/utils/openmp_wrapper.h index 4de2d68e9701..b849b038d29a 100644 --- a/include/LightGBM/utils/openmp_wrapper.h +++ b/include/LightGBM/utils/openmp_wrapper.h @@ -5,6 +5,10 @@ #ifndef LIGHTGBM_OPENMP_WRAPPER_H_ #define LIGHTGBM_OPENMP_WRAPPER_H_ +/** In an external multi-threaded environment, the above methods can return different values on + different threads (in Linux at least). To share allocations between threads, use a constant. **/ +#define MAX_THREAD_ALLOCATION 128 + #ifdef _OPENMP #include @@ -117,10 +121,6 @@ class ThreadExceptionHelper { #define OMP_LOOP_EX_END() #define OMP_THROW_EX() -/** In an external multi-threaded environment, the above methods can return different values on - different threads (in Linux at least). To share allocations between threads, use a constant. **/ -#define MAX_THREAD_ALLOCATION 128 - #endif From c47852316d7a8a24a2daf91d61e8b988aae1ce06 Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Wed, 19 Oct 2022 20:16:46 -0700 Subject: [PATCH 03/13] edit comments --- include/LightGBM/utils/openmp_wrapper.h | 2 +- src/io/sparse_bin.hpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/include/LightGBM/utils/openmp_wrapper.h b/include/LightGBM/utils/openmp_wrapper.h index b849b038d29a..dc505fc28d10 100644 --- a/include/LightGBM/utils/openmp_wrapper.h +++ b/include/LightGBM/utils/openmp_wrapper.h @@ -5,7 +5,7 @@ #ifndef LIGHTGBM_OPENMP_WRAPPER_H_ #define LIGHTGBM_OPENMP_WRAPPER_H_ -/** In an external multi-threaded environment, the above methods can return different values on +/** In an external multi-threaded environment, the below methods can return different values on different threads (in Linux at least). To share allocations between threads, use a constant. **/ #define MAX_THREAD_ALLOCATION 128 diff --git a/src/io/sparse_bin.hpp b/src/io/sparse_bin.hpp index 97fcee3dd412..032a40a07220 100644 --- a/src/io/sparse_bin.hpp +++ b/src/io/sparse_bin.hpp @@ -82,7 +82,8 @@ class SparseBin : public Bin { ~SparseBin() {} void InitStreaming(uint32_t num_thread) override { - // Each thread needs its own push buffer, so allocate external num_thread times the max number of OMP threads + // Each external thread needs its own set of OpenMP push buffers, + // so allocate num_thread times the maximum number of OMP threads per external thread push_buffers_.resize(MAX_THREAD_ALLOCATION * num_thread); }; From c82f5f9f3017d59d2ae9bced6049ec33ac90a76e Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Sat, 12 Nov 2022 19:13:09 -0800 Subject: [PATCH 04/13] switch to OMP_GET_STREAMING_MAX_THREADS --- include/LightGBM/dataset.h | 3 ++ include/LightGBM/utils/openmp_wrapper.h | 41 ++++++++++++++++++++++--- src/c_api.cpp | 9 ++++-- src/io/sparse_bin.hpp | 3 +- 4 files changed, 48 insertions(+), 8 deletions(-) diff --git a/include/LightGBM/dataset.h b/include/LightGBM/dataset.h index c60aaf037c71..9cbfacd015ee 100644 --- a/include/LightGBM/dataset.h +++ b/include/LightGBM/dataset.h @@ -459,6 +459,9 @@ class Dataset { int32_t has_queries, int32_t nclasses, int32_t nthreads) { + // Initialize max thread count + OMP_GET_STREAMING_MAX_THREADS(false); + metadata_.Init(num_data, has_weights, has_init_scores, has_queries, nclasses); for (int i = 0; i < num_groups_; ++i) { feature_groups_[i]->InitStreaming(nthreads); diff --git a/include/LightGBM/utils/openmp_wrapper.h b/include/LightGBM/utils/openmp_wrapper.h index dc505fc28d10..4e5766e34eba 100644 --- a/include/LightGBM/utils/openmp_wrapper.h +++ b/include/LightGBM/utils/openmp_wrapper.h @@ -5,9 +5,11 @@ #ifndef LIGHTGBM_OPENMP_WRAPPER_H_ #define LIGHTGBM_OPENMP_WRAPPER_H_ -/** In an external multi-threaded environment, the below methods can return different values on - different threads (in Linux at least). To share allocations between threads, use a constant. **/ -#define MAX_THREAD_ALLOCATION 128 +inline static std::string getEnvVar(const char* name) { + const char *tmp = getenv(name); + std::string env_var(tmp ? tmp : ""); + return env_var; +} #ifdef _OPENMP @@ -21,6 +23,8 @@ #include #include +#define DEFAULT_STREAMING_MAX_THREADS 8 // TODO decide + inline int OMP_NUM_THREADS() { int ret = 1; #pragma omp parallel @@ -38,6 +42,30 @@ inline void OMP_SET_NUM_THREADS(int num_threads) { } } +inline void OMP_SET_DYNAMIC(int dyn) { + omp_set_dynamic(dyn); +} + +inline int OMP_GET_STREAMING_MAX_THREADS(bool use_cached = true) { + // Create a constant max OpenMP thread count per calling thread. + // Allow override by environment variable. Cache to avoid repeated system calls. + + static int max_streaming_threads = -1; + + if (use_cached && max_streaming_threads != -1) { + return max_streaming_threads; + } + + std::string env_var = getEnvVar("OMP_STREAMING_MAX_THREADS"); + int max_threads = DEFAULT_STREAMING_MAX_THREADS; + if (!env_var.empty()) { + max_threads=atoi(env_var.c_str()); + } + + max_streaming_threads = max_threads; + return max_streaming_threads; +} + class ThreadExceptionHelper { public: ThreadExceptionHelper() { @@ -112,6 +140,11 @@ class ThreadExceptionHelper { inline int omp_get_max_threads() __GOMP_NOTHROW {return 1;} inline int omp_get_thread_num() __GOMP_NOTHROW {return 0;} inline int OMP_NUM_THREADS() __GOMP_NOTHROW { return 1; } + inline int omp_get_dynamic() __GOMP_NOTHROW {return 1; } + inline void OMP_SET_DYNAMIC(int) __GOMP_NOTHROW {} + inline int omp_get_thread_limit() __GOMP_NOTHROW {return 1;} + inline int omp_get_active_level() __GOMP_NOTHROW {return 1;} + inline int OMP_GET_STREAMING_MAX_THREADS(bool use_cached = true) __GOMP_NOTHROW {return 1;} #ifdef __cplusplus } // extern "C" #endif @@ -123,6 +156,4 @@ class ThreadExceptionHelper { #endif - - #endif /* LIGHTGBM_OPENMP_WRAPPER_H_ */ diff --git a/src/c_api.cpp b/src/c_api.cpp index 4af5c72dc30f..9ffbae390c16 100644 --- a/src/c_api.cpp +++ b/src/c_api.cpp @@ -1079,12 +1079,14 @@ int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset, p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow); } + const int max_omp_threads = OMP_GET_STREAMING_MAX_THREADS(); + OMP_INIT_EX(); #pragma omp parallel for schedule(static) for (int i = 0; i < nrow; ++i) { OMP_LOOP_EX_BEGIN(); // convert internal thread id to be unique based on external thread id - const int internal_tid = omp_get_thread_num() + (MAX_THREAD_ALLOCATION * tid); + const int internal_tid = omp_get_thread_num() + (max_omp_threads * tid); auto one_row = get_row_fun(i); p_dataset->PushOneRow(internal_tid, start_row + i, one_row); OMP_LOOP_EX_END(); @@ -1159,12 +1161,15 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset, if (p_dataset->has_raw()) { p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow); } + + const int max_omp_threads = OMP_GET_STREAMING_MAX_THREADS(); + OMP_INIT_EX(); #pragma omp parallel for schedule(static) for (int i = 0; i < nrow; ++i) { OMP_LOOP_EX_BEGIN(); // convert internal thread id to be unique based on external thread id - const int internal_tid = omp_get_thread_num() + (MAX_THREAD_ALLOCATION * tid); + const int internal_tid = omp_get_thread_num() + (max_omp_threads * tid); auto one_row = get_row_fun(i); p_dataset->PushOneRow(internal_tid, static_cast(start_row + i), one_row); OMP_LOOP_EX_END(); diff --git a/src/io/sparse_bin.hpp b/src/io/sparse_bin.hpp index 032a40a07220..4f24444e4a60 100644 --- a/src/io/sparse_bin.hpp +++ b/src/io/sparse_bin.hpp @@ -84,7 +84,8 @@ class SparseBin : public Bin { void InitStreaming(uint32_t num_thread) override { // Each external thread needs its own set of OpenMP push buffers, // so allocate num_thread times the maximum number of OMP threads per external thread - push_buffers_.resize(MAX_THREAD_ALLOCATION * num_thread); + int max_omp_threads = OMP_GET_STREAMING_MAX_THREADS(); + push_buffers_.resize(max_omp_threads * num_thread); }; void ReSize(data_size_t num_data) override { num_data_ = num_data; } From ef7badeefbcbdfbe8703b759e98741b164cef776 Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Sun, 13 Nov 2022 14:21:28 -0800 Subject: [PATCH 05/13] switch to using InitStreaming param --- include/LightGBM/bin.h | 2 +- include/LightGBM/c_api.h | 4 ++- include/LightGBM/dataset.h | 15 +++++++--- include/LightGBM/feature_group.h | 7 +++-- include/LightGBM/utils/openmp_wrapper.h | 39 ++----------------------- src/c_api.cpp | 9 +++--- src/io/sparse_bin.hpp | 5 ++-- tests/cpp_tests/test_stream.cpp | 2 +- 8 files changed, 29 insertions(+), 54 deletions(-) diff --git a/include/LightGBM/bin.h b/include/LightGBM/bin.h index ec330bd94f8c..40511ab18006 100644 --- a/include/LightGBM/bin.h +++ b/include/LightGBM/bin.h @@ -262,7 +262,7 @@ class Bin { * \brief Initialize for pushing. By default, no action needed. * \param num_thread The number of external threads that will be calling the push APIs */ - virtual void InitStreaming(uint32_t /*num_thread*/) { } + virtual void InitStreaming(uint32_t /*num_thread*/, int32_t /*omp_max_threads*/) { } /*! * \brief Push one record * \param tid Thread id diff --git a/include/LightGBM/c_api.h b/include/LightGBM/c_api.h index 287826ea182c..432385bdabc4 100644 --- a/include/LightGBM/c_api.h +++ b/include/LightGBM/c_api.h @@ -153,6 +153,7 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateByReference(const DatasetHandle referenc * \param has_queries Whether the dataset has Metadata queries/groups * \param nclasses Number of initial score classes * \param nthreads Number of external threads that will use the PushRows APIs + * \param omp_max_threads Maximum number of OpenMP threads (-1 for default) * \return 0 when succeed, -1 when failure happens */ LIGHTGBM_C_EXPORT int LGBM_DatasetInitStreaming(DatasetHandle dataset, @@ -160,7 +161,8 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetInitStreaming(DatasetHandle dataset, int32_t has_init_scores, int32_t has_queries, int32_t nclasses, - int32_t nthreads); + int32_t nthreads, + int32_t omp_max_threads); /*! * \brief Push data to existing dataset, if ``nrow + start_row == num_total_row``, will call ``dataset->FinishLoad``. diff --git a/include/LightGBM/dataset.h b/include/LightGBM/dataset.h index 9cbfacd015ee..3c251a79786b 100644 --- a/include/LightGBM/dataset.h +++ b/include/LightGBM/dataset.h @@ -458,13 +458,16 @@ class Dataset { int32_t has_init_scores, int32_t has_queries, int32_t nclasses, - int32_t nthreads) { - // Initialize max thread count - OMP_GET_STREAMING_MAX_THREADS(false); + int32_t nthreads, + int32_t omp_max_threads) { + // Initialize optional max thread count + if (omp_max_threads > 0) { + omp_max_threads_ = omp_max_threads; + } metadata_.Init(num_data, has_weights, has_init_scores, has_queries, nclasses); for (int i = 0; i < num_groups_; ++i) { - feature_groups_[i]->InitStreaming(nthreads); + feature_groups_[i]->InitStreaming(nthreads, omp_max_threads_); } } @@ -849,6 +852,9 @@ class Dataset { /*! \brief Get whether FinishLoad is automatically called when pushing last row. */ inline bool wait_for_manual_finish() const { return wait_for_manual_finish_; } + /*! \brief Get the maximum number of OpenMP threads to allocate for. */ + inline int omp_max_threads() const { return omp_max_threads_; } + /*! \brief Set whether the Dataset is finished automatically when last row is pushed or with a manual * MarkFinished API call. Set to true for thread-safe streaming and/or if will be coalesced later. * FinishLoad should not be called on any Dataset that will be coalesced. @@ -950,6 +956,7 @@ class Dataset { std::vector feature_need_push_zeros_; std::vector> raw_data_; bool wait_for_manual_finish_; + int omp_max_threads_ = 16; // TODO decide bool has_raw_; /*! map feature (inner index) to its index in the list of numeric (non-categorical) features */ std::vector numeric_feature_map_; diff --git a/include/LightGBM/feature_group.h b/include/LightGBM/feature_group.h index 72d9fcac08dc..0ddfd857bce1 100644 --- a/include/LightGBM/feature_group.h +++ b/include/LightGBM/feature_group.h @@ -192,14 +192,15 @@ class FeatureGroup { /*! * \brief Initialize for pushing in a streaming fashion. By default, no action needed. * \param num_thread The number of external threads that will be calling the push APIs + * \param omp_max_threads The maximum number of OpenMP threads to allocate for */ - void InitStreaming(int32_t num_thread) { + void InitStreaming(int32_t num_thread, int32_t omp_max_threads) { if (is_multi_val_) { for (int i = 0; i < num_feature_; ++i) { - multi_bin_data_[i]->InitStreaming(num_thread); + multi_bin_data_[i]->InitStreaming(num_thread, omp_max_threads); } } else { - bin_data_->InitStreaming(num_thread); + bin_data_->InitStreaming(num_thread, omp_max_threads); } } diff --git a/include/LightGBM/utils/openmp_wrapper.h b/include/LightGBM/utils/openmp_wrapper.h index 4e5766e34eba..a337fc353b75 100644 --- a/include/LightGBM/utils/openmp_wrapper.h +++ b/include/LightGBM/utils/openmp_wrapper.h @@ -5,12 +5,6 @@ #ifndef LIGHTGBM_OPENMP_WRAPPER_H_ #define LIGHTGBM_OPENMP_WRAPPER_H_ -inline static std::string getEnvVar(const char* name) { - const char *tmp = getenv(name); - std::string env_var(tmp ? tmp : ""); - return env_var; -} - #ifdef _OPENMP #include @@ -23,8 +17,6 @@ inline static std::string getEnvVar(const char* name) { #include #include -#define DEFAULT_STREAMING_MAX_THREADS 8 // TODO decide - inline int OMP_NUM_THREADS() { int ret = 1; #pragma omp parallel @@ -42,30 +34,6 @@ inline void OMP_SET_NUM_THREADS(int num_threads) { } } -inline void OMP_SET_DYNAMIC(int dyn) { - omp_set_dynamic(dyn); -} - -inline int OMP_GET_STREAMING_MAX_THREADS(bool use_cached = true) { - // Create a constant max OpenMP thread count per calling thread. - // Allow override by environment variable. Cache to avoid repeated system calls. - - static int max_streaming_threads = -1; - - if (use_cached && max_streaming_threads != -1) { - return max_streaming_threads; - } - - std::string env_var = getEnvVar("OMP_STREAMING_MAX_THREADS"); - int max_threads = DEFAULT_STREAMING_MAX_THREADS; - if (!env_var.empty()) { - max_threads=atoi(env_var.c_str()); - } - - max_streaming_threads = max_threads; - return max_streaming_threads; -} - class ThreadExceptionHelper { public: ThreadExceptionHelper() { @@ -140,11 +108,6 @@ class ThreadExceptionHelper { inline int omp_get_max_threads() __GOMP_NOTHROW {return 1;} inline int omp_get_thread_num() __GOMP_NOTHROW {return 0;} inline int OMP_NUM_THREADS() __GOMP_NOTHROW { return 1; } - inline int omp_get_dynamic() __GOMP_NOTHROW {return 1; } - inline void OMP_SET_DYNAMIC(int) __GOMP_NOTHROW {} - inline int omp_get_thread_limit() __GOMP_NOTHROW {return 1;} - inline int omp_get_active_level() __GOMP_NOTHROW {return 1;} - inline int OMP_GET_STREAMING_MAX_THREADS(bool use_cached = true) __GOMP_NOTHROW {return 1;} #ifdef __cplusplus } // extern "C" #endif @@ -156,4 +119,6 @@ class ThreadExceptionHelper { #endif + + #endif /* LIGHTGBM_OPENMP_WRAPPER_H_ */ diff --git a/src/c_api.cpp b/src/c_api.cpp index 9ffbae390c16..bdd42a0981b4 100644 --- a/src/c_api.cpp +++ b/src/c_api.cpp @@ -1018,11 +1018,12 @@ int LGBM_DatasetInitStreaming(DatasetHandle dataset, int32_t has_init_scores, int32_t has_queries, int32_t nclasses, - int32_t nthreads) { + int32_t nthreads, + int32_t omp_max_threads) { API_BEGIN(); auto p_dataset = reinterpret_cast(dataset); auto num_data = p_dataset->num_data(); - p_dataset->InitStreaming(num_data, has_weights, has_init_scores, has_queries, nclasses, nthreads); + p_dataset->InitStreaming(num_data, has_weights, has_init_scores, has_queries, nclasses, nthreads, omp_max_threads); p_dataset->set_wait_for_manual_finish(true); API_END(); } @@ -1079,7 +1080,7 @@ int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset, p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow); } - const int max_omp_threads = OMP_GET_STREAMING_MAX_THREADS(); + const int max_omp_threads = p_dataset->omp_max_threads(); OMP_INIT_EX(); #pragma omp parallel for schedule(static) @@ -1162,7 +1163,7 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset, p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow); } - const int max_omp_threads = OMP_GET_STREAMING_MAX_THREADS(); + const int max_omp_threads = p_dataset->omp_max_threads(); OMP_INIT_EX(); #pragma omp parallel for schedule(static) diff --git a/src/io/sparse_bin.hpp b/src/io/sparse_bin.hpp index 4f24444e4a60..79ebb25d08dd 100644 --- a/src/io/sparse_bin.hpp +++ b/src/io/sparse_bin.hpp @@ -81,11 +81,10 @@ class SparseBin : public Bin { ~SparseBin() {} - void InitStreaming(uint32_t num_thread) override { + void InitStreaming(uint32_t num_thread, int32_t omp_max_threads) override { // Each external thread needs its own set of OpenMP push buffers, // so allocate num_thread times the maximum number of OMP threads per external thread - int max_omp_threads = OMP_GET_STREAMING_MAX_THREADS(); - push_buffers_.resize(max_omp_threads * num_thread); + push_buffers_.resize(omp_max_threads * num_thread); }; void ReSize(data_size_t num_data) override { num_data_ = num_data; } diff --git a/tests/cpp_tests/test_stream.cpp b/tests/cpp_tests/test_stream.cpp index e8bcc7a76a22..d7d4aa04b52f 100644 --- a/tests/cpp_tests/test_stream.cpp +++ b/tests/cpp_tests/test_stream.cpp @@ -197,7 +197,7 @@ void test_stream_sparse( EXPECT_EQ(0, result) << "LGBM_DatasetCreateFromSampledColumn result code: " << result; dataset = static_cast(dataset_handle); - dataset->InitStreaming(nrows, has_weights, has_init_scores, has_queries, nclasses, 2); + dataset->InitStreaming(nrows, has_weights, has_init_scores, has_queries, nclasses, 2, -1); break; } From 5f793bdcc40435b336434a3ca85693fab2731ffb Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Sun, 13 Nov 2022 14:24:50 -0800 Subject: [PATCH 06/13] fix comment --- include/LightGBM/bin.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/LightGBM/bin.h b/include/LightGBM/bin.h index 40511ab18006..71b60c493504 100644 --- a/include/LightGBM/bin.h +++ b/include/LightGBM/bin.h @@ -261,6 +261,7 @@ class Bin { /*! * \brief Initialize for pushing. By default, no action needed. * \param num_thread The number of external threads that will be calling the push APIs + * \param omp_max_threads The maximum number of OpenMP threads to allocate for */ virtual void InitStreaming(uint32_t /*num_thread*/, int32_t /*omp_max_threads*/) { } /*! From 1b94aadbefb26066649053a2bf6374c006c9f35c Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Sun, 13 Nov 2022 19:42:29 -0800 Subject: [PATCH 07/13] style fix --- include/LightGBM/dataset.h | 2 +- src/c_api.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/LightGBM/dataset.h b/include/LightGBM/dataset.h index 3c251a79786b..862756ac2cde 100644 --- a/include/LightGBM/dataset.h +++ b/include/LightGBM/dataset.h @@ -956,7 +956,7 @@ class Dataset { std::vector feature_need_push_zeros_; std::vector> raw_data_; bool wait_for_manual_finish_; - int omp_max_threads_ = 16; // TODO decide + int omp_max_threads_ = 16; // TODO(svotaw) decide before checkin bool has_raw_; /*! map feature (inner index) to its index in the list of numeric (non-categorical) features */ std::vector numeric_feature_map_; diff --git a/src/c_api.cpp b/src/c_api.cpp index bdd42a0981b4..66a2839989c4 100644 --- a/src/c_api.cpp +++ b/src/c_api.cpp @@ -1164,7 +1164,7 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset, } const int max_omp_threads = p_dataset->omp_max_threads(); - + OMP_INIT_EX(); #pragma omp parallel for schedule(static) for (int i = 0; i < nrow; ++i) { From b812cb00f4446b0eac61f41a5ac72ac57130c28d Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Sat, 19 Nov 2022 22:18:01 -0800 Subject: [PATCH 08/13] default to -1 --- include/LightGBM/dataset.h | 2 +- src/c_api.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/LightGBM/dataset.h b/include/LightGBM/dataset.h index 862756ac2cde..e8db217f6bcb 100644 --- a/include/LightGBM/dataset.h +++ b/include/LightGBM/dataset.h @@ -956,7 +956,7 @@ class Dataset { std::vector feature_need_push_zeros_; std::vector> raw_data_; bool wait_for_manual_finish_; - int omp_max_threads_ = 16; // TODO(svotaw) decide before checkin + int omp_max_threads_ = -1; bool has_raw_; /*! map feature (inner index) to its index in the list of numeric (non-categorical) features */ std::vector numeric_feature_map_; diff --git a/src/c_api.cpp b/src/c_api.cpp index 66a2839989c4..7a8507d36f95 100644 --- a/src/c_api.cpp +++ b/src/c_api.cpp @@ -1080,7 +1080,7 @@ int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset, p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow); } - const int max_omp_threads = p_dataset->omp_max_threads(); + const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads(): OMP_NUM_THREADS(); OMP_INIT_EX(); #pragma omp parallel for schedule(static) @@ -1163,8 +1163,8 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset, p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow); } - const int max_omp_threads = p_dataset->omp_max_threads(); - + const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads(): OMP_NUM_THREADS(); + OMP_INIT_EX(); #pragma omp parallel for schedule(static) for (int i = 0; i < nrow; ++i) { From 3c9375d15b9b26a6d6a4a40ce4e98a17c810bbd5 Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Sat, 19 Nov 2022 22:24:35 -0800 Subject: [PATCH 09/13] style fix --- src/c_api.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/c_api.cpp b/src/c_api.cpp index 7a8507d36f95..3bfd48d9dbc8 100644 --- a/src/c_api.cpp +++ b/src/c_api.cpp @@ -1080,7 +1080,7 @@ int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset, p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow); } - const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads(): OMP_NUM_THREADS(); + const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads() : OMP_NUM_THREADS(); OMP_INIT_EX(); #pragma omp parallel for schedule(static) @@ -1163,7 +1163,7 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset, p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow); } - const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads(): OMP_NUM_THREADS(); + const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads() : OMP_NUM_THREADS(); OMP_INIT_EX(); #pragma omp parallel for schedule(static) From ff33af59aa90f017828f9f9e161836d2cfffd66f Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Sat, 19 Nov 2022 22:52:43 -0800 Subject: [PATCH 10/13] style fix --- src/c_api.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/c_api.cpp b/src/c_api.cpp index 3bfd48d9dbc8..004a1f230c74 100644 --- a/src/c_api.cpp +++ b/src/c_api.cpp @@ -1164,7 +1164,7 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset, } const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads() : OMP_NUM_THREADS(); - + OMP_INIT_EX(); #pragma omp parallel for schedule(static) for (int i = 0; i < nrow; ++i) { From 4b0e07ff31ab982d9f0d547e4bbdbd0205280f3d Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Sun, 20 Nov 2022 08:21:51 -0800 Subject: [PATCH 11/13] adjust initialization --- include/LightGBM/dataset.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/LightGBM/dataset.h b/include/LightGBM/dataset.h index e8db217f6bcb..073afb8f9d99 100644 --- a/include/LightGBM/dataset.h +++ b/include/LightGBM/dataset.h @@ -463,6 +463,8 @@ class Dataset { // Initialize optional max thread count if (omp_max_threads > 0) { omp_max_threads_ = omp_max_threads; + } else if (omp_max_threads_ <= 0) { + omp_max_threads_ = OMP_NUM_THREADS(); } metadata_.Init(num_data, has_weights, has_init_scores, has_queries, nclasses); From 28098e6d8efdbb4a43e9e3be5f4821b114731473 Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Mon, 21 Nov 2022 14:26:26 -0800 Subject: [PATCH 12/13] comment edit --- include/LightGBM/dataset.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/LightGBM/dataset.h b/include/LightGBM/dataset.h index 073afb8f9d99..74e3e9c1dad4 100644 --- a/include/LightGBM/dataset.h +++ b/include/LightGBM/dataset.h @@ -460,7 +460,7 @@ class Dataset { int32_t nclasses, int32_t nthreads, int32_t omp_max_threads) { - // Initialize optional max thread count + // Initialize optional max thread count with either parameter or OMP setting if (omp_max_threads > 0) { omp_max_threads_ = omp_max_threads; } else if (omp_max_threads_ <= 0) { From ab67eefba51432611e7442af863d6cb6662bf578 Mon Sep 17 00:00:00 2001 From: Scott Votaw Date: Mon, 21 Nov 2022 16:08:42 -0800 Subject: [PATCH 13/13] fix test --- tests/cpp_tests/test_stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cpp_tests/test_stream.cpp b/tests/cpp_tests/test_stream.cpp index d7d4aa04b52f..bc5f73b0a3ee 100644 --- a/tests/cpp_tests/test_stream.cpp +++ b/tests/cpp_tests/test_stream.cpp @@ -79,7 +79,7 @@ void test_stream_dense( &dataset_handle); EXPECT_EQ(0, result) << "LGBM_DatasetCreateFromSampledColumn result code: " << result; - result = LGBM_DatasetInitStreaming(dataset_handle, has_weights, has_init_scores, has_queries, nclasses, 1); + result = LGBM_DatasetInitStreaming(dataset_handle, has_weights, has_init_scores, has_queries, nclasses, 1, -1); EXPECT_EQ(0, result) << "LGBM_DatasetInitStreaming result code: " << result; break; }