From 027afdede9989c500cbef2f5d561d9007459e3a4 Mon Sep 17 00:00:00 2001
From: Alberto Ferreira
Date: Tue, 23 Feb 2021 12:23:02 +0000
Subject: [PATCH] [feature] Add streaming support (#10)

Through the new class ChunkedArray it's possible to collect data in Java
in streaming.

Rationale described at https://github.com/microsoft/LightGBM/issues/3995.
---
 swig/ChunkedArray.hpp               | 240 ++++++++++++++++++++++++++++
 swig/ChunkedArray_API_extensions.i  |  13 ++
 swig/lightgbmlib.i                  |   1 +
 swig/test_ChunkedArray_manually.cpp | 179 +++++++++++++++++++++
 4 files changed, 433 insertions(+)
 create mode 100644 swig/ChunkedArray.hpp
 create mode 100644 swig/ChunkedArray_API_extensions.i
 create mode 100644 swig/test_ChunkedArray_manually.cpp

diff --git a/swig/ChunkedArray.hpp b/swig/ChunkedArray.hpp
new file mode 100644
--- /dev/null
+++ b/swig/ChunkedArray.hpp
@@ -0,0 +1,240 @@
+#ifndef __CHUNKED_ARRAY_H__
+#define __CHUNKED_ARRAY_H__
+
+#include <algorithm>
+#include <new>
+#include <vector>
+
+#include <cassert>
+#include <cstddef>
+
+/**
+ * Container that manages a dynamic array of fixed-length chunks.
+ *
+ * The class also takes care of allocation & release of the underlying
+ * memory. It can be used with either a high or low-level API.
+ *
+ * The high-level API allocates chunks as needed, manages addresses automatically and keeps
+ * track of number of inserted elements, but is not thread-safe (ok as usually input is a streaming iterator).
+ * For parallel input sources the low-level API must be used.
+ *
+ * Note: When using this for `LGBM_DatasetCreateFromMats` use a
+ * chunk_size multiple of #num_cols for your dataset, so each chunk
+ * contains "complete" instances.
+ *
+ * === High-level insert API intro ===
+ *
+ * The easiest way to use is:
+ * 0. ChunkedArray<T>(chunk_size)  # Choose appropriate size
+ * 1. add(value)                   # as many times as you want (will generate chunks as needed)
+ * 2. data() or data_as_void()     # retrieves a T** or void** pointer (useful for `LGBM_DatasetCreateFromMats`).
+ *
+ * Useful query methods (all O(1)):
+ * - get_add_count()                 # total count of added elements
+ * - get_chunks_count()              # how many chunks are currently allocated.
+ * - get_current_chunk_added_count() # for the last add() chunk, how many items there are.
+ * - get_chunk_size()                # Get constant chunk_size from constructor call.
+ *
+ * With those you can generate int32_t sizes[]. Last chunk can be smaller than chunk_size, so, for any i:
+ * - sizes[i < get_chunks_count()-1] = get_chunk_size()
+ * - sizes[get_chunks_count()-1]     = get_current_chunk_added_count()
+ *
+ * NOTE(review): this class owns raw `new[]` buffers but does not delete its
+ * copy/move operations — copying an instance would double-free on destruction.
+ * Consider `ChunkedArray(const ChunkedArray&) = delete;` — confirm SWIG impact.
+ */
+template <typename T>
+class ChunkedArray
+{
+ public:
+  explicit ChunkedArray(size_t chunk_size)
+    : _chunk_size(chunk_size), _current_chunks_idx(0), _current_chunk_idx(0)
+  {
+    new_chunk();
+  }
+
+  ~ChunkedArray()
+  {
+    release();
+  }
+
+  /**
+   * Adds a value to the chunks sequentially.
+   * If the last chunk is full it creates a new one and appends to it.
+   *
+   * @param value value to insert.
+   */
+  void add(T value) {
+    if (! within_bounds(_current_chunks_idx, _current_chunk_idx)) {
+      new_chunk();
+      _current_chunks_idx += 1;
+      _current_chunk_idx = 0;
+    }
+
+    // Perform the store outside of assert(): an expression inside assert()
+    // is compiled out under NDEBUG, which would silently drop the write.
+    assert (within_bounds(_current_chunks_idx, _current_chunk_idx));
+    _chunks[_current_chunks_idx][_current_chunk_idx] = value;
+    _current_chunk_idx += 1;
+  }
+
+  /** Total count of added elements (high-level add() API only). */
+  size_t get_add_count() const {
+    return _current_chunks_idx * _chunk_size + _current_chunk_idx;
+  }
+
+  /** Number of currently-allocated chunks. */
+  size_t get_chunks_count() const {
+    return _chunks.size();
+  }
+
+  /** Number of elements add()-ed into the last (current) chunk. */
+  size_t get_current_chunk_added_count() const {
+    return _current_chunk_idx;
+  }
+
+  /** Constant chunk size chosen at construction. */
+  size_t get_chunk_size() const {
+    return _chunk_size;
+  }
+
+  /**
+   * Returns the pointer to the raw chunks data.
+   *
+   * @return T** pointer to raw data.
+   */
+  T **data() noexcept
+  {
+    return _chunks.data();
+  }
+
+  /**
+   * Returns the pointer to the raw chunks data, but cast to void**.
+   * This is so ``LGBM_DatasetCreateFromMats`` accepts it.
+   *
+   * @return void** pointer to raw data.
+   */
+  void **data_as_void() noexcept
+  {
+    return reinterpret_cast<void**>(_chunks.data());
+  }
+
+  /**
+   * Coalesces (copies chunked data) to an array of the same type.
+   * It assumes that ``other`` has enough space to receive that data.
+   *
+   * @param other destination array (size must be >= get_add_count()).
+   */
+  void coalesce_to(T *other) const {
+    if (this->empty()) {
+      return;
+    }
+
+    const size_t full_chunks = this->get_chunks_count() - 1;
+
+    // Copy full chunks:
+    size_t i = 0;
+    for (size_t chunk = 0; chunk < full_chunks; ++chunk) {
+      T* chunk_ptr = _chunks[chunk];
+      for (size_t chunk_pos = 0; chunk_pos < _chunk_size; ++chunk_pos) {
+        other[i++] = chunk_ptr[chunk_pos];
+      }
+    }
+    // Copy filled values from last chunk only:
+    const size_t last_chunk_elems = this->get_current_chunk_added_count();
+    T* chunk_ptr = _chunks[full_chunks];
+    for (size_t chunk_pos = 0; chunk_pos < last_chunk_elems; ++chunk_pos) {
+      other[i++] = chunk_ptr[chunk_pos];
+    }
+  }
+
+  /**
+   * Return value from array of chunks.
+   *
+   * @param chunks_index index of the chunk
+   * @param index index within chunk
+   * @param on_fail_value sentinel value. If out of bounds returns that value.
+   *
+   * @return stored value, or ``on_fail_value`` if the position is out of bounds.
+   */
+  T getitem(size_t chunks_index, size_t index, T on_fail_value) noexcept
+  {
+    if (within_bounds(chunks_index, index))
+      return _chunks[chunks_index][index];
+    else
+      return on_fail_value;
+  }
+
+  /**
+   * Store a value at an arbitrary valid position (low-level API).
+   *
+   * @param chunks_index index of the chunk
+   * @param index index within chunk
+   * @param value value to store
+   *
+   * @return 0 = success, -1 = out of bounds access.
+   */
+  int setitem(size_t chunks_index, size_t index, T value) noexcept
+  {
+    if (within_bounds(chunks_index, index))
+    {
+      _chunks[chunks_index][index] = value;
+      return 0;
+    } else {
+      return -1;
+    }
+  }
+
+  /**
+   * To reset storage call this.
+   * Will release existing resources and prepare for reuse.
+   */
+  void clear() noexcept
+  {
+    release();
+    new_chunk();
+  }
+
+  /**
+   * Returns true if is empty.
+   */
+  bool empty() const noexcept
+  {
+    return get_current_chunk_added_count() == 0;
+  }
+
+  /**
+   * Deletes all the allocated chunks.
+   * Do not use container after this! See ``clear()`` instead.
+   */
+  void release() noexcept
+  {
+    std::for_each(_chunks.begin(), _chunks.end(), [](T* c) { delete[] c; });
+    _chunks.clear();
+    _chunks.shrink_to_fit();
+    _current_chunks_idx = 0;
+    _current_chunk_idx = 0;
+  }
+
+  /** True iff (chunks_index, index) addresses an allocated slot. */
+  inline bool within_bounds(size_t chunks_index, size_t index) const {
+    return (chunks_index < _chunks.size()) && (index < _chunk_size);
+  }
+
+  /**
+   * Adds a new chunk to the array of chunks. Not thread-safe.
+   */
+  void new_chunk()
+  {
+    _chunks.push_back(new (std::nothrow) T[_chunk_size]);
+
+    // Check memory allocation success:
+    if (! _chunks[_chunks.size()-1]) {
+      release();
+      throw std::bad_alloc();
+    }
+  }
+
+ private:
+  const size_t _chunk_size;
+  std::vector<T*> _chunks;
+
+  // For add() interface & some of the get_*() queries:
+  size_t _current_chunks_idx;  //< Index of the chunk accepting add()s.
+  size_t _current_chunk_idx;   //< Next write position within that chunk.
+};
+
+#endif  // __CHUNKED_ARRAY_H__
diff --git a/swig/ChunkedArray_API_extensions.i b/swig/ChunkedArray_API_extensions.i
new file mode 100644
--- /dev/null
+++ b/swig/ChunkedArray_API_extensions.i
@@ -0,0 +1,13 @@
+/**
+ * Wrap ChunkedArray.hpp class for SWIG usage.
+ */
+
+%{
+#include "ChunkedArray.hpp"
+%}
+
+%include "ChunkedArray.hpp"
+
+%template(int32ChunkedArray) ChunkedArray<int32_t>;
+%template(floatChunkedArray) ChunkedArray<float>;
+%template(doubleChunkedArray) ChunkedArray<double>;
\ No newline at end of file
diff --git a/swig/lightgbmlib.i b/swig/lightgbmlib.i
index 0a27427d0969..ff970de8ac10 100644
--- a/swig/lightgbmlib.i
+++ b/swig/lightgbmlib.i
@@ -378,3 +378,4 @@ void NAME##_setitem(TYPE *ary, int64_t index, TYPE value);
 %pointer_handle(void*, voidpp)
 
 %include "StringArray_API_extensions.i"
+%include "ChunkedArray_API_extensions.i"
diff --git a/swig/test_ChunkedArray_manually.cpp b/swig/test_ChunkedArray_manually.cpp
new file mode 100644
index 000000000000..1794ba795f17
--- /dev/null
+++ b/swig/test_ChunkedArray_manually.cpp
@@ -0,0 +1,179 @@
+/**
+ * Tests for ChunkedArray.
+ *
+ * Some tests require visual assessment.
+ * We should move this to googletest/Catch2 in the future
+ * and get rid of the tests that require visual checks.
+ */
+
+#include <cstdio>
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+#include "ChunkedArray.hpp"
+using namespace std;
+
+using intChunkedArray = ChunkedArray<int>;
+using doubleChunkedArray = ChunkedArray<double>;
+
+// Test data
+const int out_of_bounds = 4;  // test get outside bounds.
+const size_t chunk_size = 3;
+const std::vector<int> ref = {1, 2, 3, 4, 5, 6, 7};
+
+/** Expected size of the coalesced (single) array for a ChunkedArray. */
+template <typename T>
+size_t _get_merged_array_size(ChunkedArray<T> &ca) {
+  if (ca.empty()) {
+    return 0;
+  } else {
+    size_t prior_chunks_total_size = (ca.get_chunks_count() - 1) * ca.get_chunk_size();
+    return prior_chunks_total_size + ca.get_current_chunk_added_count();
+  }
+}
+
+/** Print allocation/usage counters of the container (visual check). */
+template <typename T>
+void print_container_stats(ChunkedArray<T> &ca) {
+  // %zu is the portable size_t specifier (%ld is wrong on LLP64 platforms).
+  printf("\n\nContainer stats: %zu chunks of size %zu with %zu item(s) on last chunk (#elements=%zu).\n"
+         "  > Should result in single array of size %zu.\n\n",
+         ca.get_chunks_count(),
+         ca.get_chunk_size(),
+         ca.get_current_chunk_added_count(),
+         ca.get_add_count(),
+         _get_merged_array_size(ca));
+}
+
+/** Dump added elements chunk-by-chunk (one line per chunk) into stream `o`. */
+template <typename T>
+void _print_chunked_data(ChunkedArray<T> &x, T** data, std::ostream &o = std::cout) {
+  size_t chunk = 0;
+  size_t pos = 0;
+
+  for (size_t i = 0; i < x.get_add_count(); ++i) {
+    o << data[chunk][pos] << " ";
+
+    ++pos;
+    if (pos == x.get_chunk_size()) {
+      pos = 0;
+      ++chunk;
+      o << "\n";
+    }
+  }
+}
+
+/** Print contents through the typed data() accessor (visual check). */
+template <typename T>
+void print_data(ChunkedArray<T> &x) {
+  T **data = x.data();
+  cout << "Printing from T** data(): \n";
+  _print_chunked_data(x, data);
+  cout << "\n^ Print complete ^\n";
+}
+
+/** Print contents through the void** accessor cast back to T** (visual check). */
+template <typename T>
+void print_void_data(ChunkedArray<T> &x) {
+  T **data = reinterpret_cast<T**>(x.data_as_void());
+  cout << "Printing from reinterpret_cast<T**>(data_as_void()):\n";
+  _print_chunked_data(x, data);
+  cout << "\n^ Print complete ^\n";
+}
+
+/** Walk past the end on purpose to show getitem()'s sentinel behaviour. */
+template <typename T>
+void print_ChunkedArray_contents(ChunkedArray<T> &ca) {
+  size_t chunk = 0;
+  size_t pos = 0;
+  for (size_t i = 0; i < ca.get_add_count() + out_of_bounds; ++i) {
+    bool within_added = i < ca.get_add_count();
+    bool within_bounds = ca.within_bounds(chunk, pos);
+    cout << "@(" << chunk << "," << pos << ") = " << ca.getitem(chunk, pos, 10)
+         << " " << within_added << " " << within_bounds << endl;
+
+    ++pos;
+
+    if (pos == ca.get_chunk_size()) {
+      ++chunk;
+      pos = 0;
+    }
+  }
+}
+
+/**
+ * Ensure coalesce_to works and dumps all the inserted data correctly.
+ */
+void test_coalesce_to(const intChunkedArray &ca, const std::vector<int> &ref) {
+  std::vector<int> coalesced_out(ca.get_add_count());
+
+  ca.coalesce_to(coalesced_out.data());
+
+  assert(ref.size() == coalesced_out.size());
+  assert(std::equal(ref.begin(), ref.end(), coalesced_out.begin()));
+}
+
+/**
+ * By retrieving all the data to a format split by chunks, one can ensure
+ * that the data was stored correctly and with the correct memory layout.
+ */
+template <typename T>
+void test_data_layout(ChunkedArray<T> &ca, const std::vector<T> &ref, bool data_as_void) {
+  std::stringstream ss, ss_ref;
+  T **data = data_as_void ? reinterpret_cast<T**>(ca.data_as_void()) : ca.data();
+
+  // Dump each chunk represented by a line with elements split by space:
+  for (size_t i = 0; i < ref.size(); ++i) {
+    if ((i > 0) && (i % chunk_size == 0))
+      ss_ref << "\n";
+    ss_ref << ref[i] << " ";
+  }
+
+  _print_chunked_data(ca, data, ss);  // Dump chunked data to this same string format.
+  assert (ss_ref.str() == ss.str());
+}
+
+/**
+ * Test that using, clearing and reusing uses the latest data only.
+ */
+void test_clear() {
+  // Set-up with some data
+  const std::vector<int> ref2 = {1, 2, 5, -1};
+  ChunkedArray<int> ca = ChunkedArray<int>(chunk_size);
+  for (auto v : ref) {
+    ca.add(v);
+  }
+  test_coalesce_to(ca, ref);  // Should have the same contents.
+
+  // Clear & re-use:
+  ca.clear();
+  for (auto v : ref2) {
+    ca.add(v);  // Fill with new contents.
+  }
+
+  // Ensure it still works:
+  test_coalesce_to(ca, ref2);  // Should match the new reference content.
+}
+
+int main() {
+  // Initialize test variables. ////////////////////////////////////////////////////
+  ChunkedArray<int> ca = ChunkedArray<int>(chunk_size);
+  for (auto v : ref) {
+    ca.add(v);  // Indirectly test insertions through the retrieval tests.
+  }
+
+  // Tests /////////////////////////////////////////////////////////////////////////
+
+  assert(ca.get_add_count() == ref.size());
+  test_coalesce_to(ca, ref);
+
+  // Test chunked data layout for retrieval:
+  test_data_layout(ca, ref, false);
+  test_data_layout(ca, ref, true);
+
+  test_clear();
+
+  // For manual verification - useful outputs //////////////////////////////////////
+  print_container_stats(ca);
+  print_ChunkedArray_contents(ca);
+  print_data(ca);
+  print_void_data(ca);
+  ca.release(); ca.release(); print_container_stats(ca);  // Check double free behaviour.
+  cout << "Done!" << endl;
+  return 0;
+}