From 948fdd6ce196718a70a0d061d53247c1cddfd335 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Mon, 6 Nov 2023 14:47:18 -0800 Subject: [PATCH] [WIP] KeySegmentsExtractor and prototype higher-dimensional filtering Summary: TODO Test Plan: Unit test included --- db/db_bloom_filter_test.cc | 205 +++++++++ db/experimental.cc | 752 +++++++++++++++++++++++++++++++++ include/rocksdb/experimental.h | 237 +++++++++++ 3 files changed, 1194 insertions(+) diff --git a/db/db_bloom_filter_test.cc b/db/db_bloom_filter_test.cc index 9bd5c11b6d05..aca95ad42366 100644 --- a/db/db_bloom_filter_test.cc +++ b/db/db_bloom_filter_test.cc @@ -19,6 +19,7 @@ #include "port/stack_trace.h" #include "rocksdb/advanced_options.h" #include "rocksdb/convenience.h" +#include "rocksdb/experimental.h" #include "rocksdb/filter_policy.h" #include "rocksdb/perf_context.h" #include "rocksdb/statistics.h" @@ -3562,6 +3563,210 @@ TEST_F(DBBloomFilterTest, WeirdPrefixExtractorWithFilter3) { } } +TEST_F(DBBloomFilterTest, SstQueryFilter) { + using experimental::KeySegmentsExtractor; + using experimental::SstQueryFilterConfigs; + using KeyCategorySet = KeySegmentsExtractor::KeyCategorySet; + + struct MySegmentExtractor : public KeySegmentsExtractor { + const char* const name; + uint32_t min_ver; + uint32_t max_ver; + MySegmentExtractor(const char* _name, uint32_t _min_ver, uint32_t _max_ver) + : name(_name), min_ver(_min_ver), max_ver(_max_ver) {} + + const char* Name() const override { return name; } + + Status Extract(const Slice& key_or_bound, KeyKind /*kind*/, + uint32_t version, Result* result) const override { + if (version < min_ver || version > max_ver) { + return Status::InvalidArgument("unsupported version"); + } + size_t len = key_or_bound.size(); + if (len == 0) { + result->category = KeySegmentsExtractor::kReservedLowCategory; + return Status::OK(); + } + if (static_cast(key_or_bound[0]) < + static_cast('0')) { + result->category = KeySegmentsExtractor::kReservedLowCategory; + } + if 
(static_cast(key_or_bound[0]) > + static_cast('z')) { + result->category = KeySegmentsExtractor::kReservedHighCategory; + } + for (uint32_t i = 0; i < len; ++i) { + if (key_or_bound[i] == '_' || i + 1 == key_or_bound.size()) { + result->segment_ends.push_back(i + 1); + } + } + return Status::OK(); + } + + std::pair GetSupportedVersionRange() const override { + return {min_ver, max_ver}; + } + }; + + auto configs = SstQueryFilterConfigs::MakeShared(); + auto extractor1old = std::make_shared("Ex1", 1, 2); + auto extractor1new = std::make_shared("Ex1", 2, 3); + auto extractor2 = std::make_shared("Ex2", 1, 3); + configs->SetExtractorAndVersion(extractor1old, 1); + // Filter on 2nd field with '_' as delimiter, only for default category + configs->AddMinMax(1, KeyCategorySet{KeySegmentsExtractor::kDefaultCategory}); + // Also filter on 3rd field regardless of category + configs->AddMinMax(2); + + Options options = CurrentOptions(); + options.statistics = CreateDBStatistics(); + options.table_properties_collector_factories.emplace_back( + configs->GetTblPropCollFactory()); + + DestroyAndReopen(options); + + // For lower level file + ASSERT_OK(Put(" ", "val0")); + ASSERT_OK(Put(" _345_678", "val0")); + ASSERT_OK(Put("aaa", "val0")); + ASSERT_OK(Put("abc_123", "val1")); + ASSERT_OK(Put("abc_13", "val2")); + ASSERT_OK(Put("abc_156_987", "val3")); + ASSERT_OK(Put("xyz_145", "val4")); + ASSERT_OK(Put("xyz_167", "val5")); + ASSERT_OK(Put("xyz_178", "val6")); + ASSERT_OK(Put("zzz", "val0")); + ASSERT_OK(Put("~~~", "val0")); + ASSERT_OK(Put("~~~_456_789", "val0")); + + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + + configs->SetExtractorAndVersion(extractor1old, 2); + + // For higher level file + ASSERT_OK(Put(" ", "val0")); + ASSERT_OK(Put(" _345_680", "val0")); + ASSERT_OK(Put("aaa", "val9")); + ASSERT_OK(Put("abc_234", "val1")); + ASSERT_OK(Put("abc_245_567", "val2")); + ASSERT_OK(Put("abc_25", "val3")); + ASSERT_OK(Put("xyz_180", "val4")); + ASSERT_OK(Put("xyz_191", "val4")); 
+ ASSERT_OK(Put("xyz_260", "val4")); + ASSERT_OK(Put("zzz", "val9")); + ASSERT_OK(Put("~~~", "val0")); + ASSERT_OK(Put("~~~_456_790", "val0")); + + ASSERT_OK(Flush()); + + using Keys = std::vector; + auto RangeQueryKeys = [configs, db = db_](std::string lb, std::string ub) { + Slice lb_slice = lb; + Slice ub_slice = ub; + + ReadOptions ro; + ro.iterate_lower_bound = &lb_slice; + ro.iterate_upper_bound = &ub_slice; + ro.table_filter = configs->GetTableFilterForRangeQuery(lb_slice, ub_slice); + auto it = db->NewIterator(ro); + Keys ret; + for (it->Seek(lb_slice); it->Valid(); it->Next()) { + ret.push_back(it->key().ToString()); + } + EXPECT_OK(it->status()); + delete it; + return ret; + }; + + // Control 1: range is not filterable, common prefix + EXPECT_EQ(RangeQueryKeys("abc_150", "abc_249"), + Keys({"abc_156_987", "abc_234", "abc_245_567"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 2); + + // Test 1: range is filterable to just lowest level, fully containing the + // segments in that category + EXPECT_EQ(RangeQueryKeys("abc_100", "abc_179"), + Keys({"abc_123", "abc_13", "abc_156_987"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 1); + + // Test 2: range is filterable to just lowest level, partial overlap + EXPECT_EQ(RangeQueryKeys("abc_1500_x_y", "abc_16QQ"), Keys({"abc_156_987"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 1); + + // Test 3: range is filterable to just highest level, fully containing the + // segments in that category but would be overlapping the range for the other + // file if the filter included all categories + EXPECT_EQ(RangeQueryKeys("abc_200", "abc_300"), + Keys({"abc_234", "abc_245_567", "abc_25"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 1); + + // Test 4: range is filterable to just highest level, partial overlap (etc.) 
+ EXPECT_EQ(RangeQueryKeys("abc_200", "abc_249"), + Keys({"abc_234", "abc_245_567"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 1); + + // Test 5: range is filtered from both levels, because of category scope + EXPECT_EQ(RangeQueryKeys("abc_300", "abc_400"), Keys({})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 0); + + // Control 2: range is not filtered because association between 1st and + // 2nd segment is not represented + EXPECT_EQ(RangeQueryKeys("abc_170", "abc_190"), Keys({})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 2); + + // Control 3: range is not filtered because prefixes not represented + EXPECT_EQ(RangeQueryKeys("bcd_170", "bcd_190"), Keys({})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 2); + + // Control 4: range is not filtered because different prefix, prefixes not + // represented + EXPECT_EQ(RangeQueryKeys("abc_500", "abd_501"), Keys({})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 2); + + // TODO: exclusive upper bound tests + + // ======= Testing 3rd segment (cross-category filter) ======= + // Control 5: not filtered because of segment range overlap + EXPECT_EQ(RangeQueryKeys(" z__700", " z__750"), Keys({})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 2); + + // Test 6: filtered on both levels + EXPECT_EQ(RangeQueryKeys(" z__100", " z__300"), Keys({})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 0); + + // Control 6: finding something, with 2nd segment filter helping + EXPECT_EQ(RangeQueryKeys("abc_156_9", "abc_156_99"), Keys({"abc_156_987"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 1); + + EXPECT_EQ(RangeQueryKeys("abc_245_56", "abc_245_57"), Keys({"abc_245_567"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 1); + + // Test 6: filtered on both 
levels, for different segments + EXPECT_EQ(RangeQueryKeys("abc_245_900", "abc_245_999"), Keys({})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 0); + + // ======= Testing extractor name and version matching ======= + EXPECT_EQ(RangeQueryKeys("abc_23", "abc_24"), Keys({"abc_234"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 1); + + // This extractor does not support version 1 in the lower level file, so + // we can no longer filter there. But we can filter in higher level file. + configs->SetExtractorAndVersion(extractor1new, 2); + + EXPECT_EQ(RangeQueryKeys("abc_23", "abc_24"), Keys({"abc_234"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 2); + + EXPECT_EQ(RangeQueryKeys("abc_120", "abc_125"), Keys({"abc_123"})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 1); + + // But this extractor is completely incompatible, and the code can't + // reconstruct the old one to use. Not able to filter on either level. 
+ configs->SetExtractorAndVersion(extractor2, 2); + + EXPECT_EQ(RangeQueryKeys("abc_300", "abc_400"), Keys({})); + EXPECT_EQ(TestGetAndResetTickerCount(options, NON_LAST_LEVEL_SEEK_DATA), 2); +} } // namespace ROCKSDB_NAMESPACE diff --git a/db/experimental.cc b/db/experimental.cc index f6f920b2ccb7..d7f609eb2730 100644 --- a/db/experimental.cc +++ b/db/experimental.cc @@ -5,6 +5,11 @@ #include "rocksdb/experimental.h" +#include +#include +#include +#include + #include "db/db_impl/db_impl.h" #include "db/version_util.h" #include "logging/logging.h" @@ -141,5 +146,752 @@ Status UpdateManifestForFilesState( return s; } +// EXPERIMENTAL new filtering features + +namespace { +Slice GetSegmentsFromKey(size_t from_idx, size_t to_idx, const Slice& key, + const KeySegmentsExtractor::Result& extracted) { + assert(from_idx <= to_idx); + size_t count = extracted.segment_ends.size(); + if (count <= from_idx) { + return Slice(); + } + assert(count > 0); + size_t start = from_idx > 0 ? extracted.segment_ends[from_idx - 1] : 0; + size_t end = extracted.segment_ends[std::min(to_idx, count - 1)]; + return Slice(key.data() + start, end - start); +} + +uint64_t CategorySetToUint(const KeySegmentsExtractor::KeyCategorySet& s) { + static_assert(sizeof(KeySegmentsExtractor::KeyCategorySet) == + sizeof(uint64_t)); + return *reinterpret_cast(&s); +} + +KeySegmentsExtractor::KeyCategorySet UintToCategorySet(uint64_t s) { + static_assert(sizeof(KeySegmentsExtractor::KeyCategorySet) == + sizeof(uint64_t)); + return *reinterpret_cast(&s); +} + +enum BuiltinSstQueryFilters : char { + // Wraps a set of filters such that they use a particular + // KeySegmentsExtractor and version, and a set of categories covering + // all keys seen. + kExtrAndVerAndCatFilterWrapper = 0x1, + + // Wraps a set of filters to limit their scope to a particular set of + // categories. (Unlike kExtrAndVerAndCatFilterWrapper, + // keys in other categories may have been seen so are not filtered here.) 
+ kCategoryScopeFilterWrapper = 0x2, + + // A filter representing the bytewise min and max values of a numbered + // segment or composite (range of segments). The empty value is tracked + // and filtered independently because it might be a special case that is + // not representative of the minimum in a spread of values. + kBytewiseMinMaxFilter = 0x10, +}; + +class SstQueryFilterBuilder { + public: + virtual ~SstQueryFilterBuilder() {} + virtual void Add(const Slice& key, + const KeySegmentsExtractor::Result& extracted, + const Slice* prev_key, + const KeySegmentsExtractor::Result* prev_extracted) = 0; + virtual Status GetStatus() const = 0; + virtual size_t GetEncodedLength() const = 0; + virtual void Finish(std::string& append_to) = 0; +}; + +class SstQueryFilterConfigImpl { + public: + virtual ~SstQueryFilterConfigImpl() {} + + virtual std::unique_ptr NewBuilder( + bool sanity_checks) const = 0; +}; + +class CategoryScopeFilterWrapperBuilder : public SstQueryFilterBuilder { + public: + explicit CategoryScopeFilterWrapperBuilder( + KeySegmentsExtractor::KeyCategorySet categories, + std::unique_ptr wrapped) + : categories_(categories), wrapped_(std::move(wrapped)) {} + + void Add(const Slice& key, const KeySegmentsExtractor::Result& extracted, + const Slice* prev_key, + const KeySegmentsExtractor::Result* prev_extracted) override { + if (!categories_.Contains(extracted.category)) { + // Category not in scope of the contituent filters + return; + } + wrapped_->Add(key, extracted, prev_key, prev_extracted); + } + + Status GetStatus() const override { return wrapped_->GetStatus(); } + + size_t GetEncodedLength() const override { + size_t wrapped_length = wrapped_->GetEncodedLength(); + if (wrapped_length == 0) { + // Use empty filter + // FIXME: needs unit test + return 0; + } else { + // For now in the code, wraps only 1 filter, but schema supports multiple + return 1 + VarintLength(CategorySetToUint(categories_)) + 1 + + wrapped_length; + } + } + + void 
Finish(std::string& append_to) override { + size_t encoded_length = GetEncodedLength(); + if (encoded_length == 0) { + // Nothing to do + return; + } + size_t old_append_to_size = append_to.size(); + append_to.reserve(old_append_to_size + encoded_length); + append_to.push_back(kCategoryScopeFilterWrapper); + + PutVarint64(&append_to, CategorySetToUint(categories_)); + + // Wrapping just 1 filter for now + PutVarint64(&append_to, 1); + wrapped_->Finish(append_to); + } + + private: + KeySegmentsExtractor::KeyCategorySet categories_; + std::unique_ptr wrapped_; +}; + +class BytewiseMinMaxSstQueryFilterConfig : public SstQueryFilterConfigImpl { + public: + explicit BytewiseMinMaxSstQueryFilterConfig( + uint32_t segment_index_from, uint32_t segment_index_to, + KeySegmentsExtractor::KeyCategorySet categories) + : segment_index_from_(segment_index_from), + segment_index_to_(segment_index_to), + categories_(categories) {} + + std::unique_ptr NewBuilder( + bool sanity_checks) const override { + auto b = std::make_unique(*this, sanity_checks); + if (categories_ != KeySegmentsExtractor::KeyCategorySet::All()) { + return std::make_unique(categories_, + std::move(b)); + } else { + return b; + } + } + + static bool RangeMayMatch( + const Slice& filter, const Slice& lower_bound_incl, + const KeySegmentsExtractor::Result& lower_bound_extracted, + const Slice& upper_bound_excl, + const KeySegmentsExtractor::Result& upper_bound_extracted) { + assert(!filter.empty() && filter[0] == kBytewiseMinMaxFilter); + if (filter.size() <= 4) { + // Missing some data + return true; + } + bool empty_included = (filter[1] & kEmptySeenFlag) != 0; + uint32_t segment_index_from = static_cast(filter[2]); + uint32_t segment_index_to = static_cast(filter[3]); + const char* p = filter.data() + 4; + const char* limit = filter.data() + filter.size(); + + uint32_t smallest_size; + p = GetVarint32Ptr(p, limit, &smallest_size); + if (p == nullptr || static_cast(limit - p) <= smallest_size) { + // Corrupt + 
return true; + } + Slice smallest = Slice(p, smallest_size); + p += smallest_size; + + size_t largest_size = static_cast(limit - p); + Slice largest = Slice(p, largest_size); + + if (segment_index_from > 0) { + Slice lower_bound_prefix = GetSegmentsFromKey( + 0, segment_index_from - 1, lower_bound_incl, lower_bound_extracted); + Slice upper_bound_prefix = GetSegmentsFromKey( + 0, segment_index_from - 1, upper_bound_excl, upper_bound_extracted); + if (lower_bound_prefix.compare(upper_bound_prefix) != 0) { + // Unable to filter when bounds cross prefix leading up to segment + return true; + } + } + Slice lower_bound_segment = + GetSegmentsFromKey(segment_index_from, segment_index_to, + lower_bound_incl, lower_bound_extracted); + if (empty_included && lower_bound_segment.empty()) { + // May match on 0-length segment + return true; + } + Slice upper_bound_segment = + GetSegmentsFromKey(segment_index_from, segment_index_to, + upper_bound_excl, upper_bound_extracted); + + // TODO: potentially fix upper bound to actually be exclusive + + // May match if both the upper bound and lower bound indicate there could + // be overlap + return upper_bound_segment.compare(smallest) >= 0 && + lower_bound_segment.compare(largest) <= 0; + } + + protected: + struct MyBuilder : public SstQueryFilterBuilder { + MyBuilder(const BytewiseMinMaxSstQueryFilterConfig& _parent, + bool _sanity_checks) + : parent(_parent), sanity_checks(_sanity_checks) {} + + void Add(const Slice& key, const KeySegmentsExtractor::Result& extracted, + const Slice* prev_key, + const KeySegmentsExtractor::Result* prev_extracted) override { + Slice segment = GetSegmentsFromKey( + parent.segment_index_from_, parent.segment_index_to_, key, extracted); + + if (sanity_checks && prev_key && prev_extracted) { + // Opportunistic checking of segment ordering invariant + int compare = 0; + if (parent.segment_index_from_ > 0) { + Slice prev_prefix = GetSegmentsFromKey( + 0, parent.segment_index_from_ - 1, *prev_key, 
*prev_extracted); + Slice prefix = GetSegmentsFromKey(0, parent.segment_index_from_ - 1, + key, extracted); + compare = prev_prefix.compare(prefix); + if (compare > 0) { + status = Status::Corruption( + "Ordering invariant violated from 0x" + + prev_key->ToString(/*hex=*/true) + " with prefix 0x" + + prev_prefix.ToString(/*hex=*/true) + " to 0x" + + key.ToString(/*hex=*/true) + " with prefix 0x" + + prefix.ToString(/*hex=*/true)); + return; + } + } + if (compare == 0) { + // On the same prefix leading up to the segment, the segments must + // not be out of order. + Slice prev_segment = GetSegmentsFromKey(parent.segment_index_from_, + parent.segment_index_to_, + *prev_key, *prev_extracted); + compare = prev_segment.compare(segment); + if (compare > 0) { + status = Status::Corruption( + "Ordering invariant violated from 0x" + + prev_key->ToString(/*hex=*/true) + " with segment 0x" + + prev_segment.ToString(/*hex=*/true) + " to 0x" + + key.ToString(/*hex=*/true) + " with segment 0x" + + segment.ToString(/*hex=*/true)); + return; + } + } + } + + // Now actually update state for the key segments + // TODO: shorten largest and smallest if appropriate + if (segment.empty()) { + empty_seen = true; + } else if (largest.empty()) { + // First step for non-empty segment + smallest = largest = segment.ToString(); + } else if (segment.compare(largest) > 0) { + largest = segment.ToString(); + } else if (segment.compare(smallest) < 0) { + smallest = segment.ToString(); + } + } + + Status GetStatus() const override { return status; } + + size_t GetEncodedLength() const override { + if (largest.empty()) { + // Not an interesting filter -> 0 to indicate no filter + // FIXME: needs unit test + return 0; + } + return 4 + VarintLength(smallest.size()) + smallest.size() + + largest.size(); + } + + void Finish(std::string& append_to) override { + assert(status.ok()); + size_t encoded_length = GetEncodedLength(); + if (encoded_length == 0) { + // Nothing to do + return; + } + size_t 
old_append_to_size = append_to.size(); + append_to.reserve(old_append_to_size + encoded_length); + append_to.push_back(kBytewiseMinMaxFilter); + + append_to.push_back(empty_seen ? kEmptySeenFlag : 0); + + // FIXME: check bounds + append_to.push_back(static_cast(parent.segment_index_from_)); + append_to.push_back(static_cast(parent.segment_index_to_)); + + PutVarint32(&append_to, static_cast(smallest.size())); + append_to.append(smallest); + // The end of `largest` is given by the end of the filter + append_to.append(largest); + assert(append_to.size() == old_append_to_size + encoded_length); + } + + const BytewiseMinMaxSstQueryFilterConfig& parent; + const bool sanity_checks; + // Smallest and largest segment seen, excluding the empty segment which + // is tracked separately + std::string smallest; + std::string largest; + bool empty_seen = false; + + // Only for sanity checks + Status status; + }; + + private: + uint32_t segment_index_from_; + uint32_t segment_index_to_; + KeySegmentsExtractor::KeyCategorySet categories_; + + static constexpr char kEmptySeenFlag = 0x1; +}; + +class SstQueryFilterConfigsImpl + : public SstQueryFilterConfigs, + public std::enable_shared_from_this { + public: + Self& SetExtractorAndVersion(std::shared_ptr extractor, + uint32_t version) override { + extractor_ = std::move(extractor); + version_ = version; + return *this; + } + + Self& SetSanityChecks(bool enabled) override { + sanity_checks_ = enabled; + return *this; + } + + Self& AddMinMax(uint32_t from_segment_index, uint32_t to_segment_index, + KeySegmentsExtractor::KeyCategorySet categories) override { + configs_.push_back(std::make_shared( + from_segment_index, to_segment_index, categories)); + return *this; + } + Self& AddApproximateSet( + uint32_t from_segment_index, uint32_t to_segment_index, + KeySegmentsExtractor::KeyCategorySet categories) override { + // TODO + (void)from_segment_index; + (void)to_segment_index; + (void)categories; + return *this; + } + + struct 
MyCollector : public TablePropertiesCollector { + explicit MyCollector( + std::shared_ptr _parent) + : parent(std::move(_parent)) { + for (const auto& c : parent->configs_) { + builders.push_back(c->NewBuilder(parent->sanity_checks_)); + } + } + + Status AddUserKey(const Slice& key, const Slice& /*value*/, + EntryType /*type*/, SequenceNumber /*seq*/, + uint64_t /*file_size*/) override { + KeySegmentsExtractor::Result extracted; + if (parent->extractor_) { + Status s = + parent->extractor_->Extract(key, KeySegmentsExtractor::kFullUserKey, + parent->version_, &extracted); + if (!s.ok()) { + return s; + } + bool new_category = categories_seen.Add(extracted.category); + if (parent->sanity_checks_) { + // Opportunistic checking of category ordering invariant + if (!first_key) { + if (prev_extracted.category != extracted.category && + !new_category) { + return Status::Corruption( + "Category ordering invariant violated from key 0x" + + Slice(prev_key).ToString(/*hex=*/true) + " to 0x" + + key.ToString(/*hex=*/true)); + } + } + } + } + for (const auto& b : builders) { + if (first_key) { + b->Add(key, extracted, nullptr, nullptr); + } else { + Slice prev_key_slice = Slice(prev_key); + b->Add(key, extracted, &prev_key_slice, &prev_extracted); + } + } + prev_key.assign(key.data(), key.size()); + prev_extracted = extracted; + first_key = false; + return Status::OK(); + } + Status Finish(UserCollectedProperties* properties) override { + assert(properties != nullptr); + + size_t total_size = 1; + // TODO: use autovector + std::vector> filters_to_finish; + // Need to determine number of filters before serializing them. Might + // as well determine full length also. 
+ for (const auto& b : builders) { + Status s = b->GetStatus(); + if (s.ok()) { + size_t len = b->GetEncodedLength(); + if (len > 0) { + total_size += VarintLength(len) + len; + filters_to_finish.emplace_back(*b, len); + } + } else { + // FIXME: no way to report partial failure without getting + // remaining filters thrown out + } + } + total_size += VarintLength(filters_to_finish.size()); + if (filters_to_finish.empty()) { + // No filters to add + return Status::OK(); + } + // Length of the last filter is omitted + total_size -= VarintLength(filters_to_finish.back().second); + + // Need to determine size of + // kExtrAndVerAndCatFilterWrapper if used + size_t name_len = 0; + if (parent->extractor_) { + name_len = strlen(parent->extractor_->Name()); + // identifier byte + total_size += 1; + // fields of the wrapper + total_size += VarintLength(name_len) + name_len + + VarintLength(parent->version_) + + VarintLength(CategorySetToUint(categories_seen)); + // outer layer will have just 1 filter in its count (added here) + // and this filter wrapper will have filters_to_finish.size() + // (added above). + total_size += 1; + } + + std::string filters; + filters.reserve(total_size); + + filters.push_back(kSchemaVersion); + + if (parent->extractor_) { + // Wrap everything in a kExtrAndVerAndCatFilterWrapper + // TODO in future: put whole key filters outside of this wrapper. + // Also TODO in future: order the filters starting with broadest + // applicability. + + // Just one top-level filter (wrapper). Because it's last, we don't + // need to encode its length. 
+ PutVarint64(&filters, 1); + // The filter(s) wrapper itself + filters.push_back(kExtrAndVerAndCatFilterWrapper); + PutVarint64(&filters, name_len); + filters.append(parent->extractor_->Name(), name_len); + PutVarint64(&filters, parent->version_); + PutVarint64(&filters, CategorySetToUint(categories_seen)); + } + + PutVarint64(&filters, filters_to_finish.size()); + + for (const auto& e : filters_to_finish) { + // Encode filter length, except last filter + if (&e != &filters_to_finish.back()) { + PutVarint64(&filters, e.second); + } + // Encode filter + e.first.Finish(filters); + } + if (filters.size() != total_size) { + assert(false); + return Status::Corruption( + "Internal inconsistency building SST query filters"); + } + + (*properties)[SstQueryFilterConfigsImpl::kTablePropertyName] = + std::move(filters); + return Status::OK(); + } + UserCollectedProperties GetReadableProperties() const override { + // TODO? + return {}; + } + const char* Name() const override { + // placeholder + return "SstQueryFilterConfigsImpl::MyCollector"; + } + + std::shared_ptr parent; + std::vector> builders; + bool first_key = true; + std::string prev_key; + KeySegmentsExtractor::Result prev_extracted; + KeySegmentsExtractor::KeyCategorySet categories_seen; + }; + + struct RangeQueryFilterReader { + Slice lower_bound_incl; + Slice upper_bound_excl; + std::shared_ptr extractor; + + struct State { + KeySegmentsExtractor::Result lb_extracted; + KeySegmentsExtractor::Result ub_extracted; + }; + + bool MayMatch_CategoryScopeFilterWrapper(Slice wrapper, + State& state) const { + assert(!wrapper.empty() && wrapper[0] == kCategoryScopeFilterWrapper); + + // Regardless of the filter values (which we assume is not all + // categories; that should skip the wrapper), we need upper bound and + // lower bound to be in the same category to do any range filtering. + // (There could be another category in range between the bounds.) 
+ if (state.lb_extracted.category != state.ub_extracted.category) { + // Can't filter between categories + return true; + } + + const char* p = wrapper.data() + 1; + const char* limit = wrapper.data() + wrapper.size(); + + uint64_t cats_raw; + p = GetVarint64Ptr(p, limit, &cats_raw); + if (p == nullptr) { + // Missing categories + return true; + } + KeySegmentsExtractor::KeyCategorySet categories = + UintToCategorySet(cats_raw); + + // Check category against those in scope + if (!categories.Contains(state.lb_extracted.category)) { + // Can't filter this category + return true; + } + + // Process the wrapped filters + return MayMatch(Slice(p, limit - p), &state); + } + + bool MayMatch_ExtrAndVerAndCatFilterWrapper(Slice wrapper) const { + assert(!wrapper.empty() && wrapper[0] == kExtrAndVerAndCatFilterWrapper); + if (wrapper.size() <= 4) { + // Missing some data + return true; + } + const char* p = wrapper.data() + 1; + const char* limit = wrapper.data() + wrapper.size(); + uint64_t name_len; + p = GetVarint64Ptr(p, limit, &name_len); + if (p == nullptr || name_len == 0 || + static_cast(limit - p) < name_len) { + // Missing some data + return true; + } + Slice name(p, name_len); + p += name_len; + if (!extractor || name != Slice(extractor->Name())) { + // Extractor mismatch + // TODO future: try to get the extractor from the ObjectRegistry + return true; + } + // TODO future: cache extraction with default version + uint32_t version; + p = GetVarint32Ptr(p, limit, &version); + if (p == nullptr) { + // Missing some data + return true; + } + + // Ready to run extractor + assert(extractor); + State state; + Status s = extractor->Extract(lower_bound_incl, + KeySegmentsExtractor::kInclusiveLowerBound, + version, &state.lb_extracted); + if (!s.ok()) { + // TODO? Report problem + // No filtering + return true; + } + s = extractor->Extract(upper_bound_excl, + KeySegmentsExtractor::kExclusiveUpperBound, + version, &state.ub_extracted); + if (!s.ok()) { + // TODO? 
Report problem + // No filtering + return true; + } + + uint64_t cats_raw; + p = GetVarint64Ptr(p, limit, &cats_raw); + if (p == nullptr) { + // Missing categories + return true; + } + KeySegmentsExtractor::KeyCategorySet categories = + UintToCategorySet(cats_raw); + + // Can only filter out based on category if upper and lower bound have + // the same category. (Each category is contiguous by key order, but we + // don't know the order between categories.) + if (state.lb_extracted.category == state.ub_extracted.category && + !categories.Contains(state.lb_extracted.category)) { + // Filtered out + return false; + } + + // Process the wrapped filters + return MayMatch(Slice(p, limit - p), &state); + } + + bool MayMatch(Slice filters, State* state = nullptr) const { + const char* p = filters.data(); + const char* limit = p + filters.size(); + uint64_t filter_count; + p = GetVarint64Ptr(p, limit, &filter_count); + if (p == nullptr || filter_count == 0) { + // TODO? Report problem + // No filtering + return true; + } + + for (size_t i = 0; i < filter_count; ++i) { + uint64_t filter_len; + if (i + 1 == filter_count) { + // Last filter + filter_len = static_cast(limit - p); + } else { + p = GetVarint64Ptr(p, limit, &filter_len); + if (p == nullptr || filter_len == 0 || + static_cast(limit - p) < filter_len) { + // TODO? Report problem + // No filtering + return true; + } + } + Slice filter = Slice(p, filter_len); + p += filter_len; + bool may_match = true; + char type = filter[0]; + switch (type) { + case kExtrAndVerAndCatFilterWrapper: + may_match = MayMatch_ExtrAndVerAndCatFilterWrapper(filter); + break; + case kCategoryScopeFilterWrapper: + if (state == nullptr) { + // TODO? Report problem + // No filtering + return true; + } + may_match = MayMatch_CategoryScopeFilterWrapper(filter, *state); + break; + case kBytewiseMinMaxFilter: + if (state == nullptr) { + // TODO? 
Report problem + // No filtering + return true; + } + may_match = BytewiseMinMaxSstQueryFilterConfig::RangeMayMatch( + filter, lower_bound_incl, state->lb_extracted, upper_bound_excl, + state->ub_extracted); + break; + default: + // TODO? Report problem + {} + // Unknown filter type + } + if (!may_match) { + // Successfully filtered + return false; + } + } + + // Wasn't filtered + return true; + } + }; + + struct MyFactory : public TablePropertiesCollectorFactory { + explicit MyFactory(std::shared_ptr _parent) + : parent(std::move(_parent)) {} + TablePropertiesCollector* CreateTablePropertiesCollector( + TablePropertiesCollectorFactory::Context /*context*/) override { + return new MyCollector(parent); + } + const char* Name() const override { + // placeholder + return "SstQueryFilterConfigsImpl::MyFactory"; + } + std::shared_ptr parent; + }; + + std::shared_ptr GetTblPropCollFactory() + const override { + return std::make_shared(this->shared_from_this()); + } + std::function GetTableFilterForRangeQuery( + Slice lower_bound_incl, Slice upper_bound_excl) const override { + // TODO: cache extractor results between SST files, assuming most will + // use the same version + return [rqf = RangeQueryFilterReader{lower_bound_incl, upper_bound_excl, + extractor_}]( + const TableProperties& props) -> bool { + auto it = props.user_collected_properties.find(kTablePropertyName); + if (it == props.user_collected_properties.end()) { + // No filtering + return true; + } + auto& filters = it->second; + // Parse the serialized filters string + if (filters.size() < 2 || filters[0] != kSchemaVersion) { + // TODO? 
Report problem + // No filtering + return true; + } + return rqf.MayMatch(Slice(filters.data() + 1, filters.size() - 1)); + }; + } + + private: + static const std::string kTablePropertyName; + static constexpr char kSchemaVersion = 1; + + private: + std::shared_ptr extractor_; + uint32_t version_ = 0; + std::vector> configs_; + bool sanity_checks_ = false; +}; + +// SstQueryFilterConfigs +const std::string SstQueryFilterConfigsImpl::kTablePropertyName = + "rocksdb.sqfc"; + +} // namespace + +std::shared_ptr SstQueryFilterConfigs::MakeShared() { + return std::make_shared(); +} + } // namespace experimental } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/experimental.h b/include/rocksdb/experimental.h index b59395255c19..a982273b1b41 100644 --- a/include/rocksdb/experimental.h +++ b/include/rocksdb/experimental.h @@ -5,6 +5,10 @@ #pragma once +#include +#include + +#include "rocksdb/data_structure.h" #include "rocksdb/db.h" #include "rocksdb/status.h" @@ -52,5 +56,238 @@ Status UpdateManifestForFilesState( const std::vector& column_families, const UpdateManifestForFilesStateOptions& opts = {}); +// **************************************************************************** +// EXPERIMENTAL new filtering features +// **************************************************************************** + +// A class for splitting a key into meaningful pieces, or "segments" for +// filtering purposes. Keys can also be put in "categories" to simplify +// some configuration and handling. To simplify satisfying some filtering +// requirements, the segments must encompass a complete key prefix (or the whole +// key) and segments cannot overlap. +// +// OTHER CURRENT LIMITATIONS (maybe relaxed in the future for segments only +// needing point query or WHERE filtering): +// * Assumes the (default) byte-wise comparator is used. +// * Assumes that all categories are contiguous in comparator order. 
In other +// words, any key between two keys of category c must also be in category c. +// * Assumes the (weak) segment ordering property (described below) always +// holds. (For byte-wise comparator, this is implied by the segment prefix +// property, also described below.) +// +// SEGMENT ORDERING PROPERTY: For maximum use in filters, especially for +// filtering key range queries, we must have a correspondence between +// the lexicographic ordering of key segments and the ordering of keys +// they are extracted from. In other words, if we took the segmented keys +// and ordered them primarily by (byte-wise) order on segment 0, then +// on segment 1, etc., then key order of the original keys would not be +// violated. This is the WEAK form of the property, where multiple keys +// might generate the same segments, but such keys must be contiguous in +// key order. (The STRONG form of the property is potentially more useful, +// but for bytewise comparator, it can be inferred from segments satisfying +// the weak property by assuming another segment that extends to the end of +// the key, which would be empty if the segments already extend to the end +// of the key.) +// +// The segment ordering property is hard to think about directly, but for +// bytewise comparator, it is implied by a simpler property to reason about: +// the segment prefix property (see below). (NOTE: an example way to satisfy +// the segment ordering property while breaking the segment prefix property +// is to have a segment delimited by any byte smaller than a certain value, +// and not include the delimiter with the segment leading up to the delimiter. +// For example, the space character is ordered before other printable +// characters, so breaking "foo bar" into "foo", " ", and "bar" would be +// legal, but not recommended.) 
+// +// SEGMENT PREFIX PROPERTY: If a key generates segments s0, ..., sn (possibly +// more beyond sn) and sn does not extend to the end of the key, then all keys +// starting with bytes s0+...+sn (concatenated) also generate the same segments +// (possibly more). For example, if a key has segment s0 which is less than the +// whole key and another key starts with the bytes of s0--or only has the bytes +// of s0--then the other key must have the same segment s0. In other words, any +// prefix of segments that might not extend to the end of the key must form an +// unambiguous prefix code. See +// https://en.wikipedia.org/wiki/Prefix_code In other other words, parsing +// a key into segments cannot use even a single byte of look-ahead. Upon +// processing each byte, the extractor decides whether to cut a segment that +// ends with that byte, but not one that ends before that byte. The only +// exception is that upon reaching the end of the key, the extractor can choose +// whether to make a segment that ends at the end of the key. +// +// Example types of key segments that can be freely mixed in any order: +// * Some fixed number of bytes or codewords. +// * Ends in a delimiter byte or codeword. (Not including the delimiter as +// part of the segment leading up to it would very likely violate the segment +// prefix property.) +// * Length-encoded sequence of bytes or codewords. The length could even +// come from a preceding segment. +// * Any/all remaining bytes to the end of the key, though this implies all +// subsequent segments will be empty. +// For each kind of segment, it should be determined before parsing the segment +// whether an incomplete/short parse will be treated as a segment extending to +// the end of the key or as an empty segment. +// +// For example, keys might consist of +// * Segment 0: Any sequence of bytes up to and including the first ':' +// character, or the whole key if no ':' is present. 
+// * Segment 1: The next four bytes, all or nothing (in case of short key). +// * Segment 2: An unsigned byte indicating the number of additional bytes in +// the segment, and then that many bytes (or less up to the end of the key). +// * Segment 3: Any/all remaining bytes in the key +// +// For an example of what can go wrong, consider using '4' as a delimiter +// but not including it with the segment leading up to it. Suppose we have +// these keys and corresponding first segments: +// "123456" -> "123" +// "124536" -> "12" +// "125436" -> "125" +// Notice how byte-wise comparator ordering of the segments does not follow +// the ordering of the keys. This means we cannot safely use a filter with +// a range of segment values for filtering key range queries. +// +// Also note that it is legal for all keys in a category (or many categories) +// to return an empty sequence of segments. +// +// To eliminate a confusing distinction between a segment that is empty vs. +// "not present" for a particular key, each key is logically associated with +// an infinite sequence of segments, including some infinite tail of 0-length +// segments. In practice, we only represent a finite sequence that (at least) +// covers the non-trivial segments. +// +class KeySegmentsExtractor { + public: + // The extractor assigns keys to categories so that it is easier to + // combine distinct (though disjoint) key representations within a single + // column family while applying different or overlapping filtering + // configurations to the categories. + // To enable fast set representation, the user is allowed up to 64 + // categories for assigning to keys with the extractor. The user will + // likely cast to their own enum type or scalars. + enum KeyCategory : uint_fast8_t { + kDefaultCategory = 0, + kMinCategory = kDefaultCategory, + // ... (user categories) + // Can be used for a theoretical key ordered before any expected.
+ kReservedLowCategory = 62, + // Can be used for a theoretical key ordered after any expected. + kReservedHighCategory = 63, + kMaxCategory = kReservedHighCategory, + }; + using KeyCategorySet = SmallEnumSet; + + // The extractor can process three kinds of key-like inputs + enum KeyKind { + // User key, not including user timestamp + kFullUserKey, + // An iterator lower bound (inclusive). This should generally be handled + // the same as a full user key but the distinction might be useful for + // diagnostics or assertions. + kInclusiveLowerBound, + // An iterator upper bound (exclusive). Upper bounds are frequently + // constructed by incrementing the last byte of a key prefix, and this can + // affect what should be considered as a segment delimiter. + kExclusiveUpperBound, + }; + + // The extractor result + struct Result { + // Positions in the key (or bound) that represent boundaries + // between segments, or the exclusive end of each segment. For example, if + // the key is "abc|123|xyz" then following the guidance of including + // delimiters with the preceding segment, segment_ends would be {4, 8, 11}, + // representing segments "abc|" "123|" and "xyz". Empty segments are + // naturally represented with repeated values, as in {4, 8, 8} for + // "abc|123|", though {4, 8} would be logically equivalent because an + // infinite sequence of 0-length segments is assumed after what is + // explicitly represented here. However, segments might not reach the end of + // the key (no automatic last segment to the end of the key) and that is + // OK for the WEAK ordering property. + // + // The first segment automatically starts at key position 0. The only way + // to put gaps between segments of interest is to assign those gaps to + // numbered segments, which can be left unused. + std::vector segment_ends; + + // A category to assign to the key or bound. This default may be kept, + // such as to put all keys into a single category.
+ // IMPORTANT CURRENT LIMITATION from above: each category must be + // contiguous in key comparator order, so any key between two keys in + // category c must also be in category c. (Typically the category will be + // determined by segment 0 in some way, often the first byte.) The enum + // scalar values do not need to be related to key order. + KeyCategory category = kDefaultCategory; + }; + + virtual ~KeySegmentsExtractor() {} + + virtual const char* Name() const = 0; + + // If able to process the input, populates the result and returns OK. + // For unsupported extractor version, returns InvalidArgument. Corruption + // status may be returned for keys or bounds that are not expected in the + // applicable column family. RocksDB will always call the function with + // a (pointer to a) default-initialized result object. + virtual Status Extract(const Slice& key_or_bound, KeyKind kind, + uint32_t version, Result* result) const = 0; + + // For sanity checking + virtual std::pair GetSupportedVersionRange() const = 0; +}; + +// Not user extensible +class SstQueryFilterConfigs { + public: + static std::shared_ptr MakeShared(); + + virtual ~SstQueryFilterConfigs() {} + using Self = SstQueryFilterConfigs; + + // Just one extractor and version is used for all filters on an SST file. + // The user should do necessary work to unify key segment extraction to keep + // RocksDB tracking overheads minimized. + virtual Self& SetExtractorAndVersion( + std::shared_ptr extractor, uint32_t version) = 0; + + virtual Self& SetSanityChecks(bool enabled) = 0; + + // Add a filter to this configuration that stores minimum and maximum values + // (under bytewise ordering) for the segment with the given index (position + // in segment_ends). 
+ Self& AddMinMax(uint32_t segment_index, + KeySegmentsExtractor::KeyCategorySet categories = + KeySegmentsExtractor::KeyCategorySet::All()) { + return AddMinMax(segment_index, segment_index, categories); + } + // Same, on composite of segments [from_segment_index, to_segment_index] + virtual Self& AddMinMax(uint32_t from_segment_index, + uint32_t to_segment_index, + KeySegmentsExtractor::KeyCategorySet categories = + KeySegmentsExtractor::KeyCategorySet::All()) = 0; + + // FUTURE: Replacement for prefix Bloom + Self& AddApproximateSet(uint32_t segment_index, + KeySegmentsExtractor::KeyCategorySet categories = + KeySegmentsExtractor::KeyCategorySet::All()) { + return AddApproximateSet(segment_index, segment_index, categories); + } + // Same, on composite of segments [from_segment_index, to_segment_index] + virtual Self& AddApproximateSet( + uint32_t from_segment_index, uint32_t to_segment_index, + KeySegmentsExtractor::KeyCategorySet categories = + KeySegmentsExtractor::KeyCategorySet::All()) = 0; + + // EXPERIMENTAL/TEMPORARY: used to hook into table properties for persisting + // filters + virtual std::shared_ptr + GetTblPropCollFactory() const = 0; + + // EXPERIMENTAL/TEMPORARY: used as table_filter hook for applying persisted + // filters to range queries. The buffers pointed to by the Slices must live + // as long as any read operations using this table filter function. + virtual std::function + GetTableFilterForRangeQuery(Slice lower_bound_incl, + Slice upper_bound_excl) const = 0; +}; + } // namespace experimental } // namespace ROCKSDB_NAMESPACE