From 2bd3f91e1bc138c0806d761c7c37701f86bbb0e4 Mon Sep 17 00:00:00 2001
From: toge
Date: Sun, 8 Sep 2024 23:56:23 +0900
Subject: [PATCH 1/2] rocksdb: add version 9.6.1

---
 recipes/rocksdb/all/conandata.yml | 7 +
 .../patches/9.6.1-0001-support-clang.patch | 1331 +++++++++++++++++
 recipes/rocksdb/config.yml | 2 +
 3 files changed, 1340 insertions(+)
 create mode 100644 recipes/rocksdb/all/patches/9.6.1-0001-support-clang.patch

diff --git a/recipes/rocksdb/all/conandata.yml b/recipes/rocksdb/all/conandata.yml
index 074302cce72fd..51de59560d7e7 100644
--- a/recipes/rocksdb/all/conandata.yml
+++ b/recipes/rocksdb/all/conandata.yml
@@ -1,4 +1,7 @@
 sources:
+  "9.6.1":
+    url: "https://github.com/facebook/rocksdb/archive/refs/tags/v9.6.1.tar.gz"
+    sha256: "98cf497c1d6d0a927142d2002a0b6b4816a0998c74fda9ae7b1bdaf6b784e895"
   "9.5.2":
     url: "https://github.com/facebook/rocksdb/archive/refs/tags/v9.5.2.tar.gz"
     sha256: "B20780586D3DF4A3C5BCBDE341A2C1946B03D18237960BDA5BC5E9538F42AF40"
@@ -27,6 +30,10 @@ sources:
     url: "https://github.com/facebook/rocksdb/archive/refs/tags/v6.20.3.tar.gz"
     sha256: "c6502c7aae641b7e20fafa6c2b92273d935d2b7b2707135ebd9a67b092169dca"
 patches:
+  "9.6.1":
+    - patch_file: "patches/9.6.1-0001-support-clang.patch"
+      patch_description: "Fix build with clang by defining BlockBasedTableBuilder::ParallelCompressionRep before BlockBasedTableBuilder::Rep"
+      patch_type: "portability"
   "6.29.5":
     - patch_file: "patches/6.29.5-0001-add-include-cstdint-for-gcc-13.patch"
       patch_description: "Fix build with gcc 13 by including cstdint"
diff --git a/recipes/rocksdb/all/patches/9.6.1-0001-support-clang.patch b/recipes/rocksdb/all/patches/9.6.1-0001-support-clang.patch
new file mode 100644
index 0000000000000..8141c951a8c73
--- /dev/null
+++ b/recipes/rocksdb/all/patches/9.6.1-0001-support-clang.patch
@@ -0,0 +1,1331 @@
+diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
+index 1b444f5..e3d64d6 100644
+--- a/table/block_based/block_based_table_builder.cc
++++ b/table/block_based/block_based_table_builder.cc
+@@ -267,708 +267,708 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
+ bool decoupled_partitioned_filters_;
+ };
+
+-struct BlockBasedTableBuilder::Rep {
+- const ImmutableOptions ioptions;
+- // BEGIN from MutableCFOptions
+- std::shared_ptr prefix_extractor;
+- // END from MutableCFOptions
+- const WriteOptions write_options;
+- const BlockBasedTableOptions table_options;
+- const InternalKeyComparator& internal_comparator;
+- // Size in bytes for the user-defined timestamps.
+- size_t ts_sz;
+- // When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the
+- // user key will be stripped when creating the block based table. This
+- // stripping happens for all user keys, including the keys in data block,
+- // index block for data block, index block for index block (if index type is
+- // `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned
+- // filters), the `first_internal_key` in `IndexValue`, the `end_key` for range
+- // deletion entries.
+- // As long as the user keys are sorted when added via `Add` API, their logic
+- // ordering won't change after timestamps are stripped. However, for each user
+- // key to be logically equivalent before and after timestamp is stripped, the
+- // user key should contain the minimum timestamp.
+- bool persist_user_defined_timestamps;
+- WritableFileWriter* file;
+- std::atomic offset;
+- size_t alignment;
+- BlockBuilder data_block;
+- // Buffers uncompressed data blocks to replay later.
Needed when +- // compression dictionary is enabled so we can finalize the dictionary before +- // compressing any data blocks. +- std::vector data_block_buffers; +- BlockBuilder range_del_block; ++struct BlockBasedTableBuilder::ParallelCompressionRep { ++ // TODO: consider replacing with autovector or similar ++ // Keys is a wrapper of vector of strings avoiding ++ // releasing string memories during vector clear() ++ // in order to save memory allocation overhead ++ class Keys { ++ public: ++ Keys() : keys_(kKeysInitSize), size_(0) {} ++ void PushBack(const Slice& key) { ++ if (size_ == keys_.size()) { ++ keys_.emplace_back(key.data(), key.size()); ++ } else { ++ keys_[size_].assign(key.data(), key.size()); ++ } ++ size_++; ++ } ++ void SwapAssign(std::vector& keys) { ++ size_ = keys.size(); ++ std::swap(keys_, keys); ++ } ++ void Clear() { size_ = 0; } ++ size_t Size() { return size_; } ++ std::string& Back() { return keys_[size_ - 1]; } ++ std::string& operator[](size_t idx) { ++ assert(idx < size_); ++ return keys_[idx]; ++ } + +- InternalKeySliceTransform internal_prefix_transform; +- std::unique_ptr index_builder; +- std::string index_separator_scratch; +- PartitionedIndexBuilder* p_index_builder_ = nullptr; ++ private: ++ const size_t kKeysInitSize = 32; ++ std::vector keys_; ++ size_t size_; ++ }; ++ std::unique_ptr curr_block_keys; + +- std::string last_ikey; // Internal key or empty (unset) +- const Slice* first_key_in_next_block = nullptr; +- CompressionType compression_type; +- uint64_t sample_for_compression; +- std::atomic compressible_input_data_bytes; +- std::atomic uncompressible_input_data_bytes; +- std::atomic sampled_input_data_bytes; +- std::atomic sampled_output_slow_data_bytes; +- std::atomic sampled_output_fast_data_bytes; +- CompressionOptions compression_opts; +- std::unique_ptr compression_dict; +- std::vector> compression_ctxs; +- std::vector> verify_ctxs; +- std::unique_ptr verify_dict; ++ class BlockRepSlot; + +- size_t data_begin_offset = 0; ++ // BlockRep instances are fetched from and recycled to ++ // block_rep_pool during parallel compression. ++ struct BlockRep { ++ Slice contents; ++ Slice compressed_contents; ++ std::unique_ptr data; ++ std::unique_ptr compressed_data; ++ CompressionType compression_type; ++ std::unique_ptr first_key_in_next_block; ++ std::unique_ptr keys; ++ std::unique_ptr slot; ++ Status status; ++ }; ++ // Use a vector of BlockRep as a buffer for a determined number ++ // of BlockRep structures. All data referenced by pointers in ++ // BlockRep will be freed when this vector is destructed. ++ using BlockRepBuffer = std::vector; ++ BlockRepBuffer block_rep_buf; ++ // Use a thread-safe queue for concurrent access from block ++ // building thread and writer thread. ++ using BlockRepPool = WorkQueue; ++ BlockRepPool block_rep_pool; + +- TableProperties props; ++ // Use BlockRepSlot to keep block order in write thread. ++ // slot_ will pass references to BlockRep ++ class BlockRepSlot { ++ public: ++ BlockRepSlot() : slot_(1) {} ++ template ++ void Fill(T&& rep) { ++ slot_.push(std::forward(rep)); ++ } ++ void Take(BlockRep*& rep) { slot_.pop(rep); } + +- // States of the builder. +- // +- // - `kBuffered`: This is the initial state where zero or more data blocks are +- // accumulated uncompressed in-memory. From this state, call +- // `EnterUnbuffered()` to finalize the compression dictionary if enabled, +- // compress/write out any buffered blocks, and proceed to the `kUnbuffered` +- // state. 
+- // +- // - `kUnbuffered`: This is the state when compression dictionary is finalized +- // either because it wasn't enabled in the first place or it's been created +- // from sampling previously buffered data. In this state, blocks are simply +- // compressed/written out as they fill up. From this state, call `Finish()` +- // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete +- // the partially created file. +- // +- // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been +- // called, so the table builder is no longer usable. We must be in this +- // state by the time the destructor runs. +- enum class State { +- kBuffered, +- kUnbuffered, +- kClosed, ++ private: ++ // slot_ will pass references to BlockRep in block_rep_buf, ++ // and those references are always valid before the destruction of ++ // block_rep_buf. ++ WorkQueue slot_; + }; +- State state; +- // `kBuffered` state is allowed only as long as the buffering of uncompressed +- // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`. +- uint64_t buffer_limit; +- std::shared_ptr +- compression_dict_buffer_cache_res_mgr; +- const bool use_delta_encoding_for_index_values; +- std::unique_ptr filter_builder; +- OffsetableCacheKey base_cache_key; +- const TableFileCreationReason reason; + +- BlockHandle pending_handle; // Handle to add to index block ++ // Compression queue will pass references to BlockRep in block_rep_buf, ++ // and those references are always valid before the destruction of ++ // block_rep_buf. ++ using CompressQueue = WorkQueue; ++ CompressQueue compress_queue; ++ std::vector compress_thread_pool; + +- std::string compressed_output; +- std::unique_ptr flush_block_policy; ++ // Write queue will pass references to BlockRep::slot in block_rep_buf, ++ // and those references are always valid before the corresponding ++ // BlockRep::slot is destructed, which is before the destruction of ++ // block_rep_buf. ++ using WriteQueue = WorkQueue; ++ WriteQueue write_queue; ++ std::unique_ptr write_thread; + +- std::vector> table_properties_collectors; ++ // Estimate output file size when parallel compression is enabled. This is ++ // necessary because compression & flush are no longer synchronized, ++ // and BlockBasedTableBuilder::FileSize() is no longer accurate. ++ // memory_order_relaxed suffices because accurate statistics is not required. ++ class FileSizeEstimator { ++ public: ++ explicit FileSizeEstimator() ++ : uncomp_bytes_compressed(0), ++ uncomp_bytes_curr_block(0), ++ uncomp_bytes_curr_block_set(false), ++ uncomp_bytes_inflight(0), ++ blocks_inflight(0), ++ curr_compression_ratio(0), ++ estimated_file_size(0) {} + +- std::unique_ptr pc_rep; +- BlockCreateContext create_context; ++ // Estimate file size when a block is about to be emitted to ++ // compression thread ++ void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { ++ uint64_t new_uncomp_bytes_inflight = ++ uncomp_bytes_inflight.fetch_add(uncomp_block_size, ++ std::memory_order_relaxed) + ++ uncomp_block_size; + +- // The size of the "tail" part of a SST file. "Tail" refers to +- // all blocks after data blocks till the end of the SST file. 
+- uint64_t tail_size; ++ uint64_t new_blocks_inflight = ++ blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; + +- // See class Footer +- uint32_t base_context_checksum; ++ estimated_file_size.store( ++ curr_file_size + ++ static_cast( ++ static_cast(new_uncomp_bytes_inflight) * ++ curr_compression_ratio.load(std::memory_order_relaxed)) + ++ new_blocks_inflight * kBlockTrailerSize, ++ std::memory_order_relaxed); ++ } + +- uint64_t get_offset() { return offset.load(std::memory_order_relaxed); } +- void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); } ++ // Estimate file size when a block is already reaped from ++ // compression thread ++ void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { ++ assert(uncomp_bytes_curr_block_set); + +- bool IsParallelCompressionEnabled() const { +- return compression_opts.parallel_threads > 1; +- } ++ uint64_t new_uncomp_bytes_compressed = ++ uncomp_bytes_compressed + uncomp_bytes_curr_block; ++ assert(new_uncomp_bytes_compressed > 0); + +- Status GetStatus() { +- // We need to make modifications of status visible when status_ok is set +- // to false, and this is ensured by status_mutex, so no special memory +- // order for status_ok is required. +- if (status_ok.load(std::memory_order_relaxed)) { +- return Status::OK(); +- } else { +- return CopyStatus(); +- } +- } ++ curr_compression_ratio.store( ++ (curr_compression_ratio.load(std::memory_order_relaxed) * ++ uncomp_bytes_compressed + ++ compressed_block_size) / ++ static_cast(new_uncomp_bytes_compressed), ++ std::memory_order_relaxed); ++ uncomp_bytes_compressed = new_uncomp_bytes_compressed; + +- Status CopyStatus() { +- std::lock_guard lock(status_mutex); +- return status; +- } ++ uint64_t new_uncomp_bytes_inflight = ++ uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, ++ std::memory_order_relaxed) - ++ uncomp_bytes_curr_block; + +- IOStatus GetIOStatus() { +- // We need to make modifications of io_status visible when status_ok is set +- // to false, and this is ensured by io_status_mutex, so no special memory +- // order for io_status_ok is required. +- if (io_status_ok.load(std::memory_order_relaxed)) { +-#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition +- auto ios = CopyIOStatus(); +- ios.PermitUncheckedError(); +- // Assume no races in unit tests +- assert(ios.ok()); +-#endif // ROCKSDB_ASSERT_STATUS_CHECKED +- return IOStatus::OK(); +- } else { +- return CopyIOStatus(); +- } +- } ++ uint64_t new_blocks_inflight = ++ blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; + +- IOStatus CopyIOStatus() { +- std::lock_guard lock(io_status_mutex); +- return io_status; +- } ++ estimated_file_size.store( ++ curr_file_size + ++ static_cast( ++ static_cast(new_uncomp_bytes_inflight) * ++ curr_compression_ratio.load(std::memory_order_relaxed)) + ++ new_blocks_inflight * kBlockTrailerSize, ++ std::memory_order_relaxed); + +- // Never erase an existing status that is not OK. +- void SetStatus(Status s) { +- if (!s.ok() && status_ok.load(std::memory_order_relaxed)) { +- // Locking is an overkill for non compression_opts.parallel_threads +- // case but since it's unlikely that s is not OK, we take this cost +- // to be simplicity. +- std::lock_guard lock(status_mutex); +- status = s; +- status_ok.store(false, std::memory_order_relaxed); ++ uncomp_bytes_curr_block_set = false; + } +- } + +- // Never erase an existing I/O status that is not OK. 
+- // Calling this will also SetStatus(ios) +- void SetIOStatus(IOStatus ios) { +- if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) { +- // Locking is an overkill for non compression_opts.parallel_threads +- // case but since it's unlikely that s is not OK, we take this cost +- // to be simplicity. +- std::lock_guard lock(io_status_mutex); +- io_status = ios; +- io_status_ok.store(false, std::memory_order_relaxed); ++ void SetEstimatedFileSize(uint64_t size) { ++ estimated_file_size.store(size, std::memory_order_relaxed); + } +- SetStatus(ios); +- } + +- Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo, +- WritableFileWriter* f) +- : ioptions(tbo.ioptions), +- prefix_extractor(tbo.moptions.prefix_extractor), +- write_options(tbo.write_options), +- table_options(table_opt), +- internal_comparator(tbo.internal_comparator), +- ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()), +- persist_user_defined_timestamps( +- tbo.ioptions.persist_user_defined_timestamps), +- file(f), +- offset(0), +- alignment(table_options.block_align +- ? std::min(static_cast(table_options.block_size), +- kDefaultPageSize) +- : 0), +- data_block(table_options.block_restart_interval, +- table_options.use_delta_encoding, +- false /* use_value_delta_encoding */, +- tbo.internal_comparator.user_comparator() +- ->CanKeysWithDifferentByteContentsBeEqual() +- ? BlockBasedTableOptions::kDataBlockBinarySearch +- : table_options.data_block_index_type, +- table_options.data_block_hash_table_util_ratio, ts_sz, +- persist_user_defined_timestamps), +- range_del_block( +- 1 /* block_restart_interval */, true /* use_delta_encoding */, +- false /* use_value_delta_encoding */, +- BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */, +- 0.75 /* data_block_hash_table_util_ratio */, ts_sz, +- persist_user_defined_timestamps), +- internal_prefix_transform(prefix_extractor.get()), +- compression_type(tbo.compression_type), +- sample_for_compression(tbo.moptions.sample_for_compression), +- compressible_input_data_bytes(0), +- uncompressible_input_data_bytes(0), +- sampled_input_data_bytes(0), +- sampled_output_slow_data_bytes(0), +- sampled_output_fast_data_bytes(0), +- compression_opts(tbo.compression_opts), +- compression_dict(), +- compression_ctxs(tbo.compression_opts.parallel_threads), +- verify_ctxs(tbo.compression_opts.parallel_threads), +- verify_dict(), +- state((tbo.compression_opts.max_dict_bytes > 0 && +- tbo.compression_type != kNoCompression) +- ? 
State::kBuffered +- : State::kUnbuffered), +- use_delta_encoding_for_index_values(table_opt.format_version >= 4 && +- !table_opt.block_align), +- reason(tbo.reason), +- flush_block_policy( +- table_options.flush_block_policy_factory->NewFlushBlockPolicy( +- table_options, data_block)), +- create_context(&table_options, &ioptions, ioptions.stats, +- compression_type == kZSTD || +- compression_type == kZSTDNotFinalCompression, +- tbo.moptions.block_protection_bytes_per_key, +- tbo.internal_comparator.user_comparator(), +- !use_delta_encoding_for_index_values, +- table_opt.index_type == +- BlockBasedTableOptions::kBinarySearchWithFirstKey), +- tail_size(0), +- status_ok(true), +- io_status_ok(true) { +- if (tbo.target_file_size == 0) { +- buffer_limit = compression_opts.max_dict_buffer_bytes; +- } else if (compression_opts.max_dict_buffer_bytes == 0) { +- buffer_limit = tbo.target_file_size; +- } else { +- buffer_limit = std::min(tbo.target_file_size, +- compression_opts.max_dict_buffer_bytes); ++ uint64_t GetEstimatedFileSize() { ++ return estimated_file_size.load(std::memory_order_relaxed); + } + +- const auto compress_dict_build_buffer_charged = +- table_options.cache_usage_options.options_overrides +- .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer) +- .charged; +- if (table_options.block_cache && +- (compress_dict_build_buffer_charged == +- CacheEntryRoleOptions::Decision::kEnabled || +- compress_dict_build_buffer_charged == +- CacheEntryRoleOptions::Decision::kFallback)) { +- compression_dict_buffer_cache_res_mgr = +- std::make_shared>( +- table_options.block_cache); +- } else { +- compression_dict_buffer_cache_res_mgr = nullptr; ++ void SetCurrBlockUncompSize(uint64_t size) { ++ uncomp_bytes_curr_block = size; ++ uncomp_bytes_curr_block_set = true; + } + +- assert(compression_ctxs.size() >= compression_opts.parallel_threads); +- for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { +- compression_ctxs[i].reset( +- new CompressionContext(compression_type, compression_opts)); ++ private: ++ // Input bytes compressed so far. ++ uint64_t uncomp_bytes_compressed; ++ // Size of current block being appended. ++ uint64_t uncomp_bytes_curr_block; ++ // Whether uncomp_bytes_curr_block has been set for next ++ // ReapBlock call. ++ bool uncomp_bytes_curr_block_set; ++ // Input bytes under compression and not appended yet. ++ std::atomic uncomp_bytes_inflight; ++ // Number of blocks under compression and not appended yet. ++ std::atomic blocks_inflight; ++ // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. ++ std::atomic curr_compression_ratio; ++ // Estimated SST file size. ++ std::atomic estimated_file_size; ++ }; ++ FileSizeEstimator file_size_estimator; ++ ++ // Facilities used for waiting first block completion. Need to Wait for ++ // the completion of first block compression and flush to get a non-zero ++ // compression ratio. 
++ std::atomic first_block_processed; ++ std::condition_variable first_block_cond; ++ std::mutex first_block_mutex; ++ ++ explicit ParallelCompressionRep(uint32_t parallel_threads) ++ : curr_block_keys(new Keys()), ++ block_rep_buf(parallel_threads), ++ block_rep_pool(parallel_threads), ++ compress_queue(parallel_threads), ++ write_queue(parallel_threads), ++ first_block_processed(false) { ++ for (uint32_t i = 0; i < parallel_threads; i++) { ++ block_rep_buf[i].contents = Slice(); ++ block_rep_buf[i].compressed_contents = Slice(); ++ block_rep_buf[i].data.reset(new std::string()); ++ block_rep_buf[i].compressed_data.reset(new std::string()); ++ block_rep_buf[i].compression_type = CompressionType(); ++ block_rep_buf[i].first_key_in_next_block.reset(new std::string()); ++ block_rep_buf[i].keys.reset(new Keys()); ++ block_rep_buf[i].slot.reset(new BlockRepSlot()); ++ block_rep_buf[i].status = Status::OK(); ++ block_rep_pool.push(&block_rep_buf[i]); ++ } ++ } ++ ++ ~ParallelCompressionRep() { block_rep_pool.finish(); } ++ ++ // Make a block prepared to be emitted to compression thread ++ // Used in non-buffered mode ++ BlockRep* PrepareBlock(CompressionType compression_type, ++ const Slice* first_key_in_next_block, ++ BlockBuilder* data_block) { ++ BlockRep* block_rep = ++ PrepareBlockInternal(compression_type, first_key_in_next_block); ++ assert(block_rep != nullptr); ++ data_block->SwapAndReset(*(block_rep->data)); ++ block_rep->contents = *(block_rep->data); ++ std::swap(block_rep->keys, curr_block_keys); ++ curr_block_keys->Clear(); ++ return block_rep; ++ } ++ ++ // Used in EnterUnbuffered ++ BlockRep* PrepareBlock(CompressionType compression_type, ++ const Slice* first_key_in_next_block, ++ std::string* data_block, ++ std::vector* keys) { ++ BlockRep* block_rep = ++ PrepareBlockInternal(compression_type, first_key_in_next_block); ++ assert(block_rep != nullptr); ++ std::swap(*(block_rep->data), *data_block); ++ block_rep->contents = *(block_rep->data); ++ block_rep->keys->SwapAssign(*keys); ++ return block_rep; ++ } ++ ++ // Emit a block to compression thread ++ void EmitBlock(BlockRep* block_rep) { ++ assert(block_rep != nullptr); ++ assert(block_rep->status.ok()); ++ if (!write_queue.push(block_rep->slot.get())) { ++ return; + } +- if (table_options.index_type == +- BlockBasedTableOptions::kTwoLevelIndexSearch) { +- p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( +- &internal_comparator, use_delta_encoding_for_index_values, +- table_options, ts_sz, persist_user_defined_timestamps); +- index_builder.reset(p_index_builder_); +- } else { +- index_builder.reset(IndexBuilder::CreateIndexBuilder( +- table_options.index_type, &internal_comparator, +- &this->internal_prefix_transform, use_delta_encoding_for_index_values, +- table_options, ts_sz, persist_user_defined_timestamps)); ++ if (!compress_queue.push(block_rep)) { ++ return; + } +- if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) { +- // Apply optimize_filters_for_hits setting here when applicable by +- // skipping filter generation +- filter_builder.reset(); +- } else if (tbo.skip_filters) { +- // For SstFileWriter skip_filters +- filter_builder.reset(); +- } else if (!table_options.filter_policy) { +- // Null filter_policy -> no filter +- filter_builder.reset(); +- } else { +- FilterBuildingContext filter_context(table_options); + +- filter_context.info_log = ioptions.logger; +- filter_context.column_family_name = tbo.column_family_name; +- filter_context.reason = reason; ++ if 
(!first_block_processed.load(std::memory_order_relaxed)) { ++ std::unique_lock lock(first_block_mutex); ++ first_block_cond.wait(lock, [this] { ++ return first_block_processed.load(std::memory_order_relaxed); ++ }); ++ } ++ } + +- // Only populate other fields if known to be in LSM rather than +- // generating external SST file +- if (reason != TableFileCreationReason::kMisc) { +- filter_context.compaction_style = ioptions.compaction_style; +- filter_context.num_levels = ioptions.num_levels; +- filter_context.level_at_creation = tbo.level_at_creation; +- filter_context.is_bottommost = tbo.is_bottommost; +- assert(filter_context.level_at_creation < filter_context.num_levels); +- } ++ // Reap a block from compression thread ++ void ReapBlock(BlockRep* block_rep) { ++ assert(block_rep != nullptr); ++ block_rep->compressed_data->clear(); ++ block_rep_pool.push(block_rep); + +- filter_builder.reset(CreateFilterBlockBuilder( +- ioptions, tbo.moptions, filter_context, +- use_delta_encoding_for_index_values, p_index_builder_, ts_sz, +- persist_user_defined_timestamps)); ++ if (!first_block_processed.load(std::memory_order_relaxed)) { ++ std::lock_guard lock(first_block_mutex); ++ first_block_processed.store(true, std::memory_order_relaxed); ++ first_block_cond.notify_one(); + } ++ } + +- assert(tbo.internal_tbl_prop_coll_factories); +- for (auto& factory : *tbo.internal_tbl_prop_coll_factories) { +- assert(factory); ++ private: ++ BlockRep* PrepareBlockInternal(CompressionType compression_type, ++ const Slice* first_key_in_next_block) { ++ BlockRep* block_rep = nullptr; ++ block_rep_pool.pop(block_rep); ++ assert(block_rep != nullptr); + +- std::unique_ptr collector{ +- factory->CreateInternalTblPropColl( +- tbo.column_family_id, tbo.level_at_creation, +- tbo.ioptions.num_levels, +- tbo.last_level_inclusive_max_seqno_threshold)}; +- if (collector) { +- table_properties_collectors.emplace_back(std::move(collector)); +- } +- } +- table_properties_collectors.emplace_back( +- new BlockBasedTablePropertiesCollector( +- table_options.index_type, table_options.whole_key_filtering, +- prefix_extractor != nullptr, +- table_options.decouple_partitioned_filters)); +- if (ts_sz > 0 && persist_user_defined_timestamps) { +- table_properties_collectors.emplace_back( +- new TimestampTablePropertiesCollector( +- tbo.internal_comparator.user_comparator())); +- } +- if (table_options.verify_compression) { +- for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { +- verify_ctxs[i].reset(new UncompressionContext(compression_type)); +- } +- } ++ assert(block_rep->data); + +- // These are only needed for populating table properties +- props.column_family_id = tbo.column_family_id; +- props.column_family_name = tbo.column_family_name; +- props.oldest_key_time = tbo.oldest_key_time; +- props.file_creation_time = tbo.file_creation_time; +- props.orig_file_number = tbo.cur_file_num; +- props.db_id = tbo.db_id; +- props.db_session_id = tbo.db_session_id; +- props.db_host_id = ioptions.db_host_id; +- if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) { +- ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set"); +- } ++ block_rep->compression_type = compression_type; + +- if (FormatVersionUsesContextChecksum(table_options.format_version)) { +- // Must be non-zero and semi- or quasi-random +- // TODO: ideally guaranteed different for related files (e.g. 
use file +- // number and db_session, for benefit of SstFileWriter) +- do { +- base_context_checksum = Random::GetTLSInstance()->Next(); +- } while (UNLIKELY(base_context_checksum == 0)); ++ if (first_key_in_next_block == nullptr) { ++ block_rep->first_key_in_next_block.reset(nullptr); + } else { +- base_context_checksum = 0; ++ block_rep->first_key_in_next_block->assign( ++ first_key_in_next_block->data(), first_key_in_next_block->size()); + } + +- if (alignment > 0 && compression_type != kNoCompression) { +- // With better sanitization in `CompactionPicker::CompactFiles()`, we +- // would not need to handle this case here and could change it to an +- // assertion instead. +- SetStatus(Status::InvalidArgument( +- "Enable block_align, but compression enabled")); +- } ++ return block_rep; + } ++}; + +- Rep(const Rep&) = delete; +- Rep& operator=(const Rep&) = delete; ++struct BlockBasedTableBuilder::Rep { ++ const ImmutableOptions ioptions; ++ // BEGIN from MutableCFOptions ++ std::shared_ptr prefix_extractor; ++ // END from MutableCFOptions ++ const WriteOptions write_options; ++ const BlockBasedTableOptions table_options; ++ const InternalKeyComparator& internal_comparator; ++ // Size in bytes for the user-defined timestamps. ++ size_t ts_sz; ++ // When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the ++ // user key will be stripped when creating the block based table. This ++ // stripping happens for all user keys, including the keys in data block, ++ // index block for data block, index block for index block (if index type is ++ // `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned ++ // filters), the `first_internal_key` in `IndexValue`, the `end_key` for range ++ // deletion entries. ++ // As long as the user keys are sorted when added via `Add` API, their logic ++ // ordering won't change after timestamps are stripped. However, for each user ++ // key to be logically equivalent before and after timestamp is stripped, the ++ // user key should contain the minimum timestamp. ++ bool persist_user_defined_timestamps; ++ WritableFileWriter* file; ++ std::atomic offset; ++ size_t alignment; ++ BlockBuilder data_block; ++ // Buffers uncompressed data blocks to replay later. Needed when ++ // compression dictionary is enabled so we can finalize the dictionary before ++ // compressing any data blocks. ++ std::vector data_block_buffers; ++ BlockBuilder range_del_block; + +- private: +- // Synchronize status & io_status accesses across threads from main thread, +- // compression thread and write thread in parallel compression. 
+- std::mutex status_mutex; +- std::atomic status_ok; +- Status status; +- std::mutex io_status_mutex; +- std::atomic io_status_ok; +- IOStatus io_status; +-}; ++ InternalKeySliceTransform internal_prefix_transform; ++ std::unique_ptr index_builder; ++ std::string index_separator_scratch; ++ PartitionedIndexBuilder* p_index_builder_ = nullptr; + +-struct BlockBasedTableBuilder::ParallelCompressionRep { +- // TODO: consider replacing with autovector or similar +- // Keys is a wrapper of vector of strings avoiding +- // releasing string memories during vector clear() +- // in order to save memory allocation overhead +- class Keys { +- public: +- Keys() : keys_(kKeysInitSize), size_(0) {} +- void PushBack(const Slice& key) { +- if (size_ == keys_.size()) { +- keys_.emplace_back(key.data(), key.size()); +- } else { +- keys_[size_].assign(key.data(), key.size()); +- } +- size_++; +- } +- void SwapAssign(std::vector& keys) { +- size_ = keys.size(); +- std::swap(keys_, keys); +- } +- void Clear() { size_ = 0; } +- size_t Size() { return size_; } +- std::string& Back() { return keys_[size_ - 1]; } +- std::string& operator[](size_t idx) { +- assert(idx < size_); +- return keys_[idx]; +- } ++ std::string last_ikey; // Internal key or empty (unset) ++ const Slice* first_key_in_next_block = nullptr; ++ CompressionType compression_type; ++ uint64_t sample_for_compression; ++ std::atomic compressible_input_data_bytes; ++ std::atomic uncompressible_input_data_bytes; ++ std::atomic sampled_input_data_bytes; ++ std::atomic sampled_output_slow_data_bytes; ++ std::atomic sampled_output_fast_data_bytes; ++ CompressionOptions compression_opts; ++ std::unique_ptr compression_dict; ++ std::vector> compression_ctxs; ++ std::vector> verify_ctxs; ++ std::unique_ptr verify_dict; + +- private: +- const size_t kKeysInitSize = 32; +- std::vector keys_; +- size_t size_; ++ size_t data_begin_offset = 0; ++ ++ TableProperties props; ++ ++ // States of the builder. ++ // ++ // - `kBuffered`: This is the initial state where zero or more data blocks are ++ // accumulated uncompressed in-memory. From this state, call ++ // `EnterUnbuffered()` to finalize the compression dictionary if enabled, ++ // compress/write out any buffered blocks, and proceed to the `kUnbuffered` ++ // state. ++ // ++ // - `kUnbuffered`: This is the state when compression dictionary is finalized ++ // either because it wasn't enabled in the first place or it's been created ++ // from sampling previously buffered data. In this state, blocks are simply ++ // compressed/written out as they fill up. From this state, call `Finish()` ++ // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete ++ // the partially created file. ++ // ++ // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been ++ // called, so the table builder is no longer usable. We must be in this ++ // state by the time the destructor runs. ++ enum class State { ++ kBuffered, ++ kUnbuffered, ++ kClosed, + }; +- std::unique_ptr curr_block_keys; +- +- class BlockRepSlot; ++ State state; ++ // `kBuffered` state is allowed only as long as the buffering of uncompressed ++ // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`. 
++ uint64_t buffer_limit; ++ std::shared_ptr ++ compression_dict_buffer_cache_res_mgr; ++ const bool use_delta_encoding_for_index_values; ++ std::unique_ptr filter_builder; ++ OffsetableCacheKey base_cache_key; ++ const TableFileCreationReason reason; + +- // BlockRep instances are fetched from and recycled to +- // block_rep_pool during parallel compression. +- struct BlockRep { +- Slice contents; +- Slice compressed_contents; +- std::unique_ptr data; +- std::unique_ptr compressed_data; +- CompressionType compression_type; +- std::unique_ptr first_key_in_next_block; +- std::unique_ptr keys; +- std::unique_ptr slot; +- Status status; +- }; +- // Use a vector of BlockRep as a buffer for a determined number +- // of BlockRep structures. All data referenced by pointers in +- // BlockRep will be freed when this vector is destructed. +- using BlockRepBuffer = std::vector; +- BlockRepBuffer block_rep_buf; +- // Use a thread-safe queue for concurrent access from block +- // building thread and writer thread. +- using BlockRepPool = WorkQueue; +- BlockRepPool block_rep_pool; ++ BlockHandle pending_handle; // Handle to add to index block + +- // Use BlockRepSlot to keep block order in write thread. +- // slot_ will pass references to BlockRep +- class BlockRepSlot { +- public: +- BlockRepSlot() : slot_(1) {} +- template +- void Fill(T&& rep) { +- slot_.push(std::forward(rep)); +- } +- void Take(BlockRep*& rep) { slot_.pop(rep); } ++ std::string compressed_output; ++ std::unique_ptr flush_block_policy; + +- private: +- // slot_ will pass references to BlockRep in block_rep_buf, +- // and those references are always valid before the destruction of +- // block_rep_buf. +- WorkQueue slot_; +- }; ++ std::vector> table_properties_collectors; + +- // Compression queue will pass references to BlockRep in block_rep_buf, +- // and those references are always valid before the destruction of +- // block_rep_buf. +- using CompressQueue = WorkQueue; +- CompressQueue compress_queue; +- std::vector compress_thread_pool; ++ std::unique_ptr pc_rep; ++ BlockCreateContext create_context; + +- // Write queue will pass references to BlockRep::slot in block_rep_buf, +- // and those references are always valid before the corresponding +- // BlockRep::slot is destructed, which is before the destruction of +- // block_rep_buf. +- using WriteQueue = WorkQueue; +- WriteQueue write_queue; +- std::unique_ptr write_thread; ++ // The size of the "tail" part of a SST file. "Tail" refers to ++ // all blocks after data blocks till the end of the SST file. ++ uint64_t tail_size; + +- // Estimate output file size when parallel compression is enabled. This is +- // necessary because compression & flush are no longer synchronized, +- // and BlockBasedTableBuilder::FileSize() is no longer accurate. +- // memory_order_relaxed suffices because accurate statistics is not required. 
+- class FileSizeEstimator { +- public: +- explicit FileSizeEstimator() +- : uncomp_bytes_compressed(0), +- uncomp_bytes_curr_block(0), +- uncomp_bytes_curr_block_set(false), +- uncomp_bytes_inflight(0), +- blocks_inflight(0), +- curr_compression_ratio(0), +- estimated_file_size(0) {} ++ // See class Footer ++ uint32_t base_context_checksum; + +- // Estimate file size when a block is about to be emitted to +- // compression thread +- void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { +- uint64_t new_uncomp_bytes_inflight = +- uncomp_bytes_inflight.fetch_add(uncomp_block_size, +- std::memory_order_relaxed) + +- uncomp_block_size; ++ uint64_t get_offset() { return offset.load(std::memory_order_relaxed); } ++ void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); } + +- uint64_t new_blocks_inflight = +- blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; ++ bool IsParallelCompressionEnabled() const { ++ return compression_opts.parallel_threads > 1; ++ } + +- estimated_file_size.store( +- curr_file_size + +- static_cast( +- static_cast(new_uncomp_bytes_inflight) * +- curr_compression_ratio.load(std::memory_order_relaxed)) + +- new_blocks_inflight * kBlockTrailerSize, +- std::memory_order_relaxed); ++ Status GetStatus() { ++ // We need to make modifications of status visible when status_ok is set ++ // to false, and this is ensured by status_mutex, so no special memory ++ // order for status_ok is required. ++ if (status_ok.load(std::memory_order_relaxed)) { ++ return Status::OK(); ++ } else { ++ return CopyStatus(); + } ++ } + +- // Estimate file size when a block is already reaped from +- // compression thread +- void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { +- assert(uncomp_bytes_curr_block_set); +- +- uint64_t new_uncomp_bytes_compressed = +- uncomp_bytes_compressed + uncomp_bytes_curr_block; +- assert(new_uncomp_bytes_compressed > 0); ++ Status CopyStatus() { ++ std::lock_guard lock(status_mutex); ++ return status; ++ } + +- curr_compression_ratio.store( +- (curr_compression_ratio.load(std::memory_order_relaxed) * +- uncomp_bytes_compressed + +- compressed_block_size) / +- static_cast(new_uncomp_bytes_compressed), +- std::memory_order_relaxed); +- uncomp_bytes_compressed = new_uncomp_bytes_compressed; ++ IOStatus GetIOStatus() { ++ // We need to make modifications of io_status visible when status_ok is set ++ // to false, and this is ensured by io_status_mutex, so no special memory ++ // order for io_status_ok is required. ++ if (io_status_ok.load(std::memory_order_relaxed)) { ++#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition ++ auto ios = CopyIOStatus(); ++ ios.PermitUncheckedError(); ++ // Assume no races in unit tests ++ assert(ios.ok()); ++#endif // ROCKSDB_ASSERT_STATUS_CHECKED ++ return IOStatus::OK(); ++ } else { ++ return CopyIOStatus(); ++ } ++ } + +- uint64_t new_uncomp_bytes_inflight = +- uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, +- std::memory_order_relaxed) - +- uncomp_bytes_curr_block; ++ IOStatus CopyIOStatus() { ++ std::lock_guard lock(io_status_mutex); ++ return io_status; ++ } + +- uint64_t new_blocks_inflight = +- blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; ++ // Never erase an existing status that is not OK. 
++ void SetStatus(Status s) { ++ if (!s.ok() && status_ok.load(std::memory_order_relaxed)) { ++ // Locking is an overkill for non compression_opts.parallel_threads ++ // case but since it's unlikely that s is not OK, we take this cost ++ // to be simplicity. ++ std::lock_guard lock(status_mutex); ++ status = s; ++ status_ok.store(false, std::memory_order_relaxed); ++ } ++ } + +- estimated_file_size.store( +- curr_file_size + +- static_cast( +- static_cast(new_uncomp_bytes_inflight) * +- curr_compression_ratio.load(std::memory_order_relaxed)) + +- new_blocks_inflight * kBlockTrailerSize, +- std::memory_order_relaxed); ++ // Never erase an existing I/O status that is not OK. ++ // Calling this will also SetStatus(ios) ++ void SetIOStatus(IOStatus ios) { ++ if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) { ++ // Locking is an overkill for non compression_opts.parallel_threads ++ // case but since it's unlikely that s is not OK, we take this cost ++ // to be simplicity. ++ std::lock_guard lock(io_status_mutex); ++ io_status = ios; ++ io_status_ok.store(false, std::memory_order_relaxed); ++ } ++ SetStatus(ios); ++ } + +- uncomp_bytes_curr_block_set = false; ++ Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo, ++ WritableFileWriter* f) ++ : ioptions(tbo.ioptions), ++ prefix_extractor(tbo.moptions.prefix_extractor), ++ write_options(tbo.write_options), ++ table_options(table_opt), ++ internal_comparator(tbo.internal_comparator), ++ ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()), ++ persist_user_defined_timestamps( ++ tbo.ioptions.persist_user_defined_timestamps), ++ file(f), ++ offset(0), ++ alignment(table_options.block_align ++ ? std::min(static_cast(table_options.block_size), ++ kDefaultPageSize) ++ : 0), ++ data_block(table_options.block_restart_interval, ++ table_options.use_delta_encoding, ++ false /* use_value_delta_encoding */, ++ tbo.internal_comparator.user_comparator() ++ ->CanKeysWithDifferentByteContentsBeEqual() ++ ? BlockBasedTableOptions::kDataBlockBinarySearch ++ : table_options.data_block_index_type, ++ table_options.data_block_hash_table_util_ratio, ts_sz, ++ persist_user_defined_timestamps), ++ range_del_block( ++ 1 /* block_restart_interval */, true /* use_delta_encoding */, ++ false /* use_value_delta_encoding */, ++ BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */, ++ 0.75 /* data_block_hash_table_util_ratio */, ts_sz, ++ persist_user_defined_timestamps), ++ internal_prefix_transform(prefix_extractor.get()), ++ compression_type(tbo.compression_type), ++ sample_for_compression(tbo.moptions.sample_for_compression), ++ compressible_input_data_bytes(0), ++ uncompressible_input_data_bytes(0), ++ sampled_input_data_bytes(0), ++ sampled_output_slow_data_bytes(0), ++ sampled_output_fast_data_bytes(0), ++ compression_opts(tbo.compression_opts), ++ compression_dict(), ++ compression_ctxs(tbo.compression_opts.parallel_threads), ++ verify_ctxs(tbo.compression_opts.parallel_threads), ++ verify_dict(), ++ state((tbo.compression_opts.max_dict_bytes > 0 && ++ tbo.compression_type != kNoCompression) ++ ? 
State::kBuffered ++ : State::kUnbuffered), ++ use_delta_encoding_for_index_values(table_opt.format_version >= 4 && ++ !table_opt.block_align), ++ reason(tbo.reason), ++ flush_block_policy( ++ table_options.flush_block_policy_factory->NewFlushBlockPolicy( ++ table_options, data_block)), ++ create_context(&table_options, &ioptions, ioptions.stats, ++ compression_type == kZSTD || ++ compression_type == kZSTDNotFinalCompression, ++ tbo.moptions.block_protection_bytes_per_key, ++ tbo.internal_comparator.user_comparator(), ++ !use_delta_encoding_for_index_values, ++ table_opt.index_type == ++ BlockBasedTableOptions::kBinarySearchWithFirstKey), ++ tail_size(0), ++ status_ok(true), ++ io_status_ok(true) { ++ if (tbo.target_file_size == 0) { ++ buffer_limit = compression_opts.max_dict_buffer_bytes; ++ } else if (compression_opts.max_dict_buffer_bytes == 0) { ++ buffer_limit = tbo.target_file_size; ++ } else { ++ buffer_limit = std::min(tbo.target_file_size, ++ compression_opts.max_dict_buffer_bytes); + } + +- void SetEstimatedFileSize(uint64_t size) { +- estimated_file_size.store(size, std::memory_order_relaxed); ++ const auto compress_dict_build_buffer_charged = ++ table_options.cache_usage_options.options_overrides ++ .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer) ++ .charged; ++ if (table_options.block_cache && ++ (compress_dict_build_buffer_charged == ++ CacheEntryRoleOptions::Decision::kEnabled || ++ compress_dict_build_buffer_charged == ++ CacheEntryRoleOptions::Decision::kFallback)) { ++ compression_dict_buffer_cache_res_mgr = ++ std::make_shared>( ++ table_options.block_cache); ++ } else { ++ compression_dict_buffer_cache_res_mgr = nullptr; + } + +- uint64_t GetEstimatedFileSize() { +- return estimated_file_size.load(std::memory_order_relaxed); ++ assert(compression_ctxs.size() >= compression_opts.parallel_threads); ++ for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { ++ compression_ctxs[i].reset( ++ new CompressionContext(compression_type, compression_opts)); + } +- +- void SetCurrBlockUncompSize(uint64_t size) { +- uncomp_bytes_curr_block = size; +- uncomp_bytes_curr_block_set = true; ++ if (table_options.index_type == ++ BlockBasedTableOptions::kTwoLevelIndexSearch) { ++ p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( ++ &internal_comparator, use_delta_encoding_for_index_values, ++ table_options, ts_sz, persist_user_defined_timestamps); ++ index_builder.reset(p_index_builder_); ++ } else { ++ index_builder.reset(IndexBuilder::CreateIndexBuilder( ++ table_options.index_type, &internal_comparator, ++ &this->internal_prefix_transform, use_delta_encoding_for_index_values, ++ table_options, ts_sz, persist_user_defined_timestamps)); + } ++ if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) { ++ // Apply optimize_filters_for_hits setting here when applicable by ++ // skipping filter generation ++ filter_builder.reset(); ++ } else if (tbo.skip_filters) { ++ // For SstFileWriter skip_filters ++ filter_builder.reset(); ++ } else if (!table_options.filter_policy) { ++ // Null filter_policy -> no filter ++ filter_builder.reset(); ++ } else { ++ FilterBuildingContext filter_context(table_options); + +- private: +- // Input bytes compressed so far. +- uint64_t uncomp_bytes_compressed; +- // Size of current block being appended. +- uint64_t uncomp_bytes_curr_block; +- // Whether uncomp_bytes_curr_block has been set for next +- // ReapBlock call. +- bool uncomp_bytes_curr_block_set; +- // Input bytes under compression and not appended yet. 
+- std::atomic uncomp_bytes_inflight; +- // Number of blocks under compression and not appended yet. +- std::atomic blocks_inflight; +- // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. +- std::atomic curr_compression_ratio; +- // Estimated SST file size. +- std::atomic estimated_file_size; +- }; +- FileSizeEstimator file_size_estimator; ++ filter_context.info_log = ioptions.logger; ++ filter_context.column_family_name = tbo.column_family_name; ++ filter_context.reason = reason; + +- // Facilities used for waiting first block completion. Need to Wait for +- // the completion of first block compression and flush to get a non-zero +- // compression ratio. +- std::atomic first_block_processed; +- std::condition_variable first_block_cond; +- std::mutex first_block_mutex; ++ // Only populate other fields if known to be in LSM rather than ++ // generating external SST file ++ if (reason != TableFileCreationReason::kMisc) { ++ filter_context.compaction_style = ioptions.compaction_style; ++ filter_context.num_levels = ioptions.num_levels; ++ filter_context.level_at_creation = tbo.level_at_creation; ++ filter_context.is_bottommost = tbo.is_bottommost; ++ assert(filter_context.level_at_creation < filter_context.num_levels); ++ } + +- explicit ParallelCompressionRep(uint32_t parallel_threads) +- : curr_block_keys(new Keys()), +- block_rep_buf(parallel_threads), +- block_rep_pool(parallel_threads), +- compress_queue(parallel_threads), +- write_queue(parallel_threads), +- first_block_processed(false) { +- for (uint32_t i = 0; i < parallel_threads; i++) { +- block_rep_buf[i].contents = Slice(); +- block_rep_buf[i].compressed_contents = Slice(); +- block_rep_buf[i].data.reset(new std::string()); +- block_rep_buf[i].compressed_data.reset(new std::string()); +- block_rep_buf[i].compression_type = CompressionType(); +- block_rep_buf[i].first_key_in_next_block.reset(new std::string()); +- block_rep_buf[i].keys.reset(new Keys()); +- block_rep_buf[i].slot.reset(new BlockRepSlot()); +- block_rep_buf[i].status = Status::OK(); +- block_rep_pool.push(&block_rep_buf[i]); ++ filter_builder.reset(CreateFilterBlockBuilder( ++ ioptions, tbo.moptions, filter_context, ++ use_delta_encoding_for_index_values, p_index_builder_, ts_sz, ++ persist_user_defined_timestamps)); + } +- } +- +- ~ParallelCompressionRep() { block_rep_pool.finish(); } +- +- // Make a block prepared to be emitted to compression thread +- // Used in non-buffered mode +- BlockRep* PrepareBlock(CompressionType compression_type, +- const Slice* first_key_in_next_block, +- BlockBuilder* data_block) { +- BlockRep* block_rep = +- PrepareBlockInternal(compression_type, first_key_in_next_block); +- assert(block_rep != nullptr); +- data_block->SwapAndReset(*(block_rep->data)); +- block_rep->contents = *(block_rep->data); +- std::swap(block_rep->keys, curr_block_keys); +- curr_block_keys->Clear(); +- return block_rep; +- } + +- // Used in EnterUnbuffered +- BlockRep* PrepareBlock(CompressionType compression_type, +- const Slice* first_key_in_next_block, +- std::string* data_block, +- std::vector* keys) { +- BlockRep* block_rep = +- PrepareBlockInternal(compression_type, first_key_in_next_block); +- assert(block_rep != nullptr); +- std::swap(*(block_rep->data), *data_block); +- block_rep->contents = *(block_rep->data); +- block_rep->keys->SwapAssign(*keys); +- return block_rep; +- } ++ assert(tbo.internal_tbl_prop_coll_factories); ++ for (auto& factory : *tbo.internal_tbl_prop_coll_factories) { ++ assert(factory); + +- // Emit a block to 
compression thread +- void EmitBlock(BlockRep* block_rep) { +- assert(block_rep != nullptr); +- assert(block_rep->status.ok()); +- if (!write_queue.push(block_rep->slot.get())) { +- return; ++ std::unique_ptr collector{ ++ factory->CreateInternalTblPropColl( ++ tbo.column_family_id, tbo.level_at_creation, ++ tbo.ioptions.num_levels, ++ tbo.last_level_inclusive_max_seqno_threshold)}; ++ if (collector) { ++ table_properties_collectors.emplace_back(std::move(collector)); ++ } + } +- if (!compress_queue.push(block_rep)) { +- return; ++ table_properties_collectors.emplace_back( ++ new BlockBasedTablePropertiesCollector( ++ table_options.index_type, table_options.whole_key_filtering, ++ prefix_extractor != nullptr, ++ table_options.decouple_partitioned_filters)); ++ if (ts_sz > 0 && persist_user_defined_timestamps) { ++ table_properties_collectors.emplace_back( ++ new TimestampTablePropertiesCollector( ++ tbo.internal_comparator.user_comparator())); + } +- +- if (!first_block_processed.load(std::memory_order_relaxed)) { +- std::unique_lock lock(first_block_mutex); +- first_block_cond.wait(lock, [this] { +- return first_block_processed.load(std::memory_order_relaxed); +- }); ++ if (table_options.verify_compression) { ++ for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { ++ verify_ctxs[i].reset(new UncompressionContext(compression_type)); ++ } + } +- } +- +- // Reap a block from compression thread +- void ReapBlock(BlockRep* block_rep) { +- assert(block_rep != nullptr); +- block_rep->compressed_data->clear(); +- block_rep_pool.push(block_rep); + +- if (!first_block_processed.load(std::memory_order_relaxed)) { +- std::lock_guard lock(first_block_mutex); +- first_block_processed.store(true, std::memory_order_relaxed); +- first_block_cond.notify_one(); ++ // These are only needed for populating table properties ++ props.column_family_id = tbo.column_family_id; ++ props.column_family_name = tbo.column_family_name; ++ props.oldest_key_time = tbo.oldest_key_time; ++ props.file_creation_time = tbo.file_creation_time; ++ props.orig_file_number = tbo.cur_file_num; ++ props.db_id = tbo.db_id; ++ props.db_session_id = tbo.db_session_id; ++ props.db_host_id = ioptions.db_host_id; ++ if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) { ++ ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set"); + } +- } +- +- private: +- BlockRep* PrepareBlockInternal(CompressionType compression_type, +- const Slice* first_key_in_next_block) { +- BlockRep* block_rep = nullptr; +- block_rep_pool.pop(block_rep); +- assert(block_rep != nullptr); +- +- assert(block_rep->data); +- +- block_rep->compression_type = compression_type; + +- if (first_key_in_next_block == nullptr) { +- block_rep->first_key_in_next_block.reset(nullptr); ++ if (FormatVersionUsesContextChecksum(table_options.format_version)) { ++ // Must be non-zero and semi- or quasi-random ++ // TODO: ideally guaranteed different for related files (e.g. 
use file ++ // number and db_session, for benefit of SstFileWriter) ++ do { ++ base_context_checksum = Random::GetTLSInstance()->Next(); ++ } while (UNLIKELY(base_context_checksum == 0)); + } else { +- block_rep->first_key_in_next_block->assign( +- first_key_in_next_block->data(), first_key_in_next_block->size()); ++ base_context_checksum = 0; + } + +- return block_rep; ++ if (alignment > 0 && compression_type != kNoCompression) { ++ // With better sanitization in `CompactionPicker::CompactFiles()`, we ++ // would not need to handle this case here and could change it to an ++ // assertion instead. ++ SetStatus(Status::InvalidArgument( ++ "Enable block_align, but compression enabled")); ++ } + } ++ ++ Rep(const Rep&) = delete; ++ Rep& operator=(const Rep&) = delete; ++ ++ private: ++ // Synchronize status & io_status accesses across threads from main thread, ++ // compression thread and write thread in parallel compression. ++ std::mutex status_mutex; ++ std::atomic status_ok; ++ Status status; ++ std::mutex io_status_mutex; ++ std::atomic io_status_ok; ++ IOStatus io_status; + }; + + BlockBasedTableBuilder::BlockBasedTableBuilder( diff --git a/recipes/rocksdb/config.yml b/recipes/rocksdb/config.yml index 8044f69736f9c..92ed7e6094b6a 100644 --- a/recipes/rocksdb/config.yml +++ b/recipes/rocksdb/config.yml @@ -1,4 +1,6 @@ versions: + "9.6.1": + folder: all "9.5.2": folder: all "9.4.0": From 2a9e5fb9dbffca623ee11346fd881225ae857134 Mon Sep 17 00:00:00 2001 From: toge Date: Tue, 15 Oct 2024 09:34:24 +0900 Subject: [PATCH 2/2] update 9.7.2 --- recipes/rocksdb/all/conandata.yml | 10 +- .../patches/9.6.1-0001-support-clang.patch | 1331 ----------------- recipes/rocksdb/config.yml | 2 +- 3 files changed, 4 insertions(+), 1339 deletions(-) delete mode 100644 recipes/rocksdb/all/patches/9.6.1-0001-support-clang.patch diff --git a/recipes/rocksdb/all/conandata.yml b/recipes/rocksdb/all/conandata.yml index 51de59560d7e7..771ad19b3d30e 100644 --- a/recipes/rocksdb/all/conandata.yml +++ b/recipes/rocksdb/all/conandata.yml @@ -1,7 +1,7 @@ sources: - "9.6.1": - url: "https://github.com/facebook/rocksdb/archive/refs/tags/v9.6.1.tar.gz" - sha256: "98cf497c1d6d0a927142d2002a0b6b4816a0998c74fda9ae7b1bdaf6b784e895" + "9.7.2": + url: "https://github.com/facebook/rocksdb/archive/refs/tags/v9.7.2.tar.gz" + sha256: "13e9c41d290199ee0185590d4fa9d327422aaf75765b3193945303c3c314e07d" "9.5.2": url: "https://github.com/facebook/rocksdb/archive/refs/tags/v9.5.2.tar.gz" sha256: "B20780586D3DF4A3C5BCBDE341A2C1946B03D18237960BDA5BC5E9538F42AF40" @@ -30,10 +30,6 @@ sources: url: "https://github.com/facebook/rocksdb/archive/refs/tags/v6.20.3.tar.gz" sha256: "c6502c7aae641b7e20fafa6c2b92273d935d2b7b2707135ebd9a67b092169dca" patches: - "9.6.1": - - patch_file: "patches/9.6.1-0001-support-clang.patch" - patch_description: "Fix build with gcc 13 by including cstdint" - patch_type: "portability" "6.29.5": - patch_file: "patches/6.29.5-0001-add-include-cstdint-for-gcc-13.patch" patch_description: "Fix build with gcc 13 by including cstdint" diff --git a/recipes/rocksdb/all/patches/9.6.1-0001-support-clang.patch b/recipes/rocksdb/all/patches/9.6.1-0001-support-clang.patch deleted file mode 100644 index 8141c951a8c73..0000000000000 --- a/recipes/rocksdb/all/patches/9.6.1-0001-support-clang.patch +++ /dev/null @@ -1,1331 +0,0 @@ -diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc -index 1b444f5..e3d64d6 100644 ---- a/table/block_based/block_based_table_builder.cc -+++ 
b/table/block_based/block_based_table_builder.cc -@@ -267,708 +267,708 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector - bool decoupled_partitioned_filters_; - }; - --struct BlockBasedTableBuilder::Rep { -- const ImmutableOptions ioptions; -- // BEGIN from MutableCFOptions -- std::shared_ptr prefix_extractor; -- // END from MutableCFOptions -- const WriteOptions write_options; -- const BlockBasedTableOptions table_options; -- const InternalKeyComparator& internal_comparator; -- // Size in bytes for the user-defined timestamps. -- size_t ts_sz; -- // When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the -- // user key will be stripped when creating the block based table. This -- // stripping happens for all user keys, including the keys in data block, -- // index block for data block, index block for index block (if index type is -- // `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned -- // filters), the `first_internal_key` in `IndexValue`, the `end_key` for range -- // deletion entries. -- // As long as the user keys are sorted when added via `Add` API, their logic -- // ordering won't change after timestamps are stripped. However, for each user -- // key to be logically equivalent before and after timestamp is stripped, the -- // user key should contain the minimum timestamp. -- bool persist_user_defined_timestamps; -- WritableFileWriter* file; -- std::atomic offset; -- size_t alignment; -- BlockBuilder data_block; -- // Buffers uncompressed data blocks to replay later. Needed when -- // compression dictionary is enabled so we can finalize the dictionary before -- // compressing any data blocks. -- std::vector data_block_buffers; -- BlockBuilder range_del_block; -+struct BlockBasedTableBuilder::ParallelCompressionRep { -+ // TODO: consider replacing with autovector or similar -+ // Keys is a wrapper of vector of strings avoiding -+ // releasing string memories during vector clear() -+ // in order to save memory allocation overhead -+ class Keys { -+ public: -+ Keys() : keys_(kKeysInitSize), size_(0) {} -+ void PushBack(const Slice& key) { -+ if (size_ == keys_.size()) { -+ keys_.emplace_back(key.data(), key.size()); -+ } else { -+ keys_[size_].assign(key.data(), key.size()); -+ } -+ size_++; -+ } -+ void SwapAssign(std::vector& keys) { -+ size_ = keys.size(); -+ std::swap(keys_, keys); -+ } -+ void Clear() { size_ = 0; } -+ size_t Size() { return size_; } -+ std::string& Back() { return keys_[size_ - 1]; } -+ std::string& operator[](size_t idx) { -+ assert(idx < size_); -+ return keys_[idx]; -+ } - -- InternalKeySliceTransform internal_prefix_transform; -- std::unique_ptr index_builder; -- std::string index_separator_scratch; -- PartitionedIndexBuilder* p_index_builder_ = nullptr; -+ private: -+ const size_t kKeysInitSize = 32; -+ std::vector keys_; -+ size_t size_; -+ }; -+ std::unique_ptr curr_block_keys; - -- std::string last_ikey; // Internal key or empty (unset) -- const Slice* first_key_in_next_block = nullptr; -- CompressionType compression_type; -- uint64_t sample_for_compression; -- std::atomic compressible_input_data_bytes; -- std::atomic uncompressible_input_data_bytes; -- std::atomic sampled_input_data_bytes; -- std::atomic sampled_output_slow_data_bytes; -- std::atomic sampled_output_fast_data_bytes; -- CompressionOptions compression_opts; -- std::unique_ptr compression_dict; -- std::vector> compression_ctxs; -- std::vector> verify_ctxs; -- std::unique_ptr verify_dict; -+ class BlockRepSlot; - -- size_t 
data_begin_offset = 0; -+ // BlockRep instances are fetched from and recycled to -+ // block_rep_pool during parallel compression. -+ struct BlockRep { -+ Slice contents; -+ Slice compressed_contents; -+ std::unique_ptr data; -+ std::unique_ptr compressed_data; -+ CompressionType compression_type; -+ std::unique_ptr first_key_in_next_block; -+ std::unique_ptr keys; -+ std::unique_ptr slot; -+ Status status; -+ }; -+ // Use a vector of BlockRep as a buffer for a determined number -+ // of BlockRep structures. All data referenced by pointers in -+ // BlockRep will be freed when this vector is destructed. -+ using BlockRepBuffer = std::vector; -+ BlockRepBuffer block_rep_buf; -+ // Use a thread-safe queue for concurrent access from block -+ // building thread and writer thread. -+ using BlockRepPool = WorkQueue; -+ BlockRepPool block_rep_pool; - -- TableProperties props; -+ // Use BlockRepSlot to keep block order in write thread. -+ // slot_ will pass references to BlockRep -+ class BlockRepSlot { -+ public: -+ BlockRepSlot() : slot_(1) {} -+ template -+ void Fill(T&& rep) { -+ slot_.push(std::forward(rep)); -+ } -+ void Take(BlockRep*& rep) { slot_.pop(rep); } - -- // States of the builder. -- // -- // - `kBuffered`: This is the initial state where zero or more data blocks are -- // accumulated uncompressed in-memory. From this state, call -- // `EnterUnbuffered()` to finalize the compression dictionary if enabled, -- // compress/write out any buffered blocks, and proceed to the `kUnbuffered` -- // state. -- // -- // - `kUnbuffered`: This is the state when compression dictionary is finalized -- // either because it wasn't enabled in the first place or it's been created -- // from sampling previously buffered data. In this state, blocks are simply -- // compressed/written out as they fill up. From this state, call `Finish()` -- // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete -- // the partially created file. -- // -- // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been -- // called, so the table builder is no longer usable. We must be in this -- // state by the time the destructor runs. -- enum class State { -- kBuffered, -- kUnbuffered, -- kClosed, -+ private: -+ // slot_ will pass references to BlockRep in block_rep_buf, -+ // and those references are always valid before the destruction of -+ // block_rep_buf. -+ WorkQueue slot_; - }; -- State state; -- // `kBuffered` state is allowed only as long as the buffering of uncompressed -- // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`. -- uint64_t buffer_limit; -- std::shared_ptr -- compression_dict_buffer_cache_res_mgr; -- const bool use_delta_encoding_for_index_values; -- std::unique_ptr filter_builder; -- OffsetableCacheKey base_cache_key; -- const TableFileCreationReason reason; - -- BlockHandle pending_handle; // Handle to add to index block -+ // Compression queue will pass references to BlockRep in block_rep_buf, -+ // and those references are always valid before the destruction of -+ // block_rep_buf. -+ using CompressQueue = WorkQueue; -+ CompressQueue compress_queue; -+ std::vector compress_thread_pool; - -- std::string compressed_output; -- std::unique_ptr flush_block_policy; -+ // Write queue will pass references to BlockRep::slot in block_rep_buf, -+ // and those references are always valid before the corresponding -+ // BlockRep::slot is destructed, which is before the destruction of -+ // block_rep_buf. 
-+ using WriteQueue = WorkQueue; -+ WriteQueue write_queue; -+ std::unique_ptr write_thread; - -- std::vector> table_properties_collectors; -+ // Estimate output file size when parallel compression is enabled. This is -+ // necessary because compression & flush are no longer synchronized, -+ // and BlockBasedTableBuilder::FileSize() is no longer accurate. -+ // memory_order_relaxed suffices because accurate statistics is not required. -+ class FileSizeEstimator { -+ public: -+ explicit FileSizeEstimator() -+ : uncomp_bytes_compressed(0), -+ uncomp_bytes_curr_block(0), -+ uncomp_bytes_curr_block_set(false), -+ uncomp_bytes_inflight(0), -+ blocks_inflight(0), -+ curr_compression_ratio(0), -+ estimated_file_size(0) {} - -- std::unique_ptr pc_rep; -- BlockCreateContext create_context; -+ // Estimate file size when a block is about to be emitted to -+ // compression thread -+ void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { -+ uint64_t new_uncomp_bytes_inflight = -+ uncomp_bytes_inflight.fetch_add(uncomp_block_size, -+ std::memory_order_relaxed) + -+ uncomp_block_size; - -- // The size of the "tail" part of a SST file. "Tail" refers to -- // all blocks after data blocks till the end of the SST file. -- uint64_t tail_size; -+ uint64_t new_blocks_inflight = -+ blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; - -- // See class Footer -- uint32_t base_context_checksum; -+ estimated_file_size.store( -+ curr_file_size + -+ static_cast( -+ static_cast(new_uncomp_bytes_inflight) * -+ curr_compression_ratio.load(std::memory_order_relaxed)) + -+ new_blocks_inflight * kBlockTrailerSize, -+ std::memory_order_relaxed); -+ } - -- uint64_t get_offset() { return offset.load(std::memory_order_relaxed); } -- void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); } -+ // Estimate file size when a block is already reaped from -+ // compression thread -+ void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { -+ assert(uncomp_bytes_curr_block_set); - -- bool IsParallelCompressionEnabled() const { -- return compression_opts.parallel_threads > 1; -- } -+ uint64_t new_uncomp_bytes_compressed = -+ uncomp_bytes_compressed + uncomp_bytes_curr_block; -+ assert(new_uncomp_bytes_compressed > 0); - -- Status GetStatus() { -- // We need to make modifications of status visible when status_ok is set -- // to false, and this is ensured by status_mutex, so no special memory -- // order for status_ok is required. -- if (status_ok.load(std::memory_order_relaxed)) { -- return Status::OK(); -- } else { -- return CopyStatus(); -- } -- } -+ curr_compression_ratio.store( -+ (curr_compression_ratio.load(std::memory_order_relaxed) * -+ uncomp_bytes_compressed + -+ compressed_block_size) / -+ static_cast(new_uncomp_bytes_compressed), -+ std::memory_order_relaxed); -+ uncomp_bytes_compressed = new_uncomp_bytes_compressed; - -- Status CopyStatus() { -- std::lock_guard lock(status_mutex); -- return status; -- } -+ uint64_t new_uncomp_bytes_inflight = -+ uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, -+ std::memory_order_relaxed) - -+ uncomp_bytes_curr_block; - -- IOStatus GetIOStatus() { -- // We need to make modifications of io_status visible when status_ok is set -- // to false, and this is ensured by io_status_mutex, so no special memory -- // order for io_status_ok is required. 
-- if (io_status_ok.load(std::memory_order_relaxed)) { --#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition -- auto ios = CopyIOStatus(); -- ios.PermitUncheckedError(); -- // Assume no races in unit tests -- assert(ios.ok()); --#endif // ROCKSDB_ASSERT_STATUS_CHECKED -- return IOStatus::OK(); -- } else { -- return CopyIOStatus(); -- } -- } -+ uint64_t new_blocks_inflight = -+ blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; - -- IOStatus CopyIOStatus() { -- std::lock_guard lock(io_status_mutex); -- return io_status; -- } -+ estimated_file_size.store( -+ curr_file_size + -+ static_cast( -+ static_cast(new_uncomp_bytes_inflight) * -+ curr_compression_ratio.load(std::memory_order_relaxed)) + -+ new_blocks_inflight * kBlockTrailerSize, -+ std::memory_order_relaxed); - -- // Never erase an existing status that is not OK. -- void SetStatus(Status s) { -- if (!s.ok() && status_ok.load(std::memory_order_relaxed)) { -- // Locking is an overkill for non compression_opts.parallel_threads -- // case but since it's unlikely that s is not OK, we take this cost -- // to be simplicity. -- std::lock_guard lock(status_mutex); -- status = s; -- status_ok.store(false, std::memory_order_relaxed); -+ uncomp_bytes_curr_block_set = false; - } -- } - -- // Never erase an existing I/O status that is not OK. -- // Calling this will also SetStatus(ios) -- void SetIOStatus(IOStatus ios) { -- if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) { -- // Locking is an overkill for non compression_opts.parallel_threads -- // case but since it's unlikely that s is not OK, we take this cost -- // to be simplicity. -- std::lock_guard lock(io_status_mutex); -- io_status = ios; -- io_status_ok.store(false, std::memory_order_relaxed); -+ void SetEstimatedFileSize(uint64_t size) { -+ estimated_file_size.store(size, std::memory_order_relaxed); - } -- SetStatus(ios); -- } - -- Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo, -- WritableFileWriter* f) -- : ioptions(tbo.ioptions), -- prefix_extractor(tbo.moptions.prefix_extractor), -- write_options(tbo.write_options), -- table_options(table_opt), -- internal_comparator(tbo.internal_comparator), -- ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()), -- persist_user_defined_timestamps( -- tbo.ioptions.persist_user_defined_timestamps), -- file(f), -- offset(0), -- alignment(table_options.block_align -- ? std::min(static_cast(table_options.block_size), -- kDefaultPageSize) -- : 0), -- data_block(table_options.block_restart_interval, -- table_options.use_delta_encoding, -- false /* use_value_delta_encoding */, -- tbo.internal_comparator.user_comparator() -- ->CanKeysWithDifferentByteContentsBeEqual() -- ? 
BlockBasedTableOptions::kDataBlockBinarySearch -- : table_options.data_block_index_type, -- table_options.data_block_hash_table_util_ratio, ts_sz, -- persist_user_defined_timestamps), -- range_del_block( -- 1 /* block_restart_interval */, true /* use_delta_encoding */, -- false /* use_value_delta_encoding */, -- BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */, -- 0.75 /* data_block_hash_table_util_ratio */, ts_sz, -- persist_user_defined_timestamps), -- internal_prefix_transform(prefix_extractor.get()), -- compression_type(tbo.compression_type), -- sample_for_compression(tbo.moptions.sample_for_compression), -- compressible_input_data_bytes(0), -- uncompressible_input_data_bytes(0), -- sampled_input_data_bytes(0), -- sampled_output_slow_data_bytes(0), -- sampled_output_fast_data_bytes(0), -- compression_opts(tbo.compression_opts), -- compression_dict(), -- compression_ctxs(tbo.compression_opts.parallel_threads), -- verify_ctxs(tbo.compression_opts.parallel_threads), -- verify_dict(), -- state((tbo.compression_opts.max_dict_bytes > 0 && -- tbo.compression_type != kNoCompression) -- ? State::kBuffered -- : State::kUnbuffered), -- use_delta_encoding_for_index_values(table_opt.format_version >= 4 && -- !table_opt.block_align), -- reason(tbo.reason), -- flush_block_policy( -- table_options.flush_block_policy_factory->NewFlushBlockPolicy( -- table_options, data_block)), -- create_context(&table_options, &ioptions, ioptions.stats, -- compression_type == kZSTD || -- compression_type == kZSTDNotFinalCompression, -- tbo.moptions.block_protection_bytes_per_key, -- tbo.internal_comparator.user_comparator(), -- !use_delta_encoding_for_index_values, -- table_opt.index_type == -- BlockBasedTableOptions::kBinarySearchWithFirstKey), -- tail_size(0), -- status_ok(true), -- io_status_ok(true) { -- if (tbo.target_file_size == 0) { -- buffer_limit = compression_opts.max_dict_buffer_bytes; -- } else if (compression_opts.max_dict_buffer_bytes == 0) { -- buffer_limit = tbo.target_file_size; -- } else { -- buffer_limit = std::min(tbo.target_file_size, -- compression_opts.max_dict_buffer_bytes); -+ uint64_t GetEstimatedFileSize() { -+ return estimated_file_size.load(std::memory_order_relaxed); - } - -- const auto compress_dict_build_buffer_charged = -- table_options.cache_usage_options.options_overrides -- .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer) -- .charged; -- if (table_options.block_cache && -- (compress_dict_build_buffer_charged == -- CacheEntryRoleOptions::Decision::kEnabled || -- compress_dict_build_buffer_charged == -- CacheEntryRoleOptions::Decision::kFallback)) { -- compression_dict_buffer_cache_res_mgr = -- std::make_shared>( -- table_options.block_cache); -- } else { -- compression_dict_buffer_cache_res_mgr = nullptr; -+ void SetCurrBlockUncompSize(uint64_t size) { -+ uncomp_bytes_curr_block = size; -+ uncomp_bytes_curr_block_set = true; - } - -- assert(compression_ctxs.size() >= compression_opts.parallel_threads); -- for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { -- compression_ctxs[i].reset( -- new CompressionContext(compression_type, compression_opts)); -+ private: -+ // Input bytes compressed so far. -+ uint64_t uncomp_bytes_compressed; -+ // Size of current block being appended. -+ uint64_t uncomp_bytes_curr_block; -+ // Whether uncomp_bytes_curr_block has been set for next -+ // ReapBlock call. -+ bool uncomp_bytes_curr_block_set; -+ // Input bytes under compression and not appended yet. 
-+ std::atomic uncomp_bytes_inflight; -+ // Number of blocks under compression and not appended yet. -+ std::atomic blocks_inflight; -+ // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. -+ std::atomic curr_compression_ratio; -+ // Estimated SST file size. -+ std::atomic estimated_file_size; -+ }; -+ FileSizeEstimator file_size_estimator; -+ -+ // Facilities used for waiting first block completion. Need to Wait for -+ // the completion of first block compression and flush to get a non-zero -+ // compression ratio. -+ std::atomic first_block_processed; -+ std::condition_variable first_block_cond; -+ std::mutex first_block_mutex; -+ -+ explicit ParallelCompressionRep(uint32_t parallel_threads) -+ : curr_block_keys(new Keys()), -+ block_rep_buf(parallel_threads), -+ block_rep_pool(parallel_threads), -+ compress_queue(parallel_threads), -+ write_queue(parallel_threads), -+ first_block_processed(false) { -+ for (uint32_t i = 0; i < parallel_threads; i++) { -+ block_rep_buf[i].contents = Slice(); -+ block_rep_buf[i].compressed_contents = Slice(); -+ block_rep_buf[i].data.reset(new std::string()); -+ block_rep_buf[i].compressed_data.reset(new std::string()); -+ block_rep_buf[i].compression_type = CompressionType(); -+ block_rep_buf[i].first_key_in_next_block.reset(new std::string()); -+ block_rep_buf[i].keys.reset(new Keys()); -+ block_rep_buf[i].slot.reset(new BlockRepSlot()); -+ block_rep_buf[i].status = Status::OK(); -+ block_rep_pool.push(&block_rep_buf[i]); -+ } -+ } -+ -+ ~ParallelCompressionRep() { block_rep_pool.finish(); } -+ -+ // Make a block prepared to be emitted to compression thread -+ // Used in non-buffered mode -+ BlockRep* PrepareBlock(CompressionType compression_type, -+ const Slice* first_key_in_next_block, -+ BlockBuilder* data_block) { -+ BlockRep* block_rep = -+ PrepareBlockInternal(compression_type, first_key_in_next_block); -+ assert(block_rep != nullptr); -+ data_block->SwapAndReset(*(block_rep->data)); -+ block_rep->contents = *(block_rep->data); -+ std::swap(block_rep->keys, curr_block_keys); -+ curr_block_keys->Clear(); -+ return block_rep; -+ } -+ -+ // Used in EnterUnbuffered -+ BlockRep* PrepareBlock(CompressionType compression_type, -+ const Slice* first_key_in_next_block, -+ std::string* data_block, -+ std::vector* keys) { -+ BlockRep* block_rep = -+ PrepareBlockInternal(compression_type, first_key_in_next_block); -+ assert(block_rep != nullptr); -+ std::swap(*(block_rep->data), *data_block); -+ block_rep->contents = *(block_rep->data); -+ block_rep->keys->SwapAssign(*keys); -+ return block_rep; -+ } -+ -+ // Emit a block to compression thread -+ void EmitBlock(BlockRep* block_rep) { -+ assert(block_rep != nullptr); -+ assert(block_rep->status.ok()); -+ if (!write_queue.push(block_rep->slot.get())) { -+ return; - } -- if (table_options.index_type == -- BlockBasedTableOptions::kTwoLevelIndexSearch) { -- p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( -- &internal_comparator, use_delta_encoding_for_index_values, -- table_options, ts_sz, persist_user_defined_timestamps); -- index_builder.reset(p_index_builder_); -- } else { -- index_builder.reset(IndexBuilder::CreateIndexBuilder( -- table_options.index_type, &internal_comparator, -- &this->internal_prefix_transform, use_delta_encoding_for_index_values, -- table_options, ts_sz, persist_user_defined_timestamps)); -+ if (!compress_queue.push(block_rep)) { -+ return; - } -- if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) { -- // Apply optimize_filters_for_hits 
setting here when applicable by -- // skipping filter generation -- filter_builder.reset(); -- } else if (tbo.skip_filters) { -- // For SstFileWriter skip_filters -- filter_builder.reset(); -- } else if (!table_options.filter_policy) { -- // Null filter_policy -> no filter -- filter_builder.reset(); -- } else { -- FilterBuildingContext filter_context(table_options); - -- filter_context.info_log = ioptions.logger; -- filter_context.column_family_name = tbo.column_family_name; -- filter_context.reason = reason; -+ if (!first_block_processed.load(std::memory_order_relaxed)) { -+ std::unique_lock lock(first_block_mutex); -+ first_block_cond.wait(lock, [this] { -+ return first_block_processed.load(std::memory_order_relaxed); -+ }); -+ } -+ } - -- // Only populate other fields if known to be in LSM rather than -- // generating external SST file -- if (reason != TableFileCreationReason::kMisc) { -- filter_context.compaction_style = ioptions.compaction_style; -- filter_context.num_levels = ioptions.num_levels; -- filter_context.level_at_creation = tbo.level_at_creation; -- filter_context.is_bottommost = tbo.is_bottommost; -- assert(filter_context.level_at_creation < filter_context.num_levels); -- } -+ // Reap a block from compression thread -+ void ReapBlock(BlockRep* block_rep) { -+ assert(block_rep != nullptr); -+ block_rep->compressed_data->clear(); -+ block_rep_pool.push(block_rep); - -- filter_builder.reset(CreateFilterBlockBuilder( -- ioptions, tbo.moptions, filter_context, -- use_delta_encoding_for_index_values, p_index_builder_, ts_sz, -- persist_user_defined_timestamps)); -+ if (!first_block_processed.load(std::memory_order_relaxed)) { -+ std::lock_guard lock(first_block_mutex); -+ first_block_processed.store(true, std::memory_order_relaxed); -+ first_block_cond.notify_one(); - } -+ } - -- assert(tbo.internal_tbl_prop_coll_factories); -- for (auto& factory : *tbo.internal_tbl_prop_coll_factories) { -- assert(factory); -+ private: -+ BlockRep* PrepareBlockInternal(CompressionType compression_type, -+ const Slice* first_key_in_next_block) { -+ BlockRep* block_rep = nullptr; -+ block_rep_pool.pop(block_rep); -+ assert(block_rep != nullptr); - -- std::unique_ptr collector{ -- factory->CreateInternalTblPropColl( -- tbo.column_family_id, tbo.level_at_creation, -- tbo.ioptions.num_levels, -- tbo.last_level_inclusive_max_seqno_threshold)}; -- if (collector) { -- table_properties_collectors.emplace_back(std::move(collector)); -- } -- } -- table_properties_collectors.emplace_back( -- new BlockBasedTablePropertiesCollector( -- table_options.index_type, table_options.whole_key_filtering, -- prefix_extractor != nullptr, -- table_options.decouple_partitioned_filters)); -- if (ts_sz > 0 && persist_user_defined_timestamps) { -- table_properties_collectors.emplace_back( -- new TimestampTablePropertiesCollector( -- tbo.internal_comparator.user_comparator())); -- } -- if (table_options.verify_compression) { -- for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { -- verify_ctxs[i].reset(new UncompressionContext(compression_type)); -- } -- } -+ assert(block_rep->data); - -- // These are only needed for populating table properties -- props.column_family_id = tbo.column_family_id; -- props.column_family_name = tbo.column_family_name; -- props.oldest_key_time = tbo.oldest_key_time; -- props.file_creation_time = tbo.file_creation_time; -- props.orig_file_number = tbo.cur_file_num; -- props.db_id = tbo.db_id; -- props.db_session_id = tbo.db_session_id; -- props.db_host_id = ioptions.db_host_id; -- if 
(!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) { -- ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set"); -- } -+ block_rep->compression_type = compression_type; - -- if (FormatVersionUsesContextChecksum(table_options.format_version)) { -- // Must be non-zero and semi- or quasi-random -- // TODO: ideally guaranteed different for related files (e.g. use file -- // number and db_session, for benefit of SstFileWriter) -- do { -- base_context_checksum = Random::GetTLSInstance()->Next(); -- } while (UNLIKELY(base_context_checksum == 0)); -+ if (first_key_in_next_block == nullptr) { -+ block_rep->first_key_in_next_block.reset(nullptr); - } else { -- base_context_checksum = 0; -+ block_rep->first_key_in_next_block->assign( -+ first_key_in_next_block->data(), first_key_in_next_block->size()); - } - -- if (alignment > 0 && compression_type != kNoCompression) { -- // With better sanitization in `CompactionPicker::CompactFiles()`, we -- // would not need to handle this case here and could change it to an -- // assertion instead. -- SetStatus(Status::InvalidArgument( -- "Enable block_align, but compression enabled")); -- } -+ return block_rep; - } -+}; - -- Rep(const Rep&) = delete; -- Rep& operator=(const Rep&) = delete; -+struct BlockBasedTableBuilder::Rep { -+ const ImmutableOptions ioptions; -+ // BEGIN from MutableCFOptions -+ std::shared_ptr prefix_extractor; -+ // END from MutableCFOptions -+ const WriteOptions write_options; -+ const BlockBasedTableOptions table_options; -+ const InternalKeyComparator& internal_comparator; -+ // Size in bytes for the user-defined timestamps. -+ size_t ts_sz; -+ // When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the -+ // user key will be stripped when creating the block based table. This -+ // stripping happens for all user keys, including the keys in data block, -+ // index block for data block, index block for index block (if index type is -+ // `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned -+ // filters), the `first_internal_key` in `IndexValue`, the `end_key` for range -+ // deletion entries. -+ // As long as the user keys are sorted when added via `Add` API, their logic -+ // ordering won't change after timestamps are stripped. However, for each user -+ // key to be logically equivalent before and after timestamp is stripped, the -+ // user key should contain the minimum timestamp. -+ bool persist_user_defined_timestamps; -+ WritableFileWriter* file; -+ std::atomic offset; -+ size_t alignment; -+ BlockBuilder data_block; -+ // Buffers uncompressed data blocks to replay later. Needed when -+ // compression dictionary is enabled so we can finalize the dictionary before -+ // compressing any data blocks. -+ std::vector data_block_buffers; -+ BlockBuilder range_del_block; - -- private: -- // Synchronize status & io_status accesses across threads from main thread, -- // compression thread and write thread in parallel compression. 
-- std::mutex status_mutex; -- std::atomic status_ok; -- Status status; -- std::mutex io_status_mutex; -- std::atomic io_status_ok; -- IOStatus io_status; --}; -+ InternalKeySliceTransform internal_prefix_transform; -+ std::unique_ptr index_builder; -+ std::string index_separator_scratch; -+ PartitionedIndexBuilder* p_index_builder_ = nullptr; - --struct BlockBasedTableBuilder::ParallelCompressionRep { -- // TODO: consider replacing with autovector or similar -- // Keys is a wrapper of vector of strings avoiding -- // releasing string memories during vector clear() -- // in order to save memory allocation overhead -- class Keys { -- public: -- Keys() : keys_(kKeysInitSize), size_(0) {} -- void PushBack(const Slice& key) { -- if (size_ == keys_.size()) { -- keys_.emplace_back(key.data(), key.size()); -- } else { -- keys_[size_].assign(key.data(), key.size()); -- } -- size_++; -- } -- void SwapAssign(std::vector& keys) { -- size_ = keys.size(); -- std::swap(keys_, keys); -- } -- void Clear() { size_ = 0; } -- size_t Size() { return size_; } -- std::string& Back() { return keys_[size_ - 1]; } -- std::string& operator[](size_t idx) { -- assert(idx < size_); -- return keys_[idx]; -- } -+ std::string last_ikey; // Internal key or empty (unset) -+ const Slice* first_key_in_next_block = nullptr; -+ CompressionType compression_type; -+ uint64_t sample_for_compression; -+ std::atomic compressible_input_data_bytes; -+ std::atomic uncompressible_input_data_bytes; -+ std::atomic sampled_input_data_bytes; -+ std::atomic sampled_output_slow_data_bytes; -+ std::atomic sampled_output_fast_data_bytes; -+ CompressionOptions compression_opts; -+ std::unique_ptr compression_dict; -+ std::vector> compression_ctxs; -+ std::vector> verify_ctxs; -+ std::unique_ptr verify_dict; - -- private: -- const size_t kKeysInitSize = 32; -- std::vector keys_; -- size_t size_; -+ size_t data_begin_offset = 0; -+ -+ TableProperties props; -+ -+ // States of the builder. -+ // -+ // - `kBuffered`: This is the initial state where zero or more data blocks are -+ // accumulated uncompressed in-memory. From this state, call -+ // `EnterUnbuffered()` to finalize the compression dictionary if enabled, -+ // compress/write out any buffered blocks, and proceed to the `kUnbuffered` -+ // state. -+ // -+ // - `kUnbuffered`: This is the state when compression dictionary is finalized -+ // either because it wasn't enabled in the first place or it's been created -+ // from sampling previously buffered data. In this state, blocks are simply -+ // compressed/written out as they fill up. From this state, call `Finish()` -+ // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete -+ // the partially created file. -+ // -+ // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been -+ // called, so the table builder is no longer usable. We must be in this -+ // state by the time the destructor runs. -+ enum class State { -+ kBuffered, -+ kUnbuffered, -+ kClosed, - }; -- std::unique_ptr curr_block_keys; -- -- class BlockRepSlot; -+ State state; -+ // `kBuffered` state is allowed only as long as the buffering of uncompressed -+ // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`. 
-+ uint64_t buffer_limit; -+ std::shared_ptr -+ compression_dict_buffer_cache_res_mgr; -+ const bool use_delta_encoding_for_index_values; -+ std::unique_ptr filter_builder; -+ OffsetableCacheKey base_cache_key; -+ const TableFileCreationReason reason; - -- // BlockRep instances are fetched from and recycled to -- // block_rep_pool during parallel compression. -- struct BlockRep { -- Slice contents; -- Slice compressed_contents; -- std::unique_ptr data; -- std::unique_ptr compressed_data; -- CompressionType compression_type; -- std::unique_ptr first_key_in_next_block; -- std::unique_ptr keys; -- std::unique_ptr slot; -- Status status; -- }; -- // Use a vector of BlockRep as a buffer for a determined number -- // of BlockRep structures. All data referenced by pointers in -- // BlockRep will be freed when this vector is destructed. -- using BlockRepBuffer = std::vector; -- BlockRepBuffer block_rep_buf; -- // Use a thread-safe queue for concurrent access from block -- // building thread and writer thread. -- using BlockRepPool = WorkQueue; -- BlockRepPool block_rep_pool; -+ BlockHandle pending_handle; // Handle to add to index block - -- // Use BlockRepSlot to keep block order in write thread. -- // slot_ will pass references to BlockRep -- class BlockRepSlot { -- public: -- BlockRepSlot() : slot_(1) {} -- template -- void Fill(T&& rep) { -- slot_.push(std::forward(rep)); -- } -- void Take(BlockRep*& rep) { slot_.pop(rep); } -+ std::string compressed_output; -+ std::unique_ptr flush_block_policy; - -- private: -- // slot_ will pass references to BlockRep in block_rep_buf, -- // and those references are always valid before the destruction of -- // block_rep_buf. -- WorkQueue slot_; -- }; -+ std::vector> table_properties_collectors; - -- // Compression queue will pass references to BlockRep in block_rep_buf, -- // and those references are always valid before the destruction of -- // block_rep_buf. -- using CompressQueue = WorkQueue; -- CompressQueue compress_queue; -- std::vector compress_thread_pool; -+ std::unique_ptr pc_rep; -+ BlockCreateContext create_context; - -- // Write queue will pass references to BlockRep::slot in block_rep_buf, -- // and those references are always valid before the corresponding -- // BlockRep::slot is destructed, which is before the destruction of -- // block_rep_buf. -- using WriteQueue = WorkQueue; -- WriteQueue write_queue; -- std::unique_ptr write_thread; -+ // The size of the "tail" part of a SST file. "Tail" refers to -+ // all blocks after data blocks till the end of the SST file. -+ uint64_t tail_size; - -- // Estimate output file size when parallel compression is enabled. This is -- // necessary because compression & flush are no longer synchronized, -- // and BlockBasedTableBuilder::FileSize() is no longer accurate. -- // memory_order_relaxed suffices because accurate statistics is not required. 
-- class FileSizeEstimator { -- public: -- explicit FileSizeEstimator() -- : uncomp_bytes_compressed(0), -- uncomp_bytes_curr_block(0), -- uncomp_bytes_curr_block_set(false), -- uncomp_bytes_inflight(0), -- blocks_inflight(0), -- curr_compression_ratio(0), -- estimated_file_size(0) {} -+ // See class Footer -+ uint32_t base_context_checksum; - -- // Estimate file size when a block is about to be emitted to -- // compression thread -- void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { -- uint64_t new_uncomp_bytes_inflight = -- uncomp_bytes_inflight.fetch_add(uncomp_block_size, -- std::memory_order_relaxed) + -- uncomp_block_size; -+ uint64_t get_offset() { return offset.load(std::memory_order_relaxed); } -+ void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); } - -- uint64_t new_blocks_inflight = -- blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; -+ bool IsParallelCompressionEnabled() const { -+ return compression_opts.parallel_threads > 1; -+ } - -- estimated_file_size.store( -- curr_file_size + -- static_cast( -- static_cast(new_uncomp_bytes_inflight) * -- curr_compression_ratio.load(std::memory_order_relaxed)) + -- new_blocks_inflight * kBlockTrailerSize, -- std::memory_order_relaxed); -+ Status GetStatus() { -+ // We need to make modifications of status visible when status_ok is set -+ // to false, and this is ensured by status_mutex, so no special memory -+ // order for status_ok is required. -+ if (status_ok.load(std::memory_order_relaxed)) { -+ return Status::OK(); -+ } else { -+ return CopyStatus(); - } -+ } - -- // Estimate file size when a block is already reaped from -- // compression thread -- void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { -- assert(uncomp_bytes_curr_block_set); -- -- uint64_t new_uncomp_bytes_compressed = -- uncomp_bytes_compressed + uncomp_bytes_curr_block; -- assert(new_uncomp_bytes_compressed > 0); -+ Status CopyStatus() { -+ std::lock_guard lock(status_mutex); -+ return status; -+ } - -- curr_compression_ratio.store( -- (curr_compression_ratio.load(std::memory_order_relaxed) * -- uncomp_bytes_compressed + -- compressed_block_size) / -- static_cast(new_uncomp_bytes_compressed), -- std::memory_order_relaxed); -- uncomp_bytes_compressed = new_uncomp_bytes_compressed; -+ IOStatus GetIOStatus() { -+ // We need to make modifications of io_status visible when status_ok is set -+ // to false, and this is ensured by io_status_mutex, so no special memory -+ // order for io_status_ok is required. -+ if (io_status_ok.load(std::memory_order_relaxed)) { -+#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition -+ auto ios = CopyIOStatus(); -+ ios.PermitUncheckedError(); -+ // Assume no races in unit tests -+ assert(ios.ok()); -+#endif // ROCKSDB_ASSERT_STATUS_CHECKED -+ return IOStatus::OK(); -+ } else { -+ return CopyIOStatus(); -+ } -+ } - -- uint64_t new_uncomp_bytes_inflight = -- uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, -- std::memory_order_relaxed) - -- uncomp_bytes_curr_block; -+ IOStatus CopyIOStatus() { -+ std::lock_guard lock(io_status_mutex); -+ return io_status; -+ } - -- uint64_t new_blocks_inflight = -- blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; -+ // Never erase an existing status that is not OK. 
-+ void SetStatus(Status s) { -+ if (!s.ok() && status_ok.load(std::memory_order_relaxed)) { -+ // Locking is an overkill for non compression_opts.parallel_threads -+ // case but since it's unlikely that s is not OK, we take this cost -+ // to be simplicity. -+ std::lock_guard lock(status_mutex); -+ status = s; -+ status_ok.store(false, std::memory_order_relaxed); -+ } -+ } - -- estimated_file_size.store( -- curr_file_size + -- static_cast( -- static_cast(new_uncomp_bytes_inflight) * -- curr_compression_ratio.load(std::memory_order_relaxed)) + -- new_blocks_inflight * kBlockTrailerSize, -- std::memory_order_relaxed); -+ // Never erase an existing I/O status that is not OK. -+ // Calling this will also SetStatus(ios) -+ void SetIOStatus(IOStatus ios) { -+ if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) { -+ // Locking is an overkill for non compression_opts.parallel_threads -+ // case but since it's unlikely that s is not OK, we take this cost -+ // to be simplicity. -+ std::lock_guard lock(io_status_mutex); -+ io_status = ios; -+ io_status_ok.store(false, std::memory_order_relaxed); -+ } -+ SetStatus(ios); -+ } - -- uncomp_bytes_curr_block_set = false; -+ Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo, -+ WritableFileWriter* f) -+ : ioptions(tbo.ioptions), -+ prefix_extractor(tbo.moptions.prefix_extractor), -+ write_options(tbo.write_options), -+ table_options(table_opt), -+ internal_comparator(tbo.internal_comparator), -+ ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()), -+ persist_user_defined_timestamps( -+ tbo.ioptions.persist_user_defined_timestamps), -+ file(f), -+ offset(0), -+ alignment(table_options.block_align -+ ? std::min(static_cast(table_options.block_size), -+ kDefaultPageSize) -+ : 0), -+ data_block(table_options.block_restart_interval, -+ table_options.use_delta_encoding, -+ false /* use_value_delta_encoding */, -+ tbo.internal_comparator.user_comparator() -+ ->CanKeysWithDifferentByteContentsBeEqual() -+ ? BlockBasedTableOptions::kDataBlockBinarySearch -+ : table_options.data_block_index_type, -+ table_options.data_block_hash_table_util_ratio, ts_sz, -+ persist_user_defined_timestamps), -+ range_del_block( -+ 1 /* block_restart_interval */, true /* use_delta_encoding */, -+ false /* use_value_delta_encoding */, -+ BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */, -+ 0.75 /* data_block_hash_table_util_ratio */, ts_sz, -+ persist_user_defined_timestamps), -+ internal_prefix_transform(prefix_extractor.get()), -+ compression_type(tbo.compression_type), -+ sample_for_compression(tbo.moptions.sample_for_compression), -+ compressible_input_data_bytes(0), -+ uncompressible_input_data_bytes(0), -+ sampled_input_data_bytes(0), -+ sampled_output_slow_data_bytes(0), -+ sampled_output_fast_data_bytes(0), -+ compression_opts(tbo.compression_opts), -+ compression_dict(), -+ compression_ctxs(tbo.compression_opts.parallel_threads), -+ verify_ctxs(tbo.compression_opts.parallel_threads), -+ verify_dict(), -+ state((tbo.compression_opts.max_dict_bytes > 0 && -+ tbo.compression_type != kNoCompression) -+ ? 
State::kBuffered -+ : State::kUnbuffered), -+ use_delta_encoding_for_index_values(table_opt.format_version >= 4 && -+ !table_opt.block_align), -+ reason(tbo.reason), -+ flush_block_policy( -+ table_options.flush_block_policy_factory->NewFlushBlockPolicy( -+ table_options, data_block)), -+ create_context(&table_options, &ioptions, ioptions.stats, -+ compression_type == kZSTD || -+ compression_type == kZSTDNotFinalCompression, -+ tbo.moptions.block_protection_bytes_per_key, -+ tbo.internal_comparator.user_comparator(), -+ !use_delta_encoding_for_index_values, -+ table_opt.index_type == -+ BlockBasedTableOptions::kBinarySearchWithFirstKey), -+ tail_size(0), -+ status_ok(true), -+ io_status_ok(true) { -+ if (tbo.target_file_size == 0) { -+ buffer_limit = compression_opts.max_dict_buffer_bytes; -+ } else if (compression_opts.max_dict_buffer_bytes == 0) { -+ buffer_limit = tbo.target_file_size; -+ } else { -+ buffer_limit = std::min(tbo.target_file_size, -+ compression_opts.max_dict_buffer_bytes); - } - -- void SetEstimatedFileSize(uint64_t size) { -- estimated_file_size.store(size, std::memory_order_relaxed); -+ const auto compress_dict_build_buffer_charged = -+ table_options.cache_usage_options.options_overrides -+ .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer) -+ .charged; -+ if (table_options.block_cache && -+ (compress_dict_build_buffer_charged == -+ CacheEntryRoleOptions::Decision::kEnabled || -+ compress_dict_build_buffer_charged == -+ CacheEntryRoleOptions::Decision::kFallback)) { -+ compression_dict_buffer_cache_res_mgr = -+ std::make_shared>( -+ table_options.block_cache); -+ } else { -+ compression_dict_buffer_cache_res_mgr = nullptr; - } - -- uint64_t GetEstimatedFileSize() { -- return estimated_file_size.load(std::memory_order_relaxed); -+ assert(compression_ctxs.size() >= compression_opts.parallel_threads); -+ for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { -+ compression_ctxs[i].reset( -+ new CompressionContext(compression_type, compression_opts)); - } -- -- void SetCurrBlockUncompSize(uint64_t size) { -- uncomp_bytes_curr_block = size; -- uncomp_bytes_curr_block_set = true; -+ if (table_options.index_type == -+ BlockBasedTableOptions::kTwoLevelIndexSearch) { -+ p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( -+ &internal_comparator, use_delta_encoding_for_index_values, -+ table_options, ts_sz, persist_user_defined_timestamps); -+ index_builder.reset(p_index_builder_); -+ } else { -+ index_builder.reset(IndexBuilder::CreateIndexBuilder( -+ table_options.index_type, &internal_comparator, -+ &this->internal_prefix_transform, use_delta_encoding_for_index_values, -+ table_options, ts_sz, persist_user_defined_timestamps)); - } -+ if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) { -+ // Apply optimize_filters_for_hits setting here when applicable by -+ // skipping filter generation -+ filter_builder.reset(); -+ } else if (tbo.skip_filters) { -+ // For SstFileWriter skip_filters -+ filter_builder.reset(); -+ } else if (!table_options.filter_policy) { -+ // Null filter_policy -> no filter -+ filter_builder.reset(); -+ } else { -+ FilterBuildingContext filter_context(table_options); - -- private: -- // Input bytes compressed so far. -- uint64_t uncomp_bytes_compressed; -- // Size of current block being appended. -- uint64_t uncomp_bytes_curr_block; -- // Whether uncomp_bytes_curr_block has been set for next -- // ReapBlock call. -- bool uncomp_bytes_curr_block_set; -- // Input bytes under compression and not appended yet. 
-- std::atomic uncomp_bytes_inflight; -- // Number of blocks under compression and not appended yet. -- std::atomic blocks_inflight; -- // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. -- std::atomic curr_compression_ratio; -- // Estimated SST file size. -- std::atomic estimated_file_size; -- }; -- FileSizeEstimator file_size_estimator; -+ filter_context.info_log = ioptions.logger; -+ filter_context.column_family_name = tbo.column_family_name; -+ filter_context.reason = reason; - -- // Facilities used for waiting first block completion. Need to Wait for -- // the completion of first block compression and flush to get a non-zero -- // compression ratio. -- std::atomic first_block_processed; -- std::condition_variable first_block_cond; -- std::mutex first_block_mutex; -+ // Only populate other fields if known to be in LSM rather than -+ // generating external SST file -+ if (reason != TableFileCreationReason::kMisc) { -+ filter_context.compaction_style = ioptions.compaction_style; -+ filter_context.num_levels = ioptions.num_levels; -+ filter_context.level_at_creation = tbo.level_at_creation; -+ filter_context.is_bottommost = tbo.is_bottommost; -+ assert(filter_context.level_at_creation < filter_context.num_levels); -+ } - -- explicit ParallelCompressionRep(uint32_t parallel_threads) -- : curr_block_keys(new Keys()), -- block_rep_buf(parallel_threads), -- block_rep_pool(parallel_threads), -- compress_queue(parallel_threads), -- write_queue(parallel_threads), -- first_block_processed(false) { -- for (uint32_t i = 0; i < parallel_threads; i++) { -- block_rep_buf[i].contents = Slice(); -- block_rep_buf[i].compressed_contents = Slice(); -- block_rep_buf[i].data.reset(new std::string()); -- block_rep_buf[i].compressed_data.reset(new std::string()); -- block_rep_buf[i].compression_type = CompressionType(); -- block_rep_buf[i].first_key_in_next_block.reset(new std::string()); -- block_rep_buf[i].keys.reset(new Keys()); -- block_rep_buf[i].slot.reset(new BlockRepSlot()); -- block_rep_buf[i].status = Status::OK(); -- block_rep_pool.push(&block_rep_buf[i]); -+ filter_builder.reset(CreateFilterBlockBuilder( -+ ioptions, tbo.moptions, filter_context, -+ use_delta_encoding_for_index_values, p_index_builder_, ts_sz, -+ persist_user_defined_timestamps)); - } -- } -- -- ~ParallelCompressionRep() { block_rep_pool.finish(); } -- -- // Make a block prepared to be emitted to compression thread -- // Used in non-buffered mode -- BlockRep* PrepareBlock(CompressionType compression_type, -- const Slice* first_key_in_next_block, -- BlockBuilder* data_block) { -- BlockRep* block_rep = -- PrepareBlockInternal(compression_type, first_key_in_next_block); -- assert(block_rep != nullptr); -- data_block->SwapAndReset(*(block_rep->data)); -- block_rep->contents = *(block_rep->data); -- std::swap(block_rep->keys, curr_block_keys); -- curr_block_keys->Clear(); -- return block_rep; -- } - -- // Used in EnterUnbuffered -- BlockRep* PrepareBlock(CompressionType compression_type, -- const Slice* first_key_in_next_block, -- std::string* data_block, -- std::vector* keys) { -- BlockRep* block_rep = -- PrepareBlockInternal(compression_type, first_key_in_next_block); -- assert(block_rep != nullptr); -- std::swap(*(block_rep->data), *data_block); -- block_rep->contents = *(block_rep->data); -- block_rep->keys->SwapAssign(*keys); -- return block_rep; -- } -+ assert(tbo.internal_tbl_prop_coll_factories); -+ for (auto& factory : *tbo.internal_tbl_prop_coll_factories) { -+ assert(factory); - -- // Emit a block to 
compression thread -- void EmitBlock(BlockRep* block_rep) { -- assert(block_rep != nullptr); -- assert(block_rep->status.ok()); -- if (!write_queue.push(block_rep->slot.get())) { -- return; -+ std::unique_ptr collector{ -+ factory->CreateInternalTblPropColl( -+ tbo.column_family_id, tbo.level_at_creation, -+ tbo.ioptions.num_levels, -+ tbo.last_level_inclusive_max_seqno_threshold)}; -+ if (collector) { -+ table_properties_collectors.emplace_back(std::move(collector)); -+ } - } -- if (!compress_queue.push(block_rep)) { -- return; -+ table_properties_collectors.emplace_back( -+ new BlockBasedTablePropertiesCollector( -+ table_options.index_type, table_options.whole_key_filtering, -+ prefix_extractor != nullptr, -+ table_options.decouple_partitioned_filters)); -+ if (ts_sz > 0 && persist_user_defined_timestamps) { -+ table_properties_collectors.emplace_back( -+ new TimestampTablePropertiesCollector( -+ tbo.internal_comparator.user_comparator())); - } -- -- if (!first_block_processed.load(std::memory_order_relaxed)) { -- std::unique_lock lock(first_block_mutex); -- first_block_cond.wait(lock, [this] { -- return first_block_processed.load(std::memory_order_relaxed); -- }); -+ if (table_options.verify_compression) { -+ for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { -+ verify_ctxs[i].reset(new UncompressionContext(compression_type)); -+ } - } -- } -- -- // Reap a block from compression thread -- void ReapBlock(BlockRep* block_rep) { -- assert(block_rep != nullptr); -- block_rep->compressed_data->clear(); -- block_rep_pool.push(block_rep); - -- if (!first_block_processed.load(std::memory_order_relaxed)) { -- std::lock_guard lock(first_block_mutex); -- first_block_processed.store(true, std::memory_order_relaxed); -- first_block_cond.notify_one(); -+ // These are only needed for populating table properties -+ props.column_family_id = tbo.column_family_id; -+ props.column_family_name = tbo.column_family_name; -+ props.oldest_key_time = tbo.oldest_key_time; -+ props.file_creation_time = tbo.file_creation_time; -+ props.orig_file_number = tbo.cur_file_num; -+ props.db_id = tbo.db_id; -+ props.db_session_id = tbo.db_session_id; -+ props.db_host_id = ioptions.db_host_id; -+ if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) { -+ ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set"); - } -- } -- -- private: -- BlockRep* PrepareBlockInternal(CompressionType compression_type, -- const Slice* first_key_in_next_block) { -- BlockRep* block_rep = nullptr; -- block_rep_pool.pop(block_rep); -- assert(block_rep != nullptr); -- -- assert(block_rep->data); -- -- block_rep->compression_type = compression_type; - -- if (first_key_in_next_block == nullptr) { -- block_rep->first_key_in_next_block.reset(nullptr); -+ if (FormatVersionUsesContextChecksum(table_options.format_version)) { -+ // Must be non-zero and semi- or quasi-random -+ // TODO: ideally guaranteed different for related files (e.g. 
use file -+ // number and db_session, for benefit of SstFileWriter) -+ do { -+ base_context_checksum = Random::GetTLSInstance()->Next(); -+ } while (UNLIKELY(base_context_checksum == 0)); - } else { -- block_rep->first_key_in_next_block->assign( -- first_key_in_next_block->data(), first_key_in_next_block->size()); -+ base_context_checksum = 0; - } - -- return block_rep; -+ if (alignment > 0 && compression_type != kNoCompression) { -+ // With better sanitization in `CompactionPicker::CompactFiles()`, we -+ // would not need to handle this case here and could change it to an -+ // assertion instead. -+ SetStatus(Status::InvalidArgument( -+ "Enable block_align, but compression enabled")); -+ } - } -+ -+ Rep(const Rep&) = delete; -+ Rep& operator=(const Rep&) = delete; -+ -+ private: -+ // Synchronize status & io_status accesses across threads from main thread, -+ // compression thread and write thread in parallel compression. -+ std::mutex status_mutex; -+ std::atomic status_ok; -+ Status status; -+ std::mutex io_status_mutex; -+ std::atomic io_status_ok; -+ IOStatus io_status; - }; - - BlockBasedTableBuilder::BlockBasedTableBuilder( diff --git a/recipes/rocksdb/config.yml b/recipes/rocksdb/config.yml index 92ed7e6094b6a..66354e1a0305a 100644 --- a/recipes/rocksdb/config.yml +++ b/recipes/rocksdb/config.yml @@ -1,5 +1,5 @@ versions: - "9.6.1": + "9.7.2": folder: all "9.5.2": folder: all