From 3b9ef635590598af2ae12f4f0c9ec6aa3fd03a94 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Tue, 19 Dec 2023 12:41:45 -0800 Subject: [PATCH 1/6] incomplete implementation of t-digest --- CMakeLists.txt | 1 + tdigest/CMakeLists.txt | 41 +++++ tdigest/include/tdigest.hpp | 202 +++++++++++++++++++++++ tdigest/include/tdigest_impl.hpp | 264 +++++++++++++++++++++++++++++++ tdigest/test/CMakeLists.txt | 41 +++++ tdigest/test/tdigest_test.cpp | 111 +++++++++++++ 6 files changed, 660 insertions(+) create mode 100755 tdigest/CMakeLists.txt create mode 100644 tdigest/include/tdigest.hpp create mode 100644 tdigest/include/tdigest_impl.hpp create mode 100644 tdigest/test/CMakeLists.txt create mode 100644 tdigest/test/tdigest_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5336ea24..e3505c19 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -118,6 +118,7 @@ add_subdirectory(req) add_subdirectory(quantiles) add_subdirectory(count) add_subdirectory(density) +add_subdirectory(tdigest) if (WITH_PYTHON) add_subdirectory(python) diff --git a/tdigest/CMakeLists.txt b/tdigest/CMakeLists.txt new file mode 100755 index 00000000..d7deee3d --- /dev/null +++ b/tdigest/CMakeLists.txt @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_library(tdigest INTERFACE) + +add_library(${PROJECT_NAME}::QUANTILES ALIAS tdigest) + +if (BUILD_TESTS) + add_subdirectory(test) +endif() + +target_include_directories(tdigest + INTERFACE + $ + $/include> +) + +target_link_libraries(tdigest INTERFACE common) + +install(TARGETS tdigest + EXPORT ${PROJECT_NAME} +) + +install(FILES + include/tdigest.hpp + include/tdigest_impl.hpp + DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/DataSketches") diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp new file mode 100644 index 00000000..8ebd13ce --- /dev/null +++ b/tdigest/include/tdigest.hpp @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _TDIGEST_HPP_ +#define _TDIGEST_HPP_ + +#include +#include + +#include "common_defs.hpp" + +namespace datasketches { + +// this is equivalent of K_2 (default) in the Java implementation mentioned below +struct scale_function { + double k(double q, double normalizer) const { + return limit([normalizer] (double q) { return std::log(q / (1 - q)) * normalizer; }, q, 1e-15, 1 - 1e-15); + } + double q(double k, double normalizer) const { + const double w = std::exp(k / normalizer); + return w / (1 + w); + } + double max(double q, double normalizer) const { + return q * (1 - q) / normalizer; + } + double normalizer(double compression, double n) const { + return compression / z(compression, n); + } + double z(double compression, double n) const { + return 4 * std::log(n / compression) + 24; + } + + template + double limit(Func f, double x, double low, double high) const { + if (x < low) return f(low); + if (x > high) return f(high); + return f(x); + } +}; + +// forward declaration +template > class tdigest; + +/// TDigest float sketch +using tdigest_float = tdigest; +/// TDigest double sketch +using tdigest_double = tdigest; + +/** + * t-Digest for estimating quantiles and ranks. + * This implementation is based on the following paper: + * Ted Dunning, Otmar Ertl. Extremely Accurate Quantiles Using t-Digests + * and the following implementation in Java: + * https://github.com/tdunning/t-digest + * This implementation is similar to MergingDigest in the above Java implementation + */ +template +class tdigest { + static_assert(std::is_floating_point::value, "Floating-point type expected"); + static_assert(std::numeric_limits::is_iec559, "IEEE 754 compatibility required"); +public: + using value_type = T; + using allocator_type = Allocator; + + static const bool USE_ALTERNATING_SORT = true; + static const bool USE_TWO_LEVEL_COMPRESSION = true; + static const bool USE_WEIGHT_LIMIT = true; + + class centroid { + public: + centroid(T value, uint64_t weight): mean_(value), weight_(weight) {} + void add(const centroid& other) { + weight_ += other.weight_; + mean_ += (other.mean_ - mean_) * other.weight_ / weight_; + } + T get_mean() const { return mean_; } + uint64_t get_weight() const { return weight_; } + private: + T mean_; + uint64_t weight_; + }; + using vector_centroid = std::vector::template rebind_alloc>; + + struct centroid_cmp { + centroid_cmp(bool reverse): reverse_(reverse) {} + bool operator()(const centroid& a, const centroid& b) const { + if (a.get_mean() < b.get_mean()) return !reverse_; + return reverse_; + } + bool reverse_; + }; + + /** + * Constructor + * @param k affects the size of the sketch and its estimation error + * @param allocator used to allocate memory + */ + explicit tdigest(uint16_t k = 100, const Allocator& allocator = Allocator()); + + /** + * Update this t-Digest with the given value + * @param value to update the t-Digest with + */ + void update(T value); + + /** + * Merge the given t-Digest into this one + * @param other t-Digest to merge + */ + void merge(tdigest& other); + + /** + * Process buffered values and merge centroids if needed + */ + void compress(); + + /** + * @return true if t-Digest has not seen any data + */ + bool is_empty() const; + + /** + * @return minimum value seen by t-Digest + */ + T get_min_value() const; + + /** + * @return maximum value seen by t-Digest + */ + T get_max_value() const; + + /** + * @return total weight + */ + uint64_t get_total_weight() const; + + /** + * Compute approximate normalized rank of the given value. + * @param value to be ranked + * @return normalized rank (from 0 to 1 inclusive) + */ + double get_rank(T value) const; + + /** + * Compute approximate quantile value corresponding to the given normalized rank + * @param rank normalized rank (from 0 to 1 inclusive) + * @return quantile value corresponding to the given rank + */ + T get_quantile(double rank) const; + + /** + * @return parameter k (compression) that was used to configure this t-Digest + */ + uint16_t get_k() const; + + /** + * Human-readable summary of this t-Digest as a string + * @param print_centroids if true append the list of centroids with weights + * @return summary of this t-Digest + */ + string to_string(bool print_centroids = false) const; + +private: + Allocator allocator_; + uint16_t k_; + uint16_t internal_k_; + uint32_t merge_count_; + T min_; + T max_; + size_t centroids_capacity_; + vector_centroid centroids_; + uint64_t total_weight_; + size_t buffer_capacity_; + vector_centroid buffer_; + uint64_t buffered_weight_; + + void merge_new_values(); + void merge_new_values(bool force, uint16_t k); + void merge_new_values(vector_centroid& centroids, uint64_t weight, uint16_t k, bool reverse); +}; + +} /* namespace datasketches */ + +#include "tdigest_impl.hpp" + +#endif // _TDIGEST_HPP_ diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp new file mode 100644 index 00000000..5f57e854 --- /dev/null +++ b/tdigest/include/tdigest_impl.hpp @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _TDIGEST_IMPL_HPP_ +#define _TDIGEST_IMPL_HPP_ + +#include +#include + +namespace datasketches { + +template +tdigest::tdigest(uint16_t k, const A& allocator): +allocator_(allocator), +k_(k), +internal_k_(k), +merge_count_(0), +min_(std::numeric_limits::infinity()), +max_(-std::numeric_limits::infinity()), +centroids_capacity_(0), +centroids_(allocator), +total_weight_(0), +buffer_capacity_(0), +buffer_(allocator), +buffered_weight_(0) +{ + if (k < 10) throw std::invalid_argument("k must be at least 10"); + size_t fudge = 0; + if (USE_WEIGHT_LIMIT) { + fudge = 10; + if (k < 30) fudge +=20; + } + centroids_capacity_ = 2 * k_ + fudge; + buffer_capacity_ = 5 * centroids_capacity_; + double scale = std::max(1.0, static_cast(buffer_capacity_) / centroids_capacity_ - 1.0); + if (!USE_TWO_LEVEL_COMPRESSION) scale = 1; + internal_k_ = std::ceil(std::sqrt(scale) * k_); + if (centroids_capacity_ < internal_k_ + fudge) { + centroids_capacity_ = internal_k_ + fudge; + } + if (buffer_capacity_ < 2 * centroids_capacity_) buffer_capacity_ = 2 * centroids_capacity_; +} + +template +void tdigest::update(T value) { + // check for NaN + if (buffer_.size() >= buffer_capacity_ - centroids_.size() - 1) merge_new_values(); // - 1 for compatibility with Java + buffer_.push_back(centroid(value, 1)); + ++buffered_weight_; + min_ = std::min(min_, value); + max_ = std::max(max_, value); +} + +template +void tdigest::merge(tdigest& other) { +} + +template +void tdigest::compress() { + merge_new_values(true, k_); +} + +template +bool tdigest::is_empty() const { + return centroids_.empty() && buffer_.empty(); +} + +template +T tdigest::get_min_value() const { + if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch"); + return min_; +} + +template +T tdigest::get_max_value() const { + if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch"); + return max_; +} + +template +uint64_t tdigest::get_total_weight() const { + return total_weight_ + buffered_weight_; +} + +template +double tdigest::get_rank(T value) const { + if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch"); + // check for NaN + if (value < min_) return 0; + if (value > max_) return 1; + // one centroid and value == min_ == max_ + if ((centroids_.size() + buffer_.size()) == 1) return 0.5; + + const_cast(this)->merge_new_values(); // side effect + + // left tail + const T first_mean = centroids_.front().get_mean(); + if (value < first_mean) { + if (first_mean - min_ > 0) { + if (value == min_) return 0.5 / total_weight_; + return (1.0 + (value - min_) / (first_mean - min_) * (centroids_.front().get_weight() / 2.0 - 1.0)); // ? + } + return 0; // should never happen + } + + // right tail + const T last_mean = centroids_.back().get_mean(); + if (value > last_mean) { + if (max_ - last_mean > 0) { + if (value == max_) return 1.0 - 0.5 / total_weight_; + return 1 - ((1 + (max_ - value) / (max_ - last_mean) * (centroids_.back().get_weight() / 2.0 - 1.0)) / total_weight_); // ? + } + return 1; // should never happen + } + + auto lower = std::lower_bound(centroids_.begin(), centroids_.end(), centroid(value, 1), centroid_cmp(false)); + if (lower == centroids_.end()) throw std::logic_error("lower == end in get_rank()"); + auto upper = std::upper_bound(lower, centroids_.end(), centroid(value, 1), centroid_cmp(false)); + if (upper == centroids_.begin()) throw std::logic_error("upper == begin in get_rank()"); + if (value < lower->get_mean()) --lower; + if (upper == centroids_.end() || (upper != centroids_.begin() && !((upper - 1)->get_mean() < value))) --upper; + double weight_below = 0; + auto it = centroids_.begin(); + while (it != lower) { + weight_below += it->get_weight(); + ++it; + } + weight_below += lower->get_weight() / 2.0; + double weight_delta = 0; + while (it != upper) { + weight_delta += it->get_weight(); + ++it; + } + weight_delta -= lower->get_weight() / 2.0; + weight_delta += upper->get_weight() / 2.0; + if (upper->get_mean() - lower->get_mean() > 0) { + return (weight_below + weight_delta * (value - lower->get_mean()) / (upper->get_mean() - lower->get_mean())) / total_weight_; + } + return (weight_below + weight_delta / 2.0) / total_weight_; +} + +template +T tdigest::get_quantile(double rank) const { + if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch"); + return 0; +} + +template +uint16_t tdigest::get_k() const { + return k_; +} + +template +string tdigest::to_string(bool print_centroids) const { + // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements. + // The stream does not support passing an allocator instance, and alternatives are complicated. + std::ostringstream os; + os << "### t-Digest summary:" << std::endl; + os << " Nominal k : " << k_ << std::endl; + os << " Internal k : " << internal_k_ << std::endl; + os << " Centroids : " << centroids_.size() << std::endl; + os << " Buffered : " << buffer_.size() << std::endl; + os << " Centroids capacity : " << centroids_capacity_ << std::endl; + os << " Buffer capacity : " << buffer_capacity_ << std::endl; + os << " Total Weight : " << total_weight_ << std::endl; + os << " Buffered Weight : " << buffered_weight_ << std::endl; + os << " Merge count : " << merge_count_ << std::endl; + if (!is_empty()) { + os << " Min : " << min_ << std::endl; + os << " Max : " << max_ << std::endl; + } + os << "### End t-Digest summary" << std::endl; + if (print_centroids) { + os << "Centroids:" << std::endl; + int i = 0; + for (auto centroid: centroids_) { + os << i << ": " << centroid.get_mean() << ", " << centroid.get_weight() << std::endl; + ++i; + } + } + return string(os.str().c_str(), allocator_); +} + +template +void tdigest::merge_new_values() { + merge_new_values(false, internal_k_); +} + +template +void tdigest::merge_new_values(bool force, uint16_t k) { + if (total_weight_ == 0 && buffered_weight_ == 0) return; + if (force || buffered_weight_ > 0) { + merge_new_values(buffer_, buffered_weight_, k, USE_ALTERNATING_SORT & (merge_count_ & 1)); + ++merge_count_; + buffer_.clear(); + buffered_weight_ = 0; + } +} + +template +void tdigest::merge_new_values(vector_centroid& incoming_centroids, uint64_t weight, uint16_t k, bool reverse) { + for (const auto& centroid: centroids_) incoming_centroids.push_back(centroid); + centroids_.clear(); + std::stable_sort(incoming_centroids.begin(), incoming_centroids.end(), centroid_cmp(reverse)); + total_weight_ += weight; + auto it = incoming_centroids.begin(); + centroids_.push_back(*it); + ++it; + double weight_so_far = 0; + const double normalizer = scale_function().normalizer(k, total_weight_); + double k1 = scale_function().k(0, normalizer); + double w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer); + while (it != incoming_centroids.end()) { + const double proposed_weight = centroids_.back().get_weight() + it->get_weight(); + const double projected_weight = weight_so_far + proposed_weight; + bool add_this; + if (USE_WEIGHT_LIMIT) { + const double q0 = weight_so_far / total_weight_; + const double q2 = (weight_so_far + proposed_weight) / total_weight_; + add_this = proposed_weight <= total_weight_ * std::min(scale_function().max(q0, normalizer), scale_function().max(q2, normalizer)); + } else { + add_this = projected_weight <= w_limit; + } + if (std::distance(incoming_centroids.begin(), it) == 1 || std::distance(incoming_centroids.end(), it) == 1) { + add_this = false; + } + if (add_this) { + centroids_.back().add(*it); + } else { + weight_so_far += centroids_.back().get_weight(); + if (!USE_WEIGHT_LIMIT) { + k1 = scale_function().k(weight_so_far / total_weight_, normalizer); + w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer); + } + centroids_.push_back(*it); + } + ++it; + } + if (reverse) std::reverse(centroids_.begin(), centroids_.end()); + if (total_weight_ > 0) { + min_ = std::min(min_, centroids_.front().get_mean()); + max_ = std::max(max_, centroids_.back().get_mean()); + } +} + +} /* namespace datasketches */ + +#endif // _TDIGEST_IMPL_HPP_ diff --git a/tdigest/test/CMakeLists.txt b/tdigest/test/CMakeLists.txt new file mode 100644 index 00000000..2382c706 --- /dev/null +++ b/tdigest/test/CMakeLists.txt @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_executable(tdigest_test) + +target_link_libraries(tdigest_test tdigest common common_test_lib) + +set_target_properties(tdigest_test PROPERTIES + CXX_STANDARD_REQUIRED YES +) + +file(TO_CMAKE_PATH "${CMAKE_CURRENT_SOURCE_DIR}" tdigest_TEST_BINARY_PATH) +string(APPEND tdigest_TEST_BINARY_PATH "/") +target_compile_definitions(tdigest_test + PRIVATE + TEST_BINARY_INPUT_PATH="${tdigest_TEST_BINARY_PATH}" +) + +add_test( + NAME tdigest_test + COMMAND tdigest_test +) + +target_sources(tdigest_test + PRIVATE + tdigest_test.cpp +) diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp new file mode 100644 index 00000000..e90faa70 --- /dev/null +++ b/tdigest/test/tdigest_test.cpp @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include "tdigest.hpp" + +namespace datasketches { + +TEST_CASE("empty", "[tdigest]") { + tdigest_double td(10); +// std::cout << td.to_string(); + REQUIRE(td.is_empty()); + REQUIRE(td.get_k() == 10); + REQUIRE(td.get_total_weight() == 0); + REQUIRE_THROWS_AS(td.get_min_value(), std::runtime_error); + REQUIRE_THROWS_AS(td.get_max_value(), std::runtime_error); + REQUIRE_THROWS_AS(td.get_rank(0), std::runtime_error); + REQUIRE_THROWS_AS(td.get_quantile(0.5), std::runtime_error); +} + +TEST_CASE("one value", "[tdigest]") { + tdigest_double td(100); + td.update(1); + REQUIRE(td.get_k() == 100); + REQUIRE(td.get_total_weight() == 1); + REQUIRE(td.get_min_value() == 1); + REQUIRE(td.get_max_value() == 1); + REQUIRE(td.get_rank(0.99) == 0); + REQUIRE(td.get_rank(1) == 0.5); + REQUIRE(td.get_rank(1.01) == 1); +} + +TEST_CASE("many values", "[tdigest]") { + const size_t n = 10000; + tdigest_double td(100); + for (size_t i = 0; i < n; ++i) td.update(i); +// td.compress(); +// std::cout << td.to_string(true); + REQUIRE_FALSE(td.is_empty()); + REQUIRE(td.get_total_weight() == n); + REQUIRE(td.get_min_value() == 0); + REQUIRE(td.get_max_value() == n - 1); + REQUIRE(td.get_rank(0) == Approx(0).margin(0.0001)); + REQUIRE(td.get_rank(n / 4) == Approx(0.25).margin(0.0001)); + REQUIRE(td.get_rank(n / 2) == Approx(0.5).margin(0.0001)); + REQUIRE(td.get_rank(n * 3 / 4) == Approx(0.75).margin(0.0001)); + REQUIRE(td.get_rank(n) == 1); +} + +TEST_CASE("rank - two values", "[tdigest]") { + tdigest_double td(100); + td.update(1); + td.update(2); +// td.compress(); +// std::cout << td.to_string(true); + REQUIRE(td.get_rank(0.99) == 0); + REQUIRE(td.get_rank(1) == 0.25); + REQUIRE(td.get_rank(1.25) == 0.375); + REQUIRE(td.get_rank(1.5) == 0.5); + REQUIRE(td.get_rank(1.75) == 0.625); + REQUIRE(td.get_rank(2) == 0.75); + REQUIRE(td.get_rank(2.01) == 1); +} + +TEST_CASE("rank - repeated value", "[tdigest]") { + tdigest_double td(100); + td.update(1); + td.update(1); + td.update(1); + td.update(1); +// td.compress(); +// std::cout << td.to_string(true); + REQUIRE(td.get_rank(0.99) == 0); + REQUIRE(td.get_rank(1) == 0.5); + REQUIRE(td.get_rank(1.01) == 1); +} + +TEST_CASE("rank - repeated block", "[tdigest]") { + tdigest_double td(100); + td.update(1); + td.update(2); + td.update(2); + td.update(3); +// td.compress(); +// std::cout << td.to_string(true); + REQUIRE(td.get_rank(0.99) == 0); + REQUIRE(td.get_rank(1) == 0.125); + REQUIRE(td.get_rank(2) == 0.5); + REQUIRE(td.get_rank(3) == 0.875); + REQUIRE(td.get_rank(3.01) == 1); +} + +} /* namespace datasketches */ From 1a9667ba6d01f98b7adf6f08ea9787d36b8222c8 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Wed, 10 Jan 2024 13:37:33 -0800 Subject: [PATCH 2/6] implemented merge --- tdigest/include/tdigest.hpp | 2 +- tdigest/include/tdigest_impl.hpp | 39 +++++++++++++++++++++----------- tdigest/test/tdigest_test.cpp | 39 ++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 14 deletions(-) diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp index 8ebd13ce..18803668 100644 --- a/tdigest/include/tdigest.hpp +++ b/tdigest/include/tdigest.hpp @@ -192,7 +192,7 @@ class tdigest { void merge_new_values(); void merge_new_values(bool force, uint16_t k); - void merge_new_values(vector_centroid& centroids, uint64_t weight, uint16_t k, bool reverse); + void merge_new_values(uint16_t k); }; } /* namespace datasketches */ diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp index 5f57e854..b6f639cd 100644 --- a/tdigest/include/tdigest_impl.hpp +++ b/tdigest/include/tdigest_impl.hpp @@ -55,6 +55,8 @@ buffered_weight_(0) centroids_capacity_ = internal_k_ + fudge; } if (buffer_capacity_ < 2 * centroids_capacity_) buffer_capacity_ = 2 * centroids_capacity_; + centroids_.reserve(centroids_capacity_); + buffer_.reserve(buffer_capacity_); } template @@ -69,6 +71,18 @@ void tdigest::update(T value) { template void tdigest::merge(tdigest& other) { + if (other.is_empty()) return; + size_t num = buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size(); + buffer_.reserve(num); + std::copy(other.buffer_.begin(), other.buffer_.end(), std::back_inserter(buffer_)); + std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(buffer_)); + buffered_weight_ += other.get_total_weight(); + if (num > buffer_capacity_) { + merge_new_values(internal_k_); + } else { + min_ = std::min(min_, other.get_min_value()); + max_ = std::max(max_, other.get_max_value()); + } } template @@ -205,28 +219,24 @@ void tdigest::merge_new_values() { template void tdigest::merge_new_values(bool force, uint16_t k) { if (total_weight_ == 0 && buffered_weight_ == 0) return; - if (force || buffered_weight_ > 0) { - merge_new_values(buffer_, buffered_weight_, k, USE_ALTERNATING_SORT & (merge_count_ & 1)); - ++merge_count_; - buffer_.clear(); - buffered_weight_ = 0; - } + if (force || buffered_weight_ > 0) merge_new_values(k); } template -void tdigest::merge_new_values(vector_centroid& incoming_centroids, uint64_t weight, uint16_t k, bool reverse) { - for (const auto& centroid: centroids_) incoming_centroids.push_back(centroid); +void tdigest::merge_new_values(uint16_t k) { + const bool reverse = USE_ALTERNATING_SORT & (merge_count_ & 1); + for (const auto& centroid: centroids_) buffer_.push_back(centroid); centroids_.clear(); - std::stable_sort(incoming_centroids.begin(), incoming_centroids.end(), centroid_cmp(reverse)); - total_weight_ += weight; - auto it = incoming_centroids.begin(); + std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp(reverse)); + total_weight_ += buffered_weight_; + auto it = buffer_.begin(); centroids_.push_back(*it); ++it; double weight_so_far = 0; const double normalizer = scale_function().normalizer(k, total_weight_); double k1 = scale_function().k(0, normalizer); double w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer); - while (it != incoming_centroids.end()) { + while (it != buffer_.end()) { const double proposed_weight = centroids_.back().get_weight() + it->get_weight(); const double projected_weight = weight_so_far + proposed_weight; bool add_this; @@ -237,7 +247,7 @@ void tdigest::merge_new_values(vector_centroid& incoming_centroids, uint64 } else { add_this = projected_weight <= w_limit; } - if (std::distance(incoming_centroids.begin(), it) == 1 || std::distance(incoming_centroids.end(), it) == 1) { + if (std::distance(buffer_.begin(), it) == 1 || std::distance(buffer_.end(), it) == 1) { add_this = false; } if (add_this) { @@ -257,6 +267,9 @@ void tdigest::merge_new_values(vector_centroid& incoming_centroids, uint64 min_ = std::min(min_, centroids_.front().get_mean()); max_ = std::max(max_, centroids_.back().get_mean()); } + ++merge_count_; + buffer_.clear(); + buffered_weight_ = 0; } } /* namespace datasketches */ diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp index e90faa70..08fbe801 100644 --- a/tdigest/test/tdigest_test.cpp +++ b/tdigest/test/tdigest_test.cpp @@ -108,4 +108,43 @@ TEST_CASE("rank - repeated block", "[tdigest]") { REQUIRE(td.get_rank(3.01) == 1); } +TEST_CASE("merge small", "[tdigest]") { + tdigest_double td1(10); + td1.update(1); + td1.update(2); + tdigest_double td2(10); + td2.update(2); + td2.update(3); + td1.merge(td2); + REQUIRE(td1.get_min_value() == 1); + REQUIRE(td1.get_max_value() == 3); + REQUIRE(td1.get_total_weight() == 4); + REQUIRE(td1.get_rank(0.99) == 0); + REQUIRE(td1.get_rank(1) == 0.125); + REQUIRE(td1.get_rank(2) == 0.5); + REQUIRE(td1.get_rank(3) == 0.875); + REQUIRE(td1.get_rank(3.01) == 1); +} + +TEST_CASE("merge large", "[tdigest]") { + const size_t n = 10000; + tdigest_double td1(100); + tdigest_double td2(100); + for (size_t i = 0; i < n / 2; ++i) { + td1.update(i); + td2.update(n / 2 + i); + } + td1.merge(td2); +// td1.compress(); +// std::cout << td1.to_string(true); + REQUIRE(td1.get_total_weight() == n); + REQUIRE(td1.get_min_value() == 0); + REQUIRE(td1.get_max_value() == n - 1); + REQUIRE(td1.get_rank(0) == Approx(0).margin(0.0001)); + REQUIRE(td1.get_rank(n / 4) == Approx(0.25).margin(0.0001)); + REQUIRE(td1.get_rank(n / 2) == Approx(0.5).margin(0.0001)); + REQUIRE(td1.get_rank(n * 3 / 4) == Approx(0.75).margin(0.0001)); + REQUIRE(td1.get_rank(n) == 1); +} + } /* namespace datasketches */ From 1540be6bd81a81aa3eebb9c7f7144255976ecc64 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Thu, 11 Jan 2024 18:23:24 -0800 Subject: [PATCH 3/6] implemented get_quantile(rank) --- tdigest/include/tdigest.hpp | 2 ++ tdigest/include/tdigest_impl.hpp | 50 +++++++++++++++++++++++++++++++- tdigest/test/tdigest_test.cpp | 8 +++++ 3 files changed, 59 insertions(+), 1 deletion(-) diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp index 18803668..70c737ee 100644 --- a/tdigest/include/tdigest.hpp +++ b/tdigest/include/tdigest.hpp @@ -193,6 +193,8 @@ class tdigest { void merge_new_values(); void merge_new_values(bool force, uint16_t k); void merge_new_values(uint16_t k); + + static double weighted_average(double x1, double w1, double x2, double w2); }; } /* namespace datasketches */ diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp index b6f639cd..aeb6157a 100644 --- a/tdigest/include/tdigest_impl.hpp +++ b/tdigest/include/tdigest_impl.hpp @@ -172,7 +172,50 @@ double tdigest::get_rank(T value) const { template T tdigest::get_quantile(double rank) const { if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch"); - return 0; + if ((rank < 0.0) || (rank > 1.0)) { + throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1"); + } + const_cast(this)->merge_new_values(); // side effect + if (centroids_.size() == 1) return centroids_.front().get_mean(); + + // at least 2 centroids + const double weight = rank * total_weight_; + if (weight < 1) return min_; + if (weight > total_weight_ - 1.0) return max_; + const double first_weight = centroids_.front().get_weight(); + if (first_weight > 1 && weight < first_weight / 2.0) { + return min_ + (weight - 1.0) / (first_weight / 2.0 - 1.0) * (centroids_.front().get_mean() - min_); + } + const double last_weight = centroids_.back().get_weight(); + if (last_weight > 1 && total_weight_ - weight <= last_weight / 2.0) { + return max_ + (total_weight_ - weight - 1.0) / (last_weight / 2.0 - 1.0) * (max_ - centroids_.back().get_mean()); + } + + // interpolate between extremes + double weight_so_far = first_weight / 2.0; + for (size_t i = 0; i < centroids_.size() - 1; ++i) { + const double dw = (centroids_[i].get_weight() + centroids_[i + 1].get_weight()) / 2.0; + if (weight_so_far + dw > weight) { + // the target weight is between centroids i and i+1 + double left_weight = 0; + if (centroids_[i].get_weight() == 1) { + if (weight - weight_so_far < 0.5) return centroids_[i].get_mean(); + left_weight = 0.5; + } + double right_weight = 0; + if (centroids_[i + 1].get_weight() == 1) { + if (weight_so_far + dw - weight <= 0.5) return centroids_[i + 1].get_mean(); + right_weight = 0.5; + } + const double w1 = weight - weight_so_far - left_weight; + const double w2 = weight_so_far + dw - weight - right_weight; + return weighted_average(centroids_[i].get_mean(), w1, centroids_[i + 1].get_mean(), w2); + } + weight_so_far += dw; + } + const double w1 = weight - total_weight_ - centroids_.back().get_weight() / 2.0; + const double w2 = centroids_.back().get_weight() / 2.0 - w1; + return weighted_average(centroids_.back().get_weight(), w1, max_, w2); } template @@ -272,6 +315,11 @@ void tdigest::merge_new_values(uint16_t k) { buffered_weight_ = 0; } +template +double tdigest::weighted_average(double x1, double w1, double x2, double w2) { + return (x1 * w1 + x2 * w2) / (w1 + w2); +} + } /* namespace datasketches */ #endif // _TDIGEST_IMPL_HPP_ diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp index 08fbe801..678afca2 100644 --- a/tdigest/test/tdigest_test.cpp +++ b/tdigest/test/tdigest_test.cpp @@ -46,6 +46,9 @@ TEST_CASE("one value", "[tdigest]") { REQUIRE(td.get_rank(0.99) == 0); REQUIRE(td.get_rank(1) == 0.5); REQUIRE(td.get_rank(1.01) == 1); + REQUIRE(td.get_quantile(0) == 1); + REQUIRE(td.get_quantile(0.5) == 1); + REQUIRE(td.get_quantile(1) == 1); } TEST_CASE("many values", "[tdigest]") { @@ -63,6 +66,11 @@ TEST_CASE("many values", "[tdigest]") { REQUIRE(td.get_rank(n / 2) == Approx(0.5).margin(0.0001)); REQUIRE(td.get_rank(n * 3 / 4) == Approx(0.75).margin(0.0001)); REQUIRE(td.get_rank(n) == 1); + REQUIRE(td.get_quantile(0) == 0); + REQUIRE(td.get_quantile(0.5) == Approx(n / 2).epsilon(0.01)); + REQUIRE(td.get_quantile(0.9) == Approx(n * 0.9).epsilon(0.01)); + REQUIRE(td.get_quantile(0.95) == Approx(n * 0.95).epsilon(0.01)); + REQUIRE(td.get_quantile(1) == n - 1); } TEST_CASE("rank - two values", "[tdigest]") { From e98561c33b0f6ef25e2a085b128241b7411b6313 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Tue, 23 Jan 2024 21:04:33 -0800 Subject: [PATCH 4/6] added serialization --- tdigest/include/tdigest.hpp | 45 ++++++- tdigest/include/tdigest_impl.hpp | 210 +++++++++++++++++++++++++------ tdigest/test/tdigest_test.cpp | 74 +++++++++++ 3 files changed, 293 insertions(+), 36 deletions(-) diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp index 70c737ee..357f203b 100644 --- a/tdigest/include/tdigest.hpp +++ b/tdigest/include/tdigest.hpp @@ -96,6 +96,7 @@ class tdigest { uint64_t weight_; }; using vector_centroid = std::vector::template rebind_alloc>; + using vector_bytes = std::vector::template rebind_alloc>; struct centroid_cmp { centroid_cmp(bool reverse): reverse_(reverse) {} @@ -176,11 +177,43 @@ class tdigest { */ string to_string(bool print_centroids = false) const; + /** + * This method serializes t-Digest into a given stream in a binary form + * @param os output stream + */ + void serialize(std::ostream& os) const; + + /** + * This method serializes t-Digest as a vector of bytes. + * An optional header can be reserved in front of the sketch. + * It is an uninitialized space of a given size. + * @param header_size_bytes space to reserve in front of the sketch + * @return serialized sketch as a vector of bytes + */ + vector_bytes serialize(unsigned header_size_bytes = 0) const; + + /** + * This method deserializes t-Digest from a given stream. + * @param is input stream + * @param allocator instance of an Allocator + * @return an instance of t-Digest + */ + static tdigest deserialize(std::istream& is, const Allocator& allocator = Allocator()); + + /** + * This method deserializes t-Digest from a given array of bytes. + * @param bytes pointer to the array of bytes + * @param size the size of the array + * @param allocator instance of an Allocator + * @return an instance of t-Digest + */ + static tdigest deserialize(const void* bytes, size_t size, const Allocator& allocator = Allocator()); + private: Allocator allocator_; + bool reverse_merge_; uint16_t k_; uint16_t internal_k_; - uint32_t merge_count_; T min_; T max_; size_t centroids_capacity_; @@ -190,6 +223,16 @@ class tdigest { vector_centroid buffer_; uint64_t buffered_weight_; + static const uint8_t PREAMBLE_LONGS_EMPTY = 1; + static const uint8_t PREAMBLE_LONGS_NON_EMPTY = 2; + static const uint8_t SERIAL_VERSION = 1; + static const uint8_t SKETCH_TYPE = 20; + + enum flags { IS_EMPTY, REVERSE_MERGE }; + + // for deserialize + tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, const Allocator& allocator); + void merge_new_values(); void merge_new_values(bool force, uint16_t k); void merge_new_values(uint16_t k); diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp index aeb6157a..c7128ce9 100644 --- a/tdigest/include/tdigest_impl.hpp +++ b/tdigest/include/tdigest_impl.hpp @@ -23,46 +23,20 @@ #include #include +#include "common_defs.hpp" +#include "memory_operations.hpp" + namespace datasketches { template tdigest::tdigest(uint16_t k, const A& allocator): -allocator_(allocator), -k_(k), -internal_k_(k), -merge_count_(0), -min_(std::numeric_limits::infinity()), -max_(-std::numeric_limits::infinity()), -centroids_capacity_(0), -centroids_(allocator), -total_weight_(0), -buffer_capacity_(0), -buffer_(allocator), -buffered_weight_(0) -{ - if (k < 10) throw std::invalid_argument("k must be at least 10"); - size_t fudge = 0; - if (USE_WEIGHT_LIMIT) { - fudge = 10; - if (k < 30) fudge +=20; - } - centroids_capacity_ = 2 * k_ + fudge; - buffer_capacity_ = 5 * centroids_capacity_; - double scale = std::max(1.0, static_cast(buffer_capacity_) / centroids_capacity_ - 1.0); - if (!USE_TWO_LEVEL_COMPRESSION) scale = 1; - internal_k_ = std::ceil(std::sqrt(scale) * k_); - if (centroids_capacity_ < internal_k_ + fudge) { - centroids_capacity_ = internal_k_ + fudge; - } - if (buffer_capacity_ < 2 * centroids_capacity_) buffer_capacity_ = 2 * centroids_capacity_; - centroids_.reserve(centroids_capacity_); - buffer_.reserve(buffer_capacity_); -} +tdigest(false, k, std::numeric_limits::infinity(), -std::numeric_limits::infinity(), vector_centroid(allocator), 0, allocator) +{} template void tdigest::update(T value) { // check for NaN - if (buffer_.size() >= buffer_capacity_ - centroids_.size() - 1) merge_new_values(); // - 1 for compatibility with Java + if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_new_values(); buffer_.push_back(centroid(value, 1)); ++buffered_weight_; min_ = std::min(min_, value); @@ -237,7 +211,6 @@ string tdigest::to_string(bool print_centroids) const { os << " Buffer capacity : " << buffer_capacity_ << std::endl; os << " Total Weight : " << total_weight_ << std::endl; os << " Buffered Weight : " << buffered_weight_ << std::endl; - os << " Merge count : " << merge_count_ << std::endl; if (!is_empty()) { os << " Min : " << min_ << std::endl; os << " Max : " << max_ << std::endl; @@ -267,7 +240,7 @@ void tdigest::merge_new_values(bool force, uint16_t k) { template void tdigest::merge_new_values(uint16_t k) { - const bool reverse = USE_ALTERNATING_SORT & (merge_count_ & 1); + const bool reverse = USE_ALTERNATING_SORT & reverse_merge_; for (const auto& centroid: centroids_) buffer_.push_back(centroid); centroids_.clear(); std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp(reverse)); @@ -310,7 +283,7 @@ void tdigest::merge_new_values(uint16_t k) { min_ = std::min(min_, centroids_.front().get_mean()); max_ = std::max(max_, centroids_.back().get_mean()); } - ++merge_count_; + reverse_merge_ = !reverse_merge_; buffer_.clear(); buffered_weight_ = 0; } @@ -320,6 +293,173 @@ double tdigest::weighted_average(double x1, double w1, double x2, double w return (x1 * w1 + x2 * w2) / (w1 + w2); } +template +void tdigest::serialize(std::ostream& os) const { + const_cast(this)->merge_new_values(); // side effect + write(os, is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NON_EMPTY); + write(os, SERIAL_VERSION); + write(os, SKETCH_TYPE); + write(os, k_); + const uint8_t flags_byte( + (is_empty() ? 1 << flags::IS_EMPTY : 0) | + (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0) + ); + write(os, flags_byte); + write(os, 0); // unused + + if (is_empty()) return; + + write(os, static_cast(centroids_.size())); + write(os, 0); // unused + + write(os, min_); + write(os, max_); + write(os, centroids_.data(), centroids_.size() * sizeof(centroid)); +} + +template +auto tdigest::serialize(unsigned header_size_bytes) const -> vector_bytes { + const_cast(this)->merge_new_values(); // side effect + const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NON_EMPTY; + const size_t size_bytes = preamble_longs * sizeof(uint64_t) + sizeof(T) * 2 + sizeof(centroid) * centroids_.size(); + vector_bytes bytes(size_bytes, 0, allocator_); + uint8_t* ptr = bytes.data() + header_size_bytes; + + *ptr++ = preamble_longs; + *ptr++ = SERIAL_VERSION; + *ptr++ = SKETCH_TYPE; + ptr += copy_to_mem(k_, ptr); + const uint8_t flags_byte( + (is_empty() ? 1 << flags::IS_EMPTY : 0) | + (reverse_merge_ ? 1 << flags::REVERSE_MERGE : 0) + ); + *ptr++ = flags_byte; + ptr += 2; // unused + if (is_empty()) return bytes; + + ptr += copy_to_mem(static_cast(centroids_.size()), ptr); + ptr += 4; // unused + + ptr += copy_to_mem(min_, ptr); + ptr += copy_to_mem(max_, ptr); + copy_to_mem(centroids_.data(), ptr, centroids_.size() * sizeof(centroid)); + return bytes; +} + +template +tdigest tdigest::deserialize(std::istream& is, const A& allocator) { + const auto preamble_longs = read(is); + const auto serial_version = read(is); + const auto sketch_type = read(is); + if (sketch_type != SKETCH_TYPE) { + throw std::invalid_argument("sketch type mismatch: expected " + std::to_string(SKETCH_TYPE) + ", actual " + std::to_string(sketch_type)); + } + if (serial_version != SERIAL_VERSION) { + throw std::invalid_argument("serial version mismatch: expected " + std::to_string(SERIAL_VERSION) + ", actual " + std::to_string(serial_version)); + } + const auto k = read(is); + const auto flags_byte = read(is); + const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); + const uint8_t expected_preamble_longs = is_empty ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NON_EMPTY; + if (preamble_longs != expected_preamble_longs) { + throw std::invalid_argument("preamble longs mismatch: expected " + std::to_string(expected_preamble_longs) + ", actual " + std::to_string(preamble_longs)); + } + read(is); // unused + + if (is_empty) return tdigest(k, allocator); + + const auto num_centroids = read(is); + read(is); // unused + + const T min = read(is); + const T max = read(is); + vector_centroid centroids(num_centroids, centroid(0, 0), allocator); + read(is, centroids.data(), num_centroids * sizeof(centroid)); + uint64_t total_weight = 0; + for (const auto& c: centroids) total_weight += c.get_weight(); + const bool reverse_merge = flags_byte & (1 << flags::REVERSE_MERGE); + return tdigest(reverse_merge, k, min, max, std::move(centroids), total_weight, allocator); +} + +template +tdigest tdigest::deserialize(const void* bytes, size_t size, const A& allocator) { + ensure_minimum_memory(size, 8); + const char* ptr = static_cast(bytes); + const char* end_ptr = static_cast(bytes) + size; + + const uint8_t preamble_longs = *ptr++; + const uint8_t serial_version = *ptr++; + const uint8_t sketch_type = *ptr++; + if (sketch_type != SKETCH_TYPE) { + throw std::invalid_argument("sketch type mismatch: expected " + std::to_string(SKETCH_TYPE) + ", actual " + std::to_string(sketch_type)); + } + if (serial_version != SERIAL_VERSION) { + throw std::invalid_argument("serial version mismatch: expected " + std::to_string(SERIAL_VERSION) + ", actual " + std::to_string(serial_version)); + } + uint16_t k; + ptr += copy_from_mem(ptr, k); + const uint8_t flags_byte = *ptr++; + const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); + const uint8_t expected_preamble_longs = is_empty ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NON_EMPTY; + if (preamble_longs != expected_preamble_longs) { + throw std::invalid_argument("preamble longs mismatch: expected " + std::to_string(expected_preamble_longs) + ", actual " + std::to_string(preamble_longs)); + } + ptr += 2; // unused + + if (is_empty) return tdigest(k, allocator); + + ensure_minimum_memory(end_ptr - ptr, 8); + uint32_t num_centroids; + ptr += copy_from_mem(ptr, num_centroids); + ptr += 4; // unused + + ensure_minimum_memory(end_ptr - ptr, sizeof(T) * 2 + sizeof(centroid) * num_centroids); + T min; + ptr += copy_from_mem(ptr, min); + T max; + ptr += copy_from_mem(ptr, max); + vector_centroid centroids(num_centroids, centroid(0, 0), allocator); + copy_from_mem(ptr, centroids.data(), sizeof(centroid) * num_centroids); + uint64_t total_weight = 0; + for (const auto& c: centroids) total_weight += c.get_weight(); + const bool reverse_merge = flags_byte & (1 << flags::REVERSE_MERGE); + return tdigest(reverse_merge, k, min, max, std::move(centroids), total_weight, allocator); +} + +template +tdigest::tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight, const A& allocator): +allocator_(allocator), +reverse_merge_(reverse_merge), +k_(k), +internal_k_(k), +min_(min), +max_(max), +centroids_capacity_(0), +centroids_(std::move(centroids)), +total_weight_(total_weight), +buffer_capacity_(0), +buffer_(allocator), +buffered_weight_(0) +{ + if (k < 10) throw std::invalid_argument("k must be at least 10"); + size_t fudge = 0; + if (USE_WEIGHT_LIMIT) { + fudge = 10; + if (k < 30) fudge +=20; + } + centroids_capacity_ = 2 * k_ + fudge; + buffer_capacity_ = 5 * centroids_capacity_; + double scale = std::max(1.0, static_cast(buffer_capacity_) / centroids_capacity_ - 1.0); + if (!USE_TWO_LEVEL_COMPRESSION) scale = 1; + internal_k_ = std::ceil(std::sqrt(scale) * k_); + if (centroids_capacity_ < internal_k_ + fudge) { + centroids_capacity_ = internal_k_ + fudge; + } + if (buffer_capacity_ < 2 * centroids_capacity_) buffer_capacity_ = 2 * centroids_capacity_; + centroids_.reserve(centroids_capacity_); + buffer_.reserve(buffer_capacity_); +} + } /* namespace datasketches */ #endif // _TDIGEST_IMPL_HPP_ diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp index 678afca2..b1627d58 100644 --- a/tdigest/test/tdigest_test.cpp +++ b/tdigest/test/tdigest_test.cpp @@ -155,4 +155,78 @@ TEST_CASE("merge large", "[tdigest]") { REQUIRE(td1.get_rank(n) == 1); } +TEST_CASE("serialize deserialize stream empty", "[tdigest]") { + tdigest td(100); + std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); + td.serialize(s); + auto deserialized_td = tdigest::deserialize(s); + REQUIRE(td.get_k() == deserialized_td.get_k()); + REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight()); + REQUIRE(td.is_empty() == deserialized_td.is_empty()); +} + +TEST_CASE("serialize deserialize stream non empty", "[tdigest]") { + tdigest td(100); + for (int i = 0; i < 1000; ++i) td.update(i); + std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); + td.serialize(s); + auto deserialized_td = tdigest::deserialize(s); + REQUIRE(td.get_k() == deserialized_td.get_k()); + REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight()); + REQUIRE(td.is_empty() == deserialized_td.is_empty()); + REQUIRE(td.get_min_value() == deserialized_td.get_min_value()); + REQUIRE(td.get_max_value() == deserialized_td.get_max_value()); + REQUIRE(td.get_rank(500) == deserialized_td.get_rank(500)); + REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5)); +} + +TEST_CASE("serialize deserialize bytes empty", "[tdigest]") { + tdigest td(100); + auto bytes = td.serialize(); + auto deserialized_td = tdigest::deserialize(bytes.data(), bytes.size()); + REQUIRE(td.get_k() == deserialized_td.get_k()); + REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight()); + REQUIRE(td.is_empty() == deserialized_td.is_empty()); +} + +TEST_CASE("serialize deserialize bytes non empty", "[tdigest]") { + tdigest td(100); + for (int i = 0; i < 1000; ++i) td.update(i); + auto bytes = td.serialize(); + auto deserialized_td = tdigest::deserialize(bytes.data(), bytes.size()); + REQUIRE(td.get_k() == deserialized_td.get_k()); + REQUIRE(td.get_total_weight() == deserialized_td.get_total_weight()); + REQUIRE(td.is_empty() == deserialized_td.is_empty()); + REQUIRE(td.get_min_value() == deserialized_td.get_min_value()); + REQUIRE(td.get_max_value() == deserialized_td.get_max_value()); + REQUIRE(td.get_rank(500) == deserialized_td.get_rank(500)); + REQUIRE(td.get_quantile(0.5) == deserialized_td.get_quantile(0.5)); +} + +TEST_CASE("serialize deserialize steam and bytes equivalence", "[tdigest]") { + tdigest td(100); + for (int i = 0; i < 1000; ++i) td.update(i); + std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); + td.serialize(s); + auto bytes = td.serialize(); + + REQUIRE(bytes.size() == static_cast(s.tellp())); + for (size_t i = 0; i < bytes.size(); ++i) { + REQUIRE(((char*)bytes.data())[i] == (char)s.get()); + } + + s.seekg(0); // rewind + auto deserialized_td1 = tdigest::deserialize(s); + auto deserialized_td2 = tdigest::deserialize(bytes.data(), bytes.size()); + REQUIRE(bytes.size() == static_cast(s.tellg())); + + REQUIRE(deserialized_td1.get_k() == deserialized_td2.get_k()); + REQUIRE(deserialized_td1.get_total_weight() == deserialized_td2.get_total_weight()); + REQUIRE(deserialized_td1.is_empty() == deserialized_td2.is_empty()); + REQUIRE(deserialized_td1.get_min_value() == deserialized_td2.get_min_value()); + REQUIRE(deserialized_td1.get_max_value() == deserialized_td2.get_max_value()); + REQUIRE(deserialized_td1.get_rank(500) == deserialized_td2.get_rank(500)); + REQUIRE(deserialized_td1.get_quantile(0.5) == deserialized_td2.get_quantile(0.5)); +} + } /* namespace datasketches */ From e241813e7411e938f7e412616756b3644f819fc7 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Wed, 24 Jan 2024 10:11:35 -0800 Subject: [PATCH 5/6] handle NaN --- tdigest/include/tdigest_impl.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp index c7128ce9..c47202e6 100644 --- a/tdigest/include/tdigest_impl.hpp +++ b/tdigest/include/tdigest_impl.hpp @@ -35,7 +35,7 @@ tdigest(false, k, std::numeric_limits::infinity(), -std::numeric_limits::i template void tdigest::update(T value) { - // check for NaN + if (std::isnan(value)) return; if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_new_values(); buffer_.push_back(centroid(value, 1)); ++buffered_weight_; @@ -89,7 +89,7 @@ uint64_t tdigest::get_total_weight() const { template double tdigest::get_rank(T value) const { if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch"); - // check for NaN + if (std::isnan(value)) throw std::invalid_argument("operation is undefined for NaN"); if (value < min_) return 0; if (value > max_) return 1; // one centroid and value == min_ == max_ From 7baca9f4a894ecd707c159f61f30dffab629660a Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Mon, 29 Jan 2024 14:21:10 -0800 Subject: [PATCH 6/6] fixed cmake --- tdigest/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tdigest/CMakeLists.txt b/tdigest/CMakeLists.txt index d7deee3d..545f9d43 100755 --- a/tdigest/CMakeLists.txt +++ b/tdigest/CMakeLists.txt @@ -17,7 +17,7 @@ add_library(tdigest INTERFACE) -add_library(${PROJECT_NAME}::QUANTILES ALIAS tdigest) +add_library(${PROJECT_NAME}::TDIGEST ALIAS tdigest) if (BUILD_TESTS) add_subdirectory(test)