diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp index d0c32d65..38f3b1a5 100644 --- a/tdigest/include/tdigest.hpp +++ b/tdigest/include/tdigest.hpp @@ -20,8 +20,10 @@ #ifndef _TDIGEST_HPP_ #define _TDIGEST_HPP_ -#include +#include #include +#include +#include #include "common_defs.hpp" @@ -84,6 +86,7 @@ class tdigest { T mean_; W weight_; }; + using vector_t = std::vector; using vector_centroid = std::vector::template rebind_alloc>; using vector_bytes = std::vector::template rebind_alloc>; @@ -165,20 +168,29 @@ class tdigest { */ string to_string(bool print_centroids = false) const; + /** + * Computes size needed to serialize the current state. + * @param with_buffer optionally serialize buffered values avoiding compression + * @return size in bytes needed to serialize this tdigest + */ + size_t get_serialized_size_bytes(bool with_buffer = false) const; + /** * This method serializes t-Digest into a given stream in a binary form * @param os output stream + * @param with_buffer optionally serialize buffered values avoiding compression */ - void serialize(std::ostream& os) const; + void serialize(std::ostream& os, bool with_buffer = false) const; /** * This method serializes t-Digest as a vector of bytes. * An optional header can be reserved in front of the sketch. * It is an uninitialized space of a given size. * @param header_size_bytes space to reserve in front of the sketch + * @param with_buffer optionally serialize buffered values avoiding compression * @return serialized sketch as a vector of bytes */ - vector_bytes serialize(unsigned header_size_bytes = 0) const; + vector_bytes serialize(unsigned header_size_bytes = 0, bool with_buffer = false) const; /** * This method deserializes t-Digest from a given stream. @@ -198,7 +210,6 @@ class tdigest { static tdigest deserialize(const void* bytes, size_t size, const Allocator& allocator = Allocator()); private: - Allocator allocator_; bool reverse_merge_; uint16_t k_; uint16_t internal_k_; @@ -208,8 +219,9 @@ class tdigest { vector_centroid centroids_; uint64_t centroids_weight_; size_t buffer_capacity_; - vector_centroid buffer_; - uint64_t buffered_weight_; + vector_t buffer_; + + static const size_t BUFFER_MULTIPLIER = 4; static const uint8_t PREAMBLE_LONGS_EMPTY_OR_SINGLE = 1; static const uint8_t PREAMBLE_LONGS_MULTIPLE = 2; @@ -222,11 +234,11 @@ class tdigest { enum flags { IS_EMPTY, IS_SINGLE_VALUE, REVERSE_MERGE }; bool is_single_value() const; + uint8_t get_preamble_longs() const; + void merge(vector_centroid& buffer, W weight); // for deserialize - tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, const Allocator& allocator); - - void merge_buffered(); + tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, vector_t&& buffer); static double weighted_average(double x1, double w1, double x2, double w2); diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp index 1a48f887..f8af5322 100644 --- a/tdigest/include/tdigest_impl.hpp +++ b/tdigest/include/tdigest_impl.hpp @@ -30,15 +30,14 @@ namespace datasketches { template tdigest::tdigest(uint16_t k, const A& allocator): -tdigest(false, k, std::numeric_limits::infinity(), -std::numeric_limits::infinity(), vector_centroid(allocator), 0, allocator) +tdigest(false, k, std::numeric_limits::infinity(), -std::numeric_limits::infinity(), vector_centroid(allocator), 0, vector_t(allocator)) {} template void tdigest::update(T value) { if (std::isnan(value)) return; - if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_buffered(); - buffer_.push_back(centroid(value, 1)); - ++buffered_weight_; + if (buffer_.size() == centroids_capacity_ * BUFFER_MULTIPLIER) compress(); + buffer_.push_back(value); min_ = std::min(min_, value); max_ = std::max(max_, value); } @@ -46,22 +45,21 @@ void tdigest::update(T value) { template void tdigest::merge(tdigest& other) { if (other.is_empty()) return; - size_t num = buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size(); - buffer_.reserve(num); - std::copy(other.buffer_.begin(), other.buffer_.end(), std::back_inserter(buffer_)); - std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(buffer_)); - buffered_weight_ += other.get_total_weight(); - if (num > buffer_capacity_) { - merge_buffered(); - } else { - min_ = std::min(min_, other.get_min_value()); - max_ = std::max(max_, other.get_max_value()); - } + vector_centroid tmp(buffer_.get_allocator()); + tmp.reserve(buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size()); + for (const T value: buffer_) tmp.push_back(centroid(value, 1)); + for (const T value: other.buffer_) tmp.push_back(centroid(value, 1)); + std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(tmp)); + merge(tmp, buffer_.size() + other.get_total_weight()); } template void tdigest::compress() { - merge_buffered(); + if (buffer_.size() == 0) return; + vector_centroid tmp(buffer_.get_allocator()); + tmp.reserve(buffer_.size() + centroids_.size()); + for (const T value: buffer_) tmp.push_back(centroid(value, 1)); + merge(tmp, buffer_.size()); } template @@ -83,7 +81,7 @@ T tdigest::get_max_value() const { template uint64_t tdigest::get_total_weight() const { - return centroids_weight_ + buffered_weight_; + return centroids_weight_ + buffer_.size(); } template @@ -95,7 +93,7 @@ double tdigest::get_rank(T value) const { // one centroid and value == min_ == max_ if ((centroids_.size() + buffer_.size()) == 1) return 0.5; - const_cast(this)->merge_buffered(); // side effect + const_cast(this)->compress(); // side effect // left tail const T first_mean = centroids_.front().get_mean(); @@ -149,7 +147,7 @@ T tdigest::get_quantile(double rank) const { if ((rank < 0.0) || (rank > 1.0)) { throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1"); } - const_cast(this)->merge_buffered(); // side effect + const_cast(this)->compress(); // side effect if (centroids_.size() == 1) return centroids_.front().get_mean(); // at least 2 centroids @@ -204,13 +202,11 @@ string tdigest::to_string(bool print_centroids) const { std::ostringstream os; os << "### t-Digest summary:" << std::endl; os << " Nominal k : " << k_ << std::endl; - os << " Internal k : " << internal_k_ << std::endl; os << " Centroids : " << centroids_.size() << std::endl; os << " Buffered : " << buffer_.size() << std::endl; os << " Centroids capacity : " << centroids_capacity_ << std::endl; - os << " Buffer capacity : " << buffer_capacity_ << std::endl; + os << " Buffer capacity : " << centroids_capacity_ * BUFFER_MULTIPLIER << std::endl; os << " Centroids Weight : " << centroids_weight_ << std::endl; - os << " Buffered Weight : " << buffered_weight_ << std::endl; os << " Total Weight : " << get_total_weight() << std::endl; os << " Reverse Merge : " << (reverse_merge_ ? "true" : "false") << std::endl; if (!is_empty()) { @@ -229,33 +225,33 @@ string tdigest::to_string(bool print_centroids) const { if (buffer_.size() > 0) { os << "Buffer:" << std::endl; int i = 0; - for (const auto& b: buffer_) { - os << i++ << ": " << b.get_mean() << ", " << b.get_weight() << std::endl; + for (const T value: buffer_) { + os << i++ << ": " << value << std::endl; } } } - return string(os.str().c_str(), allocator_); + return string(os.str().c_str(), buffer_.get_allocator()); } +// assumes that there is enough room in the input buffer to add centroids from this tdigest template -void tdigest::merge_buffered() { - if (buffered_weight_ == 0) return; - std::copy(centroids_.begin(), centroids_.end(), std::back_inserter(buffer_)); +void tdigest::merge(vector_centroid& buffer, W weight) { + std::copy(centroids_.begin(), centroids_.end(), std::back_inserter(buffer)); centroids_.clear(); - std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp()); - if (reverse_merge_) std::reverse(buffer_.begin(), buffer_.end()); - centroids_weight_ += buffered_weight_; - auto it = buffer_.begin(); + std::stable_sort(buffer.begin(), buffer.end(), centroid_cmp()); + if (reverse_merge_) std::reverse(buffer.begin(), buffer.end()); + centroids_weight_ += weight; + auto it = buffer.begin(); centroids_.push_back(*it); ++it; double weight_so_far = 0; - while (it != buffer_.end()) { + while (it != buffer.end()) { const double proposed_weight = centroids_.back().get_weight() + it->get_weight(); bool add_this = false; - if (std::distance(buffer_.begin(), it) != 1 && std::distance(buffer_.end(), it) != 1) { + if (std::distance(buffer.begin(), it) != 1 && std::distance(buffer.end(), it) != 1) { const double q0 = weight_so_far / centroids_weight_; const double q2 = (weight_so_far + proposed_weight) / centroids_weight_; - const double normalizer = scale_function().normalizer(internal_k_, centroids_weight_); + const double normalizer = scale_function().normalizer(2 * k_, centroids_weight_); add_this = proposed_weight <= centroids_weight_ * std::min(scale_function().max(q0, normalizer), scale_function().max(q2, normalizer)); } if (add_this) { @@ -267,13 +263,10 @@ void tdigest::merge_buffered() { ++it; } if (reverse_merge_) std::reverse(centroids_.begin(), centroids_.end()); - if (centroids_weight_ > 0) { - min_ = std::min(min_, centroids_.front().get_mean()); - max_ = std::max(max_, centroids_.back().get_mean()); - } + min_ = std::min(min_, centroids_.front().get_mean()); + max_ = std::max(max_, centroids_.back().get_mean()); reverse_merge_ = !reverse_merge_; buffer_.clear(); - buffered_weight_ = 0; } template @@ -282,68 +275,76 @@ double tdigest::weighted_average(double x1, double w1, double x2, double w } template -void tdigest::serialize(std::ostream& os) const { - const_cast(this)->merge_buffered(); // side effect - write(os, is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE); +void tdigest::serialize(std::ostream& os, bool with_buffer) const { + if (!with_buffer) const_cast(this)->compress(); // side effect + write(os, get_preamble_longs()); write(os, SERIAL_VERSION); write(os, SKETCH_TYPE); write(os, k_); const uint8_t flags_byte( - (is_empty() ? 1 << flags::IS_EMPTY : 0) | - (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0) | - (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0) + (is_empty() ? 1 << flags::IS_EMPTY : 0) + | (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0) + | (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0) ); write(os, flags_byte); write(os, 0); // unused - if (is_empty()) return; - if (is_single_value()) { write(os, min_); return; } - write(os, static_cast(centroids_.size())); - write(os, 0); // unused - + write(os, static_cast(buffer_.size())); write(os, min_); write(os, max_); - write(os, centroids_.data(), centroids_.size() * sizeof(centroid)); + if (centroids_.size() > 0) write(os, centroids_.data(), centroids_.size() * sizeof(centroid)); + if (buffer_.size() > 0) write(os, buffer_.data(), buffer_.size() * sizeof(T)); } template -auto tdigest::serialize(unsigned header_size_bytes) const -> vector_bytes { - const_cast(this)->merge_buffered(); // side effect - const uint8_t preamble_longs = is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE; - const size_t size_bytes = preamble_longs * sizeof(uint64_t) + - (is_empty() ? 0 : (is_single_value() ? sizeof(T) : sizeof(T) * 2 + sizeof(centroid) * centroids_.size())); - vector_bytes bytes(size_bytes, 0, allocator_); - uint8_t* ptr = bytes.data() + header_size_bytes; +uint8_t tdigest::get_preamble_longs() const { + return is_empty() || is_single_value() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE; +} - *ptr++ = preamble_longs; +template +size_t tdigest::get_serialized_size_bytes(bool with_buffer) const { + if (!with_buffer) const_cast(this)->compress(); // side effect + size_t size_bytes = get_preamble_longs() * sizeof(uint64_t); + if (is_empty()) return size_bytes; + if (is_single_value()) return size_bytes + sizeof(T); + size_bytes += sizeof(T) * 2 // min and max + + sizeof(centroid) * centroids_.size(); + if (with_buffer) size_bytes += sizeof(T) * buffer_.size(); // count is a part of preamble + return size_bytes; +} + +template +auto tdigest::serialize(unsigned header_size_bytes, bool with_buffer) const -> vector_bytes { + if (!with_buffer) const_cast(this)->compress(); // side effect + vector_bytes bytes(get_serialized_size_bytes(with_buffer), 0, buffer_.get_allocator()); + uint8_t* ptr = bytes.data() + header_size_bytes; + *ptr++ = get_preamble_longs(); *ptr++ = SERIAL_VERSION; *ptr++ = SKETCH_TYPE; ptr += copy_to_mem(k_, ptr); const uint8_t flags_byte( - (is_empty() ? 1 << flags::IS_EMPTY : 0) | - (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0) | - (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0) + (is_empty() ? 1 << flags::IS_EMPTY : 0) + | (is_single_value() ? 1 << flags::IS_SINGLE_VALUE : 0) + | (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0) ); *ptr++ = flags_byte; ptr += 2; // unused if (is_empty()) return bytes; - if (is_single_value()) { copy_to_mem(min_, ptr); return bytes; } - ptr += copy_to_mem(static_cast(centroids_.size()), ptr); - ptr += 4; // unused - + ptr += copy_to_mem(static_cast(buffer_.size()), ptr); ptr += copy_to_mem(min_, ptr); ptr += copy_to_mem(max_, ptr); - copy_to_mem(centroids_.data(), ptr, centroids_.size() * sizeof(centroid)); + if (centroids_.size() > 0) ptr += copy_to_mem(centroids_.data(), ptr, centroids_.size() * sizeof(centroid)); + if (buffer_.size() > 0) copy_to_mem(buffer_.data(), ptr, buffer_.size() * sizeof(T)); return bytes; } @@ -374,19 +375,21 @@ tdigest tdigest::deserialize(std::istream& is, const A& allocator) { const bool reverse_merge = flags_byte & (1 << flags::REVERSE_MERGE); if (is_single_value) { const T value = read(is); - return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, allocator); + return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, vector_t(allocator)); } const auto num_centroids = read(is); - read(is); // unused + const auto num_buffered = read(is); const T min = read(is); const T max = read(is); vector_centroid centroids(num_centroids, centroid(0, 0), allocator); - read(is, centroids.data(), num_centroids * sizeof(centroid)); - uint64_t total_weight = 0; - for (const auto& c: centroids) total_weight += c.get_weight(); - return tdigest(reverse_merge, k, min, max, std::move(centroids), total_weight, allocator); + if (num_centroids > 0) read(is, centroids.data(), num_centroids * sizeof(centroid)); + vector_t buffer(num_buffered, 0, allocator); + if (num_buffered > 0) read(is, buffer.data(), num_buffered * sizeof(T)); + uint64_t weight = 0; + for (const auto& c: centroids) weight += c.get_weight(); + return tdigest(reverse_merge, k, min, max, std::move(centroids), weight, std::move(buffer)); } template @@ -423,24 +426,27 @@ tdigest tdigest::deserialize(const void* bytes, size_t size, const A ensure_minimum_memory(end_ptr - ptr, sizeof(T)); T value; ptr += copy_from_mem(ptr, value); - return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, allocator); + return tdigest(reverse_merge, k, value, value, vector_centroid(1, centroid(value, 1), allocator), 1, vector_t(allocator)); } ensure_minimum_memory(end_ptr - ptr, 8); uint32_t num_centroids; ptr += copy_from_mem(ptr, num_centroids); - ptr += 4; // unused + uint32_t num_buffered; + ptr += copy_from_mem(ptr, num_buffered); - ensure_minimum_memory(end_ptr - ptr, sizeof(T) * 2 + sizeof(centroid) * num_centroids); + ensure_minimum_memory(end_ptr - ptr, sizeof(T) * 2 + sizeof(centroid) * num_centroids + sizeof(T) * num_buffered); T min; ptr += copy_from_mem(ptr, min); T max; ptr += copy_from_mem(ptr, max); vector_centroid centroids(num_centroids, centroid(0, 0), allocator); - copy_from_mem(ptr, centroids.data(), sizeof(centroid) * num_centroids); - uint64_t total_weight = 0; - for (const auto& c: centroids) total_weight += c.get_weight(); - return tdigest(reverse_merge, k, min, max, std::move(centroids), total_weight, allocator); + if (num_centroids > 0) ptr += copy_from_mem(ptr, centroids.data(), num_centroids * sizeof(centroid)); + vector_t buffer(num_buffered, 0, allocator); + if (num_buffered > 0) copy_from_mem(ptr, buffer.data(), num_buffered * sizeof(T)); + uint64_t weight = 0; + for (const auto& c: centroids) weight += c.get_weight(); + return tdigest(reverse_merge, k, min, max, std::move(centroids), weight, std::move(buffer)); } // compatibility with the format of the reference implementation @@ -466,7 +472,7 @@ tdigest tdigest::deserialize_compat(std::istream& is, const A& alloc c = centroid(mean, weight); total_weight += weight; } - return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator); + return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator)); } // COMPAT_FLOAT: compatibility with asSmallBytes() const auto min = read_big_endian(is); // reference implementation uses doubles for min and max @@ -484,7 +490,7 @@ tdigest tdigest::deserialize_compat(std::istream& is, const A& alloc c = centroid(mean, weight); total_weight += weight; } - return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator); + return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator)); } // compatibility with the format of the reference implementation @@ -526,7 +532,7 @@ tdigest tdigest::deserialize_compat(const void* bytes, size_t size, c = centroid(mean, static_cast(weight)); total_weight += static_cast(weight); } - return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator); + return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator)); } // COMPAT_FLOAT: compatibility with asSmallBytes() ensure_minimum_memory(end_ptr - ptr, sizeof(double) * 2 + sizeof(float) + sizeof(uint16_t) * 3); @@ -558,7 +564,7 @@ tdigest tdigest::deserialize_compat(const void* bytes, size_t size, c = centroid(mean, static_cast(weight)); total_weight += static_cast(weight); } - return tdigest(false, k, min, max, std::move(centroids), total_weight, allocator); + return tdigest(false, k, min, max, std::move(centroids), total_weight, vector_t(allocator)); } template @@ -567,30 +573,21 @@ bool tdigest::is_single_value() const { } template -tdigest::tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight, const A& allocator): -allocator_(allocator), +tdigest::tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t weight, vector_t&& buffer): reverse_merge_(reverse_merge), k_(k), -internal_k_(k), min_(min), max_(max), centroids_capacity_(0), centroids_(std::move(centroids)), -centroids_weight_(total_weight), -buffer_capacity_(0), -buffer_(allocator), -buffered_weight_(0) +centroids_weight_(weight), +buffer_(std::move(buffer)) { if (k < 10) throw std::invalid_argument("k must be at least 10"); const size_t fudge = k < 30 ? 30 : 10; centroids_capacity_ = 2 * k_ + fudge; - buffer_capacity_ = 5 * centroids_capacity_; - const double scale = std::max(1.0, static_cast(buffer_capacity_) / centroids_capacity_ - 1.0); - internal_k_ = std::ceil(std::sqrt(scale) * k_); - centroids_capacity_ = std::max(centroids_capacity_, internal_k_ + fudge); - buffer_capacity_ = std::max(buffer_capacity_, 2 * centroids_capacity_); centroids_.reserve(centroids_capacity_); - buffer_.reserve(buffer_capacity_); + buffer_.reserve(centroids_capacity_ * BUFFER_MULTIPLIER); } } /* namespace datasketches */ diff --git a/tdigest/test/tdigest_serialize_for_java.cpp b/tdigest/test/tdigest_serialize_for_java.cpp index 1f3c1fb1..fe0a9d4f 100644 --- a/tdigest/test/tdigest_serialize_for_java.cpp +++ b/tdigest/test/tdigest_serialize_for_java.cpp @@ -34,6 +34,16 @@ TEST_CASE("tdigest double generate", "[serialize_for_java]") { } } +TEST_CASE("tdigest double generate with buffer", "[serialize_for_java]") { + const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000}; + for (const unsigned n: n_arr) { + tdigest_double td(100); + for (unsigned i = 1; i <= n; ++i) td.update(i); + std::ofstream os("tdigest_double_buf_n" + std::to_string(n) + "_cpp.sk", std::ios::binary); + td.serialize(os, true); + } +} + TEST_CASE("tdigest float generate", "[serialize_for_java]") { const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000}; for (const unsigned n: n_arr) { @@ -44,4 +54,14 @@ TEST_CASE("tdigest float generate", "[serialize_for_java]") { } } +TEST_CASE("tdigest float generate with buffer", "[serialize_for_java]") { + const unsigned n_arr[] = {0, 1, 10, 100, 1000, 10000, 100000, 1000000}; + for (const unsigned n: n_arr) { + tdigest_float td(100); + for (unsigned i = 1; i <= n; ++i) td.update(i); + std::ofstream os("tdigest_float_buf_n" + std::to_string(n) + "_cpp.sk", std::ios::binary); + td.serialize(os, true); + } +} + } /* namespace datasketches */ diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp index ac52f554..bf64dbb4 100644 --- a/tdigest/test/tdigest_test.cpp +++ b/tdigest/test/tdigest_test.cpp @@ -54,8 +54,9 @@ TEST_CASE("one value", "[tdigest]") { TEST_CASE("many values", "[tdigest]") { const size_t n = 10000; - tdigest_double td(100); + tdigest_double td; for (size_t i = 0; i < n; ++i) td.update(i); +// std::cout << td.to_string(true); // td.compress(); // std::cout << td.to_string(true); REQUIRE_FALSE(td.is_empty()); @@ -68,7 +69,7 @@ TEST_CASE("many values", "[tdigest]") { REQUIRE(td.get_rank(n * 3 / 4) == Approx(0.75).margin(0.0001)); REQUIRE(td.get_rank(n) == 1); REQUIRE(td.get_quantile(0) == 0); - REQUIRE(td.get_quantile(0.5) == Approx(n / 2).epsilon(0.01)); + REQUIRE(td.get_quantile(0.5) == Approx(n / 2).epsilon(0.03)); REQUIRE(td.get_quantile(0.9) == Approx(n * 0.9).epsilon(0.01)); REQUIRE(td.get_quantile(0.95) == Approx(n * 0.95).epsilon(0.01)); REQUIRE(td.get_quantile(1) == n - 1); @@ -137,12 +138,14 @@ TEST_CASE("merge small", "[tdigest]") { TEST_CASE("merge large", "[tdigest]") { const size_t n = 10000; - tdigest_double td1(100); - tdigest_double td2(100); + tdigest_double td1; + tdigest_double td2; for (size_t i = 0; i < n / 2; ++i) { td1.update(i); td2.update(n / 2 + i); } +// std::cout << td1.to_string(); +// std::cout << td2.to_string(); td1.merge(td2); // td1.compress(); // std::cout << td1.to_string(true); @@ -179,6 +182,19 @@ TEST_CASE("serialize deserialize stream single value", "[tdigest]") { REQUIRE(deserialized_td.get_max_value() == 123); } +TEST_CASE("serialize deserialize stream single value buffered", "[tdigest]") { + tdigest td; + td.update(123); + std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); + td.serialize(s, true); + auto deserialized_td = tdigest::deserialize(s); + REQUIRE(deserialized_td.get_k() == 200); + REQUIRE(deserialized_td.get_total_weight() == 1); + REQUIRE_FALSE(deserialized_td.is_empty()); + REQUIRE(deserialized_td.get_min_value() == 123); + REQUIRE(deserialized_td.get_max_value() == 123); +} + TEST_CASE("serialize deserialize stream many values", "[tdigest]") { tdigest td(100); for (int i = 0; i < 1000; ++i) td.update(i); @@ -194,6 +210,21 @@ TEST_CASE("serialize deserialize stream many values", "[tdigest]") { REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5)); } +TEST_CASE("serialize deserialize stream many values with buffer", "[tdigest]") { + tdigest td(100); + for (int i = 0; i < 10000; ++i) td.update(i); + std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); + td.serialize(s, true); + auto deserialized_td = tdigest::deserialize(s); + REQUIRE(td.get_k() == deserialized_td.get_k()); + REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight()); + REQUIRE(td.is_empty() == deserialized_td.is_empty()); + REQUIRE(td.get_min_value() == deserialized_td.get_min_value()); + REQUIRE(td.get_max_value() == deserialized_td.get_max_value()); + REQUIRE(td.get_rank(500) == deserialized_td.get_rank(500)); + REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5)); +} + TEST_CASE("serialize deserialize bytes empty", "[tdigest]") { tdigest td(100); auto bytes = td.serialize(); @@ -215,6 +246,18 @@ TEST_CASE("serialize deserialize bytes single value", "[tdigest]") { REQUIRE(deserialized_td.get_max_value() == 123); } +TEST_CASE("serialize deserialize bytes single value buffered", "[tdigest]") { + tdigest td(200); + td.update(123); + auto bytes = td.serialize(0, true); + auto deserialized_td = tdigest::deserialize(bytes.data(), bytes.size()); + REQUIRE(deserialized_td.get_k() == 200); + REQUIRE(deserialized_td.get_total_weight() == 1); + REQUIRE_FALSE(deserialized_td.is_empty()); + REQUIRE(deserialized_td.get_min_value() == 123); + REQUIRE(deserialized_td.get_max_value() == 123); +} + TEST_CASE("serialize deserialize bytes many values", "[tdigest]") { tdigest td(100); for (int i = 0; i < 1000; ++i) td.update(i); @@ -229,6 +272,20 @@ TEST_CASE("serialize deserialize bytes many values", "[tdigest]") { REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5)); } +TEST_CASE("serialize deserialize bytes many values with buffer", "[tdigest]") { + tdigest td(100); + for (int i = 0; i < 10000; ++i) td.update(i); + auto bytes = td.serialize(); + auto deserialized_td = tdigest::deserialize(bytes.data(), bytes.size()); + REQUIRE(td.get_k() == deserialized_td.get_k()); + REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight()); + REQUIRE(td.is_empty() == deserialized_td.is_empty()); + REQUIRE(td.get_min_value() == deserialized_td.get_min_value()); + REQUIRE(td.get_max_value() == deserialized_td.get_max_value()); + REQUIRE(td.get_rank(500) == deserialized_td.get_rank(500)); + REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5)); +} + TEST_CASE("serialize deserialize steam and bytes equivalence empty", "[tdigest]") { tdigest td(100); std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); @@ -287,6 +344,40 @@ TEST_CASE("serialize deserialize steam and bytes equivalence", "[tdigest]") { REQUIRE(deserialized_td1.get_quantile(0.5) == deserialized_td2.get_quantile(0.5)); } +TEST_CASE("serialize deserialize steam and bytes equivalence with buffer", "[tdigest]") { + tdigest td(100); + const int n = 10000; + for (int i = 0; i < n; ++i) td.update(i); + std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); + td.serialize(s, true); + auto bytes = td.serialize(0, true); + + REQUIRE(bytes.size() == static_cast(s.tellp())); + for (size_t i = 0; i < bytes.size(); ++i) { + REQUIRE(((char*)bytes.data())[i] == (char)s.get()); + } + + s.seekg(0); // rewind + auto deserialized_td1 = tdigest::deserialize(s); + auto deserialized_td2 = tdigest::deserialize(bytes.data(), bytes.size()); + REQUIRE(bytes.size() == static_cast(s.tellg())); + + REQUIRE_FALSE(deserialized_td1.is_empty()); + REQUIRE(deserialized_td1.get_k() == 100); + REQUIRE(deserialized_td1.get_total_weight() == n); + REQUIRE(deserialized_td1.get_min_value() == 0); + REQUIRE(deserialized_td1.get_max_value() == n - 1); + + REQUIRE_FALSE(deserialized_td2.is_empty()); + REQUIRE(deserialized_td2.get_k() == 100); + REQUIRE(deserialized_td2.get_total_weight() == n); + REQUIRE(deserialized_td2.get_min_value() == 0); + REQUIRE(deserialized_td2.get_max_value() == n - 1); + + REQUIRE(deserialized_td1.get_rank(n / 2) == deserialized_td2.get_rank(n / 2)); + REQUIRE(deserialized_td1.get_quantile(0.5) == deserialized_td2.get_quantile(0.5)); +} + TEST_CASE("deserialize from reference implementation stream double", "[tdigest]") { std::ifstream is; is.exceptions(std::ios::failbit | std::ios::badbit);