From 37f226686086e8d24c4414d8ea97515045411651 Mon Sep 17 00:00:00 2001 From: Efrat1 <42644291+Efrat1@users.noreply.github.com> Date: Sun, 19 Mar 2023 12:23:50 +0200 Subject: [PATCH] Add rocksdb column family of keys history to get fast access to snapshots (#2938) Keys: category_name|user_key|big_endian_block_id Values: empty string Add unit tests --- bftengine/include/bftengine/ReplicaConfig.hpp | 13 + kvbc/CMakeLists.txt | 1 + kvbc/include/Replica.h | 4 + kvbc/include/categorization/kv_blockchain.h | 4 + kvbc/include/db_interfaces.h | 6 + .../categorization/kv_blockchain_adapter.hpp | 5 + kvbc/include/kvbc_adapter/idempotent_reader.h | 6 + kvbc/include/kvbc_adapter/replica_adapter.hpp | 6 + .../v4blockchain/blocks_reader_adapter.hpp | 7 + kvbc/include/v4blockchain/detail/blockchain.h | 13 +- .../v4blockchain/detail/column_families.h | 1 + .../v4blockchain/detail/keys_history.h | 117 +++++ kvbc/include/v4blockchain/v4_blockchain.h | 5 + kvbc/src/Replica.cpp | 6 + kvbc/src/categorization/kv_blockchain.cpp | 30 ++ kvbc/src/merkle_tree_storage_factory.cpp | 4 + kvbc/src/v4blockchain/detail/blockchain.cpp | 3 +- kvbc/src/v4blockchain/detail/keys_history.cpp | 278 ++++++++++ kvbc/src/v4blockchain/detail/latest_keys.cpp | 14 +- kvbc/src/v4blockchain/v4_blockchain.cpp | 39 ++ kvbc/test/CMakeLists.txt | 13 + .../categorization/kv_blockchain_test.cpp | 70 +++ .../test/kvbc_app_filter/kvbc_filter_test.cpp | 6 + kvbc/test/pruning_test.cpp | 6 + kvbc/test/replica_state_sync_test.cpp | 6 + kvbc/test/v4blockchain/v4_blockchain_test.cpp | 157 +++++- .../v4blockchain/v4_keys_history_test.cpp | 477 ++++++++++++++++++ .../test/thin_replica_server_test.cpp | 7 + 28 files changed, 1291 insertions(+), 13 deletions(-) create mode 100644 kvbc/include/v4blockchain/detail/keys_history.h create mode 100644 kvbc/src/v4blockchain/detail/keys_history.cpp create mode 100644 kvbc/test/v4blockchain/v4_keys_history_test.cpp diff --git a/bftengine/include/bftengine/ReplicaConfig.hpp b/bftengine/include/bftengine/ReplicaConfig.hpp index 3417cb0e5c..1f4c3f70d9 100644 --- a/bftengine/include/bftengine/ReplicaConfig.hpp +++ b/bftengine/include/bftengine/ReplicaConfig.hpp @@ -283,6 +283,15 @@ class ReplicaConfig : public concord::serialize::SerializableFactory getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId snapshot_version) const override final; ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // checkpoint void checkpointInProcess(bool flag, kvbc::BlockId); diff --git a/kvbc/include/categorization/kv_blockchain.h b/kvbc/include/categorization/kv_blockchain.h index c9e13bf67d..ad687c9053 100644 --- a/kvbc/include/categorization/kv_blockchain.h +++ b/kvbc/include/categorization/kv_blockchain.h @@ -153,6 +153,10 @@ class KeyValueBlockchain { // The key used in the default column family for persisting the current public state hash. static std::string publicStateHashKey(); + std::optional getFromSnapshot(const std::string& category_id, + const std::string& key, + const BlockId snapshot_version); + private: BlockId addBlock(CategoryInput&& category_updates, concord::storage::rocksdb::NativeWriteBatch& write_batch); diff --git a/kvbc/include/db_interfaces.h b/kvbc/include/db_interfaces.h index f195250d3f..42c2383d55 100644 --- a/kvbc/include/db_interfaces.h +++ b/kvbc/include/db_interfaces.h @@ -81,6 +81,12 @@ class IReader { // Get the last block ID in the system. virtual BlockId getLastBlockId() const = 0; + // Get the version of 'key' that is the closest from below or equal to 'snapshot_version' + // Return std::nullopt if the key doesn't exist or is deleted at given version + virtual std::optional getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId snapshot_version) const = 0; + virtual ~IReader() = default; }; diff --git a/kvbc/include/kvbc_adapter/categorization/kv_blockchain_adapter.hpp b/kvbc/include/kvbc_adapter/categorization/kv_blockchain_adapter.hpp index 61883415f0..f405537d9b 100644 --- a/kvbc/include/kvbc_adapter/categorization/kv_blockchain_adapter.hpp +++ b/kvbc/include/kvbc_adapter/categorization/kv_blockchain_adapter.hpp @@ -79,6 +79,11 @@ class KeyValueBlockchain : public IReader, public IBlockAdder { // Get the last block ID in the system. BlockId getLastBlockId() const override final { return kvbc_->getLastReachableBlockId(); } + + std::optional getFromSnapshot( + const std::string &category_id, const std::string &key, const BlockId snapshot_version) const override final { + return kvbc_->getFromSnapshot(category_id, key, snapshot_version); + } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kvbc/include/kvbc_adapter/idempotent_reader.h b/kvbc/include/kvbc_adapter/idempotent_reader.h index 2fcced0450..f2cebe6bb9 100644 --- a/kvbc/include/kvbc_adapter/idempotent_reader.h +++ b/kvbc/include/kvbc_adapter/idempotent_reader.h @@ -76,6 +76,12 @@ class IdempotentReader : public IReader { BlockId getLastBlockId() const override { return kvbc_->getLastBlockId(); } + std::optional getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId snapshot_version) const override { + return kvbc_->getFromSnapshot(category_id, key, snapshot_version); + } + private: const std::shared_ptr kvbc_; }; diff --git a/kvbc/include/kvbc_adapter/replica_adapter.hpp b/kvbc/include/kvbc_adapter/replica_adapter.hpp index ee95dfc6c6..917f30b0da 100644 --- a/kvbc/include/kvbc_adapter/replica_adapter.hpp +++ b/kvbc/include/kvbc_adapter/replica_adapter.hpp @@ -115,6 +115,12 @@ class ReplicaBlockchain : public IBlocksDeleter, // Get the last block ID in the system. BlockId getLastBlockId() const override final { return reader_->getLastBlockId(); } + + std::optional getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId snapshot_version) const override final { + return reader_->getFromSnapshot(category_id, key, snapshot_version); + } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kvbc/include/kvbc_adapter/v4blockchain/blocks_reader_adapter.hpp b/kvbc/include/kvbc_adapter/v4blockchain/blocks_reader_adapter.hpp index 6c2012220d..87a7ce7926 100644 --- a/kvbc/include/kvbc_adapter/v4blockchain/blocks_reader_adapter.hpp +++ b/kvbc/include/kvbc_adapter/v4blockchain/blocks_reader_adapter.hpp @@ -72,6 +72,13 @@ class BlocksReaderAdapter : public IReader { // Get the last block ID in the system. BlockId getLastBlockId() const override final { return kvbc_->getLastReachableBlockId(); } + + // Get the version of 'key' that is the closest from below or equal to 'snapshot_version' + // Return std::nullopt if the key doesn't exist or is deleted at given version + std::optional getFromSnapshot( + const std::string &category_id, const std::string &key, const BlockId snapshot_version) const override final { + return kvbc_->getFromSnapshot(category_id, key, snapshot_version); + } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// private: diff --git a/kvbc/include/v4blockchain/detail/blockchain.h b/kvbc/include/v4blockchain/detail/blockchain.h index 93aafc8de6..5e46b73e3d 100644 --- a/kvbc/include/v4blockchain/detail/blockchain.h +++ b/kvbc/include/v4blockchain/detail/blockchain.h @@ -30,6 +30,12 @@ #include "v4blockchain/detail/column_families.h" namespace concord::kvbc::v4blockchain::detail { + +template +auto getSliceArray(const Sliceable&... sls) { + return std::array<::rocksdb::Slice, sizeof...(sls)>{sls...}; +} + /* This class composes the blockchain out of detail::Block. It knows to : @@ -63,7 +69,10 @@ class Blockchain { // Loads from storage the last and first block ids respectivly. std::optional loadLastReachableBlockId(); std::optional loadGenesisBlockId(); - void setLastReachable(BlockId id) { last_reachable_block_id_ = id; } + void setLastReachable(BlockId id) { + last_reachable_block_id_ = id; + global_latest_block_id = id; + } void setBlockId(BlockId id); BlockId getLastReachable() const { return last_reachable_block_id_; } BlockId getGenesisBlockId() const { return genesis_block_id_; } @@ -71,7 +80,6 @@ class Blockchain { genesis_block_id_ = id; global_genesis_block_id = id; } - // Returns the buffer that represents the block std::optional getBlockData(concord::kvbc::BlockId id) const; std::optional getBlockUpdates(BlockId id) const; @@ -110,6 +118,7 @@ class Blockchain { uint64_t from_future{}; uint64_t from_storage{}; static std::atomic global_genesis_block_id; + static std::atomic global_latest_block_id; void deleteBlock(BlockId id, storage::rocksdb::NativeWriteBatch& wb) { ConcordAssertLE(id, last_reachable_block_id_); ConcordAssertGE(id, genesis_block_id_); diff --git a/kvbc/include/v4blockchain/detail/column_families.h b/kvbc/include/v4blockchain/detail/column_families.h index 281039fd68..0e4d1a7a5c 100644 --- a/kvbc/include/v4blockchain/detail/column_families.h +++ b/kvbc/include/v4blockchain/detail/column_families.h @@ -23,5 +23,6 @@ inline const auto LATEST_KEYS_CF = std::string{"v4_latest_keys"}; inline const auto IMMUTABLE_KEYS_CF = std::string{"v4_immutable_keys"}; inline const auto CATEGORIES_CF = std::string{"v4_categories"}; inline const auto MISC_CF = std::string{"v4_misc"}; +inline const auto KEYS_HISTORY_CF = std::string{"v4_keys_history"}; } // namespace concord::kvbc::v4blockchain::detail diff --git a/kvbc/include/v4blockchain/detail/keys_history.h b/kvbc/include/v4blockchain/detail/keys_history.h new file mode 100644 index 0000000000..a8d4c6b8f4 --- /dev/null +++ b/kvbc/include/v4blockchain/detail/keys_history.h @@ -0,0 +1,117 @@ +// Concord +// +// Copyright (c) 2022 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the +// "License"). You may not use this product except in compliance with the +// Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright +// notices and license terms. Your use of these subcomponents is subject to the +// terms and conditions of the subcomponent's license, as noted in the LICENSE +// file. + +#pragma once + +#include "rocksdb/native_client.h" +#include "categorization/updates.h" +#include "v4blockchain/detail/categories.h" +#include + +namespace concord::kvbc::v4blockchain::detail { + +class KeysHistory { + public: + KeysHistory(const std::shared_ptr&, + const std::optional>&); + + // Add the keys of the block to the keys history column family + void addBlockKeys(const concord::kvbc::categorization::Updates&, const BlockId, storage::rocksdb::NativeWriteBatch&); + + // Return the id of the block where the key was last updated to its value at given version + std::optional getVersionFromHistory(const std::string& category_id, + const std::string& key, + const BlockId version) const; + + // Delete the last added block keys + void revertLastBlockKeys(const concord::kvbc::categorization::Updates&, + const BlockId, + storage::rocksdb::NativeWriteBatch&); + + struct KHCompactionFilter : ::rocksdb::CompactionFilter { + static ::rocksdb::CompactionFilter* getFilter() { + static KHCompactionFilter instance; + return &instance; + } + KHCompactionFilter() {} + const char* Name() const override { return "KeysHistoryCompactionFilter"; } + bool Filter(int /*level*/, + const ::rocksdb::Slice& key, + const ::rocksdb::Slice& /*val*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const override; + }; + + private: + void handleCategoryUpdates(const std::string& block_version, + const std::string& category_id, + const concord::kvbc::categorization::BlockMerkleInput&, + concord::storage::rocksdb::NativeWriteBatch&); + void handleCategoryUpdates(const std::string& block_version, + const std::string& category_id, + const concord::kvbc::categorization::VersionedInput&, + concord::storage::rocksdb::NativeWriteBatch&); + void handleCategoryUpdates(const std::string& block_version, + const std::string& category_id, + const concord::kvbc::categorization::ImmutableInput&, + concord::storage::rocksdb::NativeWriteBatch&); + + template + void handleUpdatesImp(const std::string& block_version, + const std::string& category_id, + const UPDATES& updates_kv, + concord::storage::rocksdb::NativeWriteBatch& write_batch); + + template + void handleDeletesUpdatesImp(const std::string& block_version, + const std::string& category_id, + const DELETES& deletes, + concord::storage::rocksdb::NativeWriteBatch& write_batch); + + void revertCategoryKeys(const std::string& category_id, + const categorization::BlockMerkleInput& updates, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch); + void revertCategoryKeys(const std::string& category_id, + const categorization::VersionedInput& updates, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch); + void revertCategoryKeys(const std::string& category_id, + const categorization::ImmutableInput& updates, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch); + + template + void revertKeysImp(const std::string& category_id, + const UPDATES& updates_kv, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch); + + template + void revertDeletedKeysImp(const std::string& category_id, + const DELETES& deletes, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch); + + void revertOneKeyAtVersion(const std::string& category_id, + const std::string& block_id, + const std::string& prefix, + const std::string& key, + concord::storage::rocksdb::NativeWriteBatch& write_batch); + + private: + std::shared_ptr native_client_; + v4blockchain::detail::Categories category_mapping_; +}; + +} // namespace concord::kvbc::v4blockchain::detail diff --git a/kvbc/include/v4blockchain/v4_blockchain.h b/kvbc/include/v4blockchain/v4_blockchain.h index e9073d6c3c..acc0008330 100644 --- a/kvbc/include/v4blockchain/v4_blockchain.h +++ b/kvbc/include/v4blockchain/v4_blockchain.h @@ -18,6 +18,7 @@ #include "v4blockchain/detail/st_chain.h" #include "v4blockchain/detail/latest_keys.h" #include "v4blockchain/detail/blockchain.h" +#include "v4blockchain/detail/keys_history.h" #include #include @@ -92,6 +93,9 @@ class KeyValueBlockchain { BlockId block_id) const; std::optional getLatest(const std::string &category_id, const std::string &key) const; + std::optional getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId version) const; void multiGet(const std::string &category_id, const std::vector &keys, @@ -269,6 +273,7 @@ class KeyValueBlockchain { v4blockchain::detail::Blockchain block_chain_; v4blockchain::detail::StChain state_transfer_chain_; v4blockchain::detail::LatestKeys latest_keys_; + v4blockchain::detail::KeysHistory keys_history_; // flag to mark whether a checkpoint is being taken. std::optional last_block_sn_; const float updates_to_final_size_ration_{2.5}; diff --git a/kvbc/src/Replica.cpp b/kvbc/src/Replica.cpp index 2dff5b6df8..2aaa9d7d24 100644 --- a/kvbc/src/Replica.cpp +++ b/kvbc/src/Replica.cpp @@ -457,6 +457,12 @@ BlockId Replica::getLastBlockId() const { return m_kvBlockchain->getLastBlockId(); } +std::optional Replica::getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId snapshot_version) const { + return m_kvBlockchain->getFromSnapshot(category_id, key, snapshot_version); +} + void Replica::checkpointInProcess(bool flag, kvbc::BlockId id) { m_kvBlockchain->checkpointInProcess(flag, id); } void Replica::set_command_handler(std::shared_ptr handler) { m_cmdHandler = handler; } diff --git a/kvbc/src/categorization/kv_blockchain.cpp b/kvbc/src/categorization/kv_blockchain.cpp index 59203e2b58..d6746a9f2a 100644 --- a/kvbc/src/categorization/kv_blockchain.cpp +++ b/kvbc/src/categorization/kv_blockchain.cpp @@ -413,6 +413,36 @@ void KeyValueBlockchain::trimBlocksFromSnapshot(BlockId block_id_at_checkpoint) } } +std::optional KeyValueBlockchain::getFromSnapshot(const std::string& category_id, + const std::string& key, + const BlockId snapshot_version) { + auto opt_latest = getLatest(category_id, key); + if (opt_latest) { + BlockId latest_version; + std::visit([&latest_version](const auto& opt_latest) { latest_version = opt_latest.block_id; }, *opt_latest); + if (snapshot_version >= latest_version) { + return opt_latest; + } + } + + auto latest_ver = getLatestVersion(category_id, key); + if (opt_latest || latest_ver) { + BlockId limit = 0; + BlockId max_blocks_to_look = bftEngine::ReplicaConfig::instance().keysHistoryMaxBlocksNum; + if (snapshot_version > max_blocks_to_look) { + limit = snapshot_version - max_blocks_to_look; + } + for (auto i = snapshot_version; i > limit; i--) { + auto opt_value = get(category_id, key, i); + // key was updated or deleted at version i + if (opt_value || (latest_ver && latest_ver.value().version == i)) { + return opt_value; + } + } + } + return std::nullopt; +} + /////////////////////// Delete block /////////////////////// bool KeyValueBlockchain::deleteBlock(const BlockId& block_id) { diagnostics::TimeRecorder scoped_timer(*histograms_.deleteBlock); diff --git a/kvbc/src/merkle_tree_storage_factory.cpp b/kvbc/src/merkle_tree_storage_factory.cpp index 3c03b6649f..a7f8d9bbff 100644 --- a/kvbc/src/merkle_tree_storage_factory.cpp +++ b/kvbc/src/merkle_tree_storage_factory.cpp @@ -19,6 +19,7 @@ #include "rocksdb/native_client.h" #include "v4blockchain/detail/column_families.h" #include "v4blockchain/detail/latest_keys.h" +#include "v4blockchain/detail/keys_history.h" #include #include @@ -50,6 +51,9 @@ std::shared_ptr completeRocksDBConfiguration( (d.name == concord::kvbc::v4blockchain::detail::IMMUTABLE_KEYS_CF)) { d.options.compaction_filter = concord::kvbc::v4blockchain::detail::LatestKeys::LKCompactionFilter::getFilter(); LOG_DEBUG(V4_BLOCK_LOG, "Setting compaction filter for " << d.name); + } else if (d.name == concord::kvbc::v4blockchain::detail::KEYS_HISTORY_CF) { + d.options.compaction_filter = concord::kvbc::v4blockchain::detail::KeysHistory::KHCompactionFilter::getFilter(); + LOG_DEBUG(V4_BLOCK_LOG, "Setting compaction filter for " << d.name); } } return db_options.statistics; diff --git a/kvbc/src/v4blockchain/detail/blockchain.cpp b/kvbc/src/v4blockchain/detail/blockchain.cpp index 67d5d0d2c9..fb2c55553e 100644 --- a/kvbc/src/v4blockchain/detail/blockchain.cpp +++ b/kvbc/src/v4blockchain/detail/blockchain.cpp @@ -18,6 +18,7 @@ namespace concord::kvbc::v4blockchain::detail { std::atomic Blockchain::global_genesis_block_id = INVALID_BLOCK_ID; +std::atomic Blockchain::global_latest_block_id = INVALID_BLOCK_ID; Blockchain::Blockchain(const std::shared_ptr& native_client) : native_client_{native_client} { @@ -29,7 +30,7 @@ Blockchain::Blockchain(const std::shared_ptr& native_client, + const std::optional>& categories) + : native_client_{native_client}, category_mapping_(native_client, categories) { + if (!bftEngine::ReplicaConfig::instance().enableKeysHistoryCF) { + return; + } + if (native_client_->createColumnFamilyIfNotExisting(v4blockchain::detail::KEYS_HISTORY_CF, + KHCompactionFilter::getFilter())) { + LOG_INFO(V4_BLOCK_LOG, "Created [" << v4blockchain::detail::KEYS_HISTORY_CF << "] column family for keys history"); + } +} + +void KeysHistory::addBlockKeys(const concord::kvbc::categorization::Updates& updates, + const BlockId block_id, + storage::rocksdb::NativeWriteBatch& write_batch) { + if (!bftEngine::ReplicaConfig::instance().enableKeysHistoryCF) { + return; + } + LOG_DEBUG(V4_BLOCK_LOG, "Adding keys of block [" << block_id << "] to the keys history CF"); + const std::string block_key = v4blockchain::detail::Blockchain::generateKey(block_id); + ConcordAssertEQ(block_key.size(), sizeof(BlockId)); + + for (const auto& [category_id, category_updates] : updates.categoryUpdates().kv) { + std::visit([category_id = category_id, &write_batch, &block_key, this]( + const auto& updates) { handleCategoryUpdates(block_key, category_id, updates, write_batch); }, + category_updates); + } +} + +void KeysHistory::handleCategoryUpdates(const std::string& block_version, + const std::string& category_id, + const concord::kvbc::categorization::BlockMerkleInput& updates, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + handleUpdatesImp(block_version, category_id, updates.kv, write_batch); + handleDeletesUpdatesImp(block_version, category_id, updates.deletes, write_batch); +} + +void KeysHistory::handleCategoryUpdates(const std::string& block_version, + const std::string& category_id, + const concord::kvbc::categorization::VersionedInput& updates, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + handleUpdatesImp(block_version, category_id, updates.kv, write_batch); + handleDeletesUpdatesImp(block_version, category_id, updates.deletes, write_batch); +} + +void KeysHistory::handleCategoryUpdates(const std::string& block_version, + const std::string& category_id, + const concord::kvbc::categorization::ImmutableInput& updates, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + handleUpdatesImp(block_version, category_id, updates.kv, write_batch); +} + +template +void KeysHistory::handleUpdatesImp(const std::string& block_version, + const std::string& category_id, + const UPDATES& updates_kv, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + const auto& prefix = category_mapping_.categoryPrefix(category_id); + const std::string empty_val = ""; + for (const auto& [k, _] : updates_kv) { + (void)_; // unused + LOG_DEBUG(V4_BLOCK_LOG, + "Update history of key " << std::hash{}(k) << " at version " + << concordUtils::fromBigEndianBuffer(block_version.data()) + << ". category_id " << category_id << " prefix " << prefix << " key is hex " + << concordUtils::bufferToHex(k.data(), k.size()) << " key size " << k.size() + << " raw key " << k); + write_batch.put( + v4blockchain::detail::KEYS_HISTORY_CF, getSliceArray(prefix, k, block_version), getSliceArray(empty_val)); + } +} + +template +void KeysHistory::handleDeletesUpdatesImp(const std::string& block_version, + const std::string& category_id, + const DELETES& deletes, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + const auto& prefix = category_mapping_.categoryPrefix(category_id); + const std::string empty_val = ""; + for (const auto& k : deletes) { + LOG_DEBUG(V4_BLOCK_LOG, + "Update history of key " << std::hash{}(k) << " at version " + << concordUtils::fromBigEndianBuffer(block_version.data()) + << ". category_id " << category_id << " prefix " << prefix << " key is hex " + << concordUtils::bufferToHex(k.data(), k.size()) << " key size " << k.size() + << " raw key " << k); + write_batch.put( + v4blockchain::detail::KEYS_HISTORY_CF, getSliceArray(prefix, k, block_version), getSliceArray(empty_val)); + } +} + +void KeysHistory::revertLastBlockKeys(const concord::kvbc::categorization::Updates& updates, + const BlockId block_id, + storage::rocksdb::NativeWriteBatch& write_batch) { + if (!bftEngine::ReplicaConfig::instance().enableKeysHistoryCF) { + return; + } + ConcordAssertGT(block_id, 0); + LOG_DEBUG(V4_BLOCK_LOG, "Reverting keys of block [" << block_id << "] "); + const std::string block_key = v4blockchain::detail::Blockchain::generateKey(block_id); + ConcordAssertEQ(block_key.size(), sizeof(BlockId)); + for (const auto& [category_id, category_updates] : updates.categoryUpdates().kv) { + std::visit([category_id = category_id, &write_batch, &block_key, this]( + const auto& updates) { revertCategoryKeys(category_id, updates, block_key, write_batch); }, + category_updates); + } +} + +void KeysHistory::revertCategoryKeys(const std::string& category_id, + const categorization::BlockMerkleInput& updates, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + revertKeysImp(category_id, updates.kv, block_version, write_batch); + revertDeletedKeysImp(category_id, updates.deletes, block_version, write_batch); +} + +void KeysHistory::revertCategoryKeys(const std::string& category_id, + const categorization::VersionedInput& updates, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + revertKeysImp(category_id, updates.kv, block_version, write_batch); + revertDeletedKeysImp(category_id, updates.deletes, block_version, write_batch); +} + +void KeysHistory::revertCategoryKeys(const std::string& category_id, + const categorization::ImmutableInput& updates, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + revertKeysImp(category_id, updates.kv, block_version, write_batch); +} + +template +void KeysHistory::revertKeysImp(const std::string& category_id, + const UPDATES& updates_kv, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + const auto& prefix = category_mapping_.categoryPrefix(category_id); + for (const auto& [k, _] : updates_kv) { + (void)_; + revertOneKeyAtVersion(category_id, block_version, prefix, k, write_batch); + } +} + +template +void KeysHistory::revertDeletedKeysImp(const std::string& category_id, + const DELETES& deletes, + const std::string& block_version, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + const std::string& prefix = category_mapping_.categoryPrefix(category_id); + for (const auto& k : deletes) { + revertOneKeyAtVersion(category_id, block_version, prefix, k, write_batch); + } +} + +void KeysHistory::revertOneKeyAtVersion(const std::string& category_id, + const std::string& block_version, + const std::string& prefix, + const std::string& key, + concord::storage::rocksdb::NativeWriteBatch& write_batch) { + std::string get_key = prefix; + get_key.append(key); + get_key.append(block_version); + auto opt_val = native_client_->get(v4blockchain::detail::KEYS_HISTORY_CF, get_key); + if (opt_val) { + write_batch.del(v4blockchain::detail::KEYS_HISTORY_CF, get_key); + LOG_DEBUG(V4_BLOCK_LOG, + "Revert key " << key << " in version " << concordUtils::fromBigEndianBuffer(block_version.data()) + << " category " << category_id << " prefix " << prefix); + } else { + LOG_WARN(V4_BLOCK_LOG, + "Couldn't find key " << key << " in version " + << concordUtils::fromBigEndianBuffer(block_version.data()) << " category " + << category_id << " prefix " << prefix); + } +} + +std::optional KeysHistory::getVersionFromHistory(const std::string& category_id, + const std::string& key, + const BlockId version) const { + if (!bftEngine::ReplicaConfig::instance().enableKeysHistoryCF) { + return std::nullopt; + } + const auto& prefix = category_mapping_.categoryPrefix(category_id); + const auto& version_key = v4blockchain::detail::Blockchain::generateKey(version); + std::string get_key = prefix; + get_key.append(key); + get_key.append(version_key); + + auto itr = native_client_->getIterator(v4blockchain::detail::KEYS_HISTORY_CF); + try { + itr.seekAtMost(get_key); + } catch (concord::storage::rocksdb::RocksDBException& ex) { + LOG_DEBUG(V4_BLOCK_LOG, "iterator seekAtMost failed. " << ex.what()); + return std::nullopt; + } + + if (!itr) { + LOG_DEBUG(V4_BLOCK_LOG, + "Reading key " << std::hash{}(key) << " not found at version " << version << " category_id " + << category_id << " prefix " << prefix << " key is hex " + << concordUtils::bufferToHex(key.data(), key.size()) << " raw key " << key); + return std::nullopt; + } + auto found_key = itr.key(); + const size_t total_key_size = found_key.size(); + auto user_key_with_prefix = found_key.substr(0, total_key_size - sizeof(BlockId)); + + if (user_key_with_prefix != prefix + key) { + LOG_DEBUG(V4_BLOCK_LOG, + "Reading key " << std::hash{}(key) << " not found at version " << version << " category_id " + << category_id << " prefix " << prefix << " key is hex " + << concordUtils::bufferToHex(key.data(), key.size()) << " raw key " << key); + return std::nullopt; + } + + auto actual_version = + concordUtils::fromBigEndianBuffer(found_key.c_str() + (total_key_size - sizeof(BlockId))); + LOG_DEBUG(V4_BLOCK_LOG, + "Reading key " << std::hash{}(key) << " snapshot " << version << " actual version " + << actual_version << " category_id " << category_id << " prefix " << prefix << " key is hex " + << concordUtils::bufferToHex(found_key.data(), found_key.size()) << " raw key " << key); + return actual_version; +} + +bool KeysHistory::KHCompactionFilter::Filter(int /*level*/, + const ::rocksdb::Slice& key, + const ::rocksdb::Slice& /*val*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const { + if (key.size() <= sizeof(BlockId)) return false; + auto ts_slice = ::rocksdb::Slice(key.data() + key.size() - sizeof(BlockId), sizeof(BlockId)); + auto key_version = concordUtils::fromBigEndianBuffer(ts_slice.data()); + auto max_blocks_to_save = bftEngine::ReplicaConfig::instance().keysHistoryMaxBlocksNum; + + if (max_blocks_to_save != 0) { + if (Blockchain::global_latest_block_id < max_blocks_to_save || + key_version > Blockchain::global_latest_block_id - max_blocks_to_save) + return false; + LOG_DEBUG(V4_BLOCK_LOG, + "Filtering key with version " << key_version << ". latest block is " << Blockchain::global_latest_block_id + << " max blocks to save is " << max_blocks_to_save); + } else { + // when keysHistoryMaxBlocksNum is set to 0, prune the history that's before the genesis block + if (key_version >= concord::kvbc::v4blockchain::detail::Blockchain::global_genesis_block_id) return false; + LOG_DEBUG(V4_BLOCK_LOG, + "Filtering key with version " + << key_version << " genesis is " + << concord::kvbc::v4blockchain::detail::Blockchain::global_genesis_block_id); + } + return true; +} + +} // namespace concord::kvbc::v4blockchain::detail diff --git a/kvbc/src/v4blockchain/detail/latest_keys.cpp b/kvbc/src/v4blockchain/detail/latest_keys.cpp index e6597033a6..dd89976695 100644 --- a/kvbc/src/v4blockchain/detail/latest_keys.cpp +++ b/kvbc/src/v4blockchain/detail/latest_keys.cpp @@ -21,11 +21,6 @@ using namespace concord::kvbc; namespace concord::kvbc::v4blockchain::detail { -template -auto getSliceArray(const Sliceable&... sls) { - return std::array<::rocksdb::Slice, sizeof...(sls)>{sls...}; -} - LatestKeys::LatestKeys(const std::shared_ptr& native_client, const std::optional>& categories) : native_client_{native_client}, category_mapping_(native_client, categories) { @@ -208,7 +203,7 @@ void LatestKeys::revertCategoryKeysImp(const std::string& cFamily, std::vector<::rocksdb::Status> statuses; for (const auto& [k, _] : updates) { - (void)_; // unsued + (void)_; // unused keys.push_back(prefix + k); } native_client_->multiGet(cFamily, keys, values, statuses, ro); @@ -226,7 +221,7 @@ void LatestKeys::revertCategoryKeysImp(const std::string& cFamily, data = val.GetSelf()->data(); size = val.GetSelf()->size(); } - // add the previous value , already contains currect version as postfix + // add the previous value, already contains correct version as postfix auto actual_version = concordUtils::fromBigEndianBuffer(data + (size - sizeof(BlockId))); LOG_DEBUG(V4_BLOCK_LOG, "Found previous version for key " << key << " rocks sn " << sn << " version " << actual_version); @@ -262,7 +257,7 @@ void LatestKeys::revertDeletedKeysImp(const std::string& category_id, auto& val = values[i]; const auto& key = keys[i]; if (status.ok()) { - // add the previous value , already contains currect version as postfix + // add the previous value, already contains correct version as postfix const char* data = nullptr; size_t size{}; if (val.IsPinned()) { @@ -317,8 +312,9 @@ std::optional LatestKeys::getValue(const std::string& cat << category_id << " prefix " << prefix << " key is hex " << concordUtils::bufferToHex(key.data(), key.size()) << " raw key " << key); switch (category_type) { - case concord::kvbc::categorization::CATEGORY_TYPE::block_merkle: + case concord::kvbc::categorization::CATEGORY_TYPE::block_merkle: { return categorization::MerkleValue{{actual_version, opt_val->substr(0, total_val_size - VALUE_POSTFIX_SIZE)}}; + } case concord::kvbc::categorization::CATEGORY_TYPE::immutable: { return categorization::ImmutableValue{{actual_version, opt_val->substr(0, total_val_size - VALUE_POSTFIX_SIZE)}}; } diff --git a/kvbc/src/v4blockchain/v4_blockchain.cpp b/kvbc/src/v4blockchain/v4_blockchain.cpp index b8ec36f9c8..1515c1516d 100644 --- a/kvbc/src/v4blockchain/v4_blockchain.cpp +++ b/kvbc/src/v4blockchain/v4_blockchain.cpp @@ -22,6 +22,7 @@ #include "util/throughput.hpp" #include "blockchain_misc.hpp" #include "util/filesystem.hpp" +#include "bftengine/ReplicaConfig.hpp" namespace concord::kvbc::v4blockchain { @@ -35,6 +36,7 @@ KeyValueBlockchain::KeyValueBlockchain( block_chain_{native_client_}, state_transfer_chain_{native_client_}, latest_keys_{native_client_, category_types}, + keys_history_{native_client_, category_types}, v4_metrics_comp_{concordMetrics::Component("v4_blockchain", std::make_shared())}, blocks_deleted_{v4_metrics_comp_.RegisterGauge( "numOfBlocksDeleted", block_chain_.getGenesisBlockId() > 0 ? (block_chain_.getGenesisBlockId() - 1) : 0)}, @@ -108,6 +110,7 @@ BlockId KeyValueBlockchain::add(const categorization::Updates &updates, BlockId block_id{}; { block_id = block_chain_.addBlock(block, write_batch); } { latest_keys_.addBlockKeys(updates, block_id, write_batch); } + { keys_history_.addBlockKeys(updates, block_id, write_batch); } return block_id; } @@ -208,6 +211,7 @@ void KeyValueBlockchain::storeLastReachableRevertBatch(const std::optional KeyValueBlockchain::getLatest(const std::st return latest_keys_.getValue(category_id, key); } +std::optional KeyValueBlockchain::getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId version) const { + auto opt_latest = getLatest(category_id, key); + if (opt_latest) { + BlockId latest_version; + std::visit([&latest_version](const auto &opt_latest) { latest_version = opt_latest.block_id; }, *opt_latest); + if (version >= latest_version) { + return opt_latest; + } + } + + if (bftEngine::ReplicaConfig::instance().enableKeysHistoryCF) { + auto block_id = keys_history_.getVersionFromHistory(category_id, key, version); + if (block_id) { + return get(category_id, key, block_id.value()); + } + } else { + // TODO: get() returns nullopt for a deleted key as well as for a key that was not updated at the block. So, if our + // key is deleted at the snapshot version, we will return its last value before it was deleted, instead of nullopt. + BlockId limit = 0; + BlockId max_blocks_to_look = bftEngine::ReplicaConfig::instance().keysHistoryMaxBlocksNum; + if (version > max_blocks_to_look) { + limit = version - max_blocks_to_look; + } + for (auto i = version; i > limit; i--) { + auto opt_value = get(category_id, key, i); + if (opt_value) { + return opt_value; + } + } + } + return std::nullopt; +} + void KeyValueBlockchain::multiGet(const std::string &category_id, const std::vector &keys, const std::vector &versions, diff --git a/kvbc/test/CMakeLists.txt b/kvbc/test/CMakeLists.txt index 9485068cda..1ba740a7a1 100644 --- a/kvbc/test/CMakeLists.txt +++ b/kvbc/test/CMakeLists.txt @@ -270,6 +270,19 @@ if (BUILD_ROCKSDB_STORAGE) stdc++fs ) + add_executable(v4_keys_history_unit_test + v4blockchain/v4_keys_history_test.cpp ) + file(COPY v4blockchain/rocksdb_config_test.ini DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) + add_test(v4_keys_history_unit_test v4_keys_history_unit_test) + target_link_libraries(v4_keys_history_unit_test PUBLIC + GTest::Main + GTest::GTest + util + corebft + kvbc + stdc++fs + ) + add_executable(v4_blockchain_unit_test v4blockchain/blockchain_test.cpp ) add_test(v4_blockchain_unit_test v4_blockchain_unit_test) diff --git a/kvbc/test/categorization/kv_blockchain_test.cpp b/kvbc/test/categorization/kv_blockchain_test.cpp index 747a95f301..78e5b072c9 100644 --- a/kvbc/test/categorization/kv_blockchain_test.cpp +++ b/kvbc/test/categorization/kv_blockchain_test.cpp @@ -2412,6 +2412,76 @@ TEST_F(categorized_kvbc, trim_blocks_from_snapshot_called_with_bigger_block_id) ASSERT_DEATH(kvbc.trimBlocksFromSnapshot(4), ""); } +TEST_F(categorized_kvbc, get_from_snapshot) { + const auto link_st_chain = true; + auto kvbc = KeyValueBlockchain{ + db, + link_st_chain, + std::map{{kExecutionProvableCategory, CATEGORY_TYPE::block_merkle}, + {kConcordInternalCategoryId, CATEGORY_TYPE::versioned_kv}}}; + { // block 1 + Updates updates; + BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate("merkle_key1", "merkle_value_block1"); + merkle_updates.addUpdate("merkle_key2", "merkle_value_block1"); + merkle_updates.addUpdate("merkle_key4", "merkle_value_block1"); + updates.add(kExecutionProvableCategory, std::move(merkle_updates)); + ASSERT_EQ(kvbc.addBlock(std::move(updates)), (BlockId)1); + } + { // block 2 + Updates updates; + BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate("merkle_key1", "merkle_value_block2"); + merkle_updates.addUpdate("merkle_key3", "merkle_value_block2"); + updates.add(kExecutionProvableCategory, std::move(merkle_updates)); + ASSERT_EQ(kvbc.addBlock(std::move(updates)), (BlockId)2); + } + { // block 3 + Updates updates; + BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate("merkle_key2", "merkle_value_block3"); + merkle_updates.addUpdate("merkle_key3", "merkle_value_block3"); + updates.add(kExecutionProvableCategory, std::move(merkle_updates)); + ASSERT_EQ(kvbc.addBlock(std::move(updates)), (BlockId)3); + } + { // block 4 + Updates updates; + BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate("merkle_key2", "merkle_value_block4"); + merkle_updates.addUpdate("merkle_key4", "merkle_value_block4"); + merkle_updates.addDelete("merkle_key3"); + updates.add(kExecutionProvableCategory, std::move(merkle_updates)); + ASSERT_EQ(kvbc.addBlock(std::move(updates)), (BlockId)4); + } + auto opt_val = kvbc.getFromSnapshot(kExecutionProvableCategory, "merkle_key2", 2); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 1); + opt_val = kvbc.getFromSnapshot(kExecutionProvableCategory, "merkle_key3", 2); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 2); + opt_val = kvbc.getFromSnapshot(kExecutionProvableCategory, "merkle_key1", 3); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 2); + opt_val = kvbc.getFromSnapshot(kExecutionProvableCategory, "merkle_key3", 3); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 3); + opt_val = kvbc.getFromSnapshot(kExecutionProvableCategory, "merkle_key3", 4); + ASSERT_FALSE(opt_val); + opt_val = kvbc.getFromSnapshot(kExecutionProvableCategory, "merkle_key1", 4); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 2); + + opt_val = kvbc.getFromSnapshot(kExecutionProvableCategory, "merkle_key4", 3); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 1); + + BlockId orig_max_blocks_to_look = bftEngine::ReplicaConfig::instance().keysHistoryMaxBlocksNum; + bftEngine::ReplicaConfig::instance().setkeysHistoryMaxBlocksNum(2); + opt_val = kvbc.getFromSnapshot(kExecutionProvableCategory, "merkle_key4", 3); + ASSERT_FALSE(opt_val); + bftEngine::ReplicaConfig::instance().setkeysHistoryMaxBlocksNum(orig_max_blocks_to_look); +} + } // end namespace int main(int argc, char** argv) { diff --git a/kvbc/test/kvbc_app_filter/kvbc_filter_test.cpp b/kvbc/test/kvbc_app_filter/kvbc_filter_test.cpp index 3e37e4b6ff..09c0e81795 100644 --- a/kvbc/test/kvbc_app_filter/kvbc_filter_test.cpp +++ b/kvbc/test/kvbc_app_filter/kvbc_filter_test.cpp @@ -199,6 +199,12 @@ class FakeStorage : public concord::kvbc::IReader { BlockId getLastBlockId() const override { return blockId_; } + std::optional getFromSnapshot( + const std::string &category_id, const std::string &key, const BlockId snapshot_version) const override final { + ADD_FAILURE() << "getFromSnapshot() should not be called by this test"; + return {}; + } + // Dummy method to fill DB for testing, each client id can watch only the // block id that equals to their client id. void fillWithData(BlockId num_of_blocks) { diff --git a/kvbc/test/pruning_test.cpp b/kvbc/test/pruning_test.cpp index 1a86fc58c3..31e4b04759 100644 --- a/kvbc/test/pruning_test.cpp +++ b/kvbc/test/pruning_test.cpp @@ -149,6 +149,12 @@ class TestStorage : public IReader, public IBlockAdder, public IBlocksDeleter { BlockId getLastBlockId() const override { return bc_.getLastReachableBlockId(); } + std::optional getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId snapshot_version) const override { + return std::nullopt; + } + // IBlocksDeleter interface void deleteGenesisBlock() override { const auto genesisBlock = bc_.getGenesisBlockId(); diff --git a/kvbc/test/replica_state_sync_test.cpp b/kvbc/test/replica_state_sync_test.cpp index d7ccd0f0df..a3da7afab7 100644 --- a/kvbc/test/replica_state_sync_test.cpp +++ b/kvbc/test/replica_state_sync_test.cpp @@ -133,6 +133,12 @@ class replica_state_sync_test : public Test, public IReader { BlockId getLastBlockId() const override { throw std::logic_error{"IReader::getLastBlockId() should not be called"}; } + std::optional getFromSnapshot(const std::string &category_id, + const std::string &key, + const BlockId snapshot_version) const override { + throw std::logic_error{"IReader::getFromSnapshot() should not be called"}; + } + protected: void addBlockWithBftSeqNum(std::uint64_t seq_number) { auto updates = Updates{}; diff --git a/kvbc/test/v4blockchain/v4_blockchain_test.cpp b/kvbc/test/v4blockchain/v4_blockchain_test.cpp index d7b324f24a..eb8b1f5661 100644 --- a/kvbc/test/v4blockchain/v4_blockchain_test.cpp +++ b/kvbc/test/v4blockchain/v4_blockchain_test.cpp @@ -264,7 +264,7 @@ TEST_F(v4_kvbc, simulation) { } // Add a block which contains updates per category. -// Each category handles its updates and returs an output which goes to the block structure. +// Each category handles its updates and returns an output which goes to the block structure. // The block structure is then inserted into the DB. // we test that the block that is written to DB contains the expected data. TEST_F(v4_kvbc, creation) { @@ -272,6 +272,7 @@ TEST_F(v4_kvbc, creation) { ASSERT_TRUE(db->hasColumnFamily(v4blockchain::detail::ST_CHAIN_CF)); ASSERT_TRUE(db->hasColumnFamily(v4blockchain::detail::LATEST_KEYS_CF)); ASSERT_TRUE(db->hasColumnFamily(v4blockchain::detail::IMMUTABLE_KEYS_CF)); + ASSERT_TRUE(db->hasColumnFamily(v4blockchain::detail::KEYS_HISTORY_CF)); } TEST_F(v4_kvbc, add_blocks) { @@ -2503,6 +2504,160 @@ TEST_F(v4_kvbc, digest_checks) { ASSERT_NE(empty_digest, blockchain->calculateBlockDigest(max_block)); } +TEST_F(v4_kvbc, get_from_snapshot) { + // Add blocks: + { // block 1 + categorization::Updates updates; + categorization::BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate("merkle_key1", "merkle_value_block1"); + merkle_updates.addUpdate("merkle_key2", "merkle_value_block1"); + updates.add("merkle", std::move(merkle_updates)); + ASSERT_EQ(blockchain->add(std::move(updates)), (BlockId)1); + } + { // block 2 + categorization::Updates updates; + categorization::BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate("merkle_key1", "merkle_value_block2"); + merkle_updates.addUpdate("merkle_key3", "merkle_value_block2"); + updates.add("merkle", std::move(merkle_updates)); + ASSERT_EQ(blockchain->add(std::move(updates)), (BlockId)2); + } + { // block 3 + categorization::Updates updates; + categorization::BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate("merkle_key2", "merkle_value_block3"); + merkle_updates.addUpdate("merkle_key3", "merkle_value_block3"); + updates.add("merkle", std::move(merkle_updates)); + ASSERT_EQ(blockchain->add(std::move(updates)), (BlockId)3); + } + { // block 4 + categorization::Updates updates; + categorization::BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate("merkle_key2", "merkle_value_block4"); + updates.add("merkle", std::move(merkle_updates)); + ASSERT_EQ(blockchain->add(std::move(updates)), (BlockId)4); + } + auto opt_val = blockchain->getFromSnapshot("merkle", "merkle_key2", 2); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 1); + opt_val = blockchain->getFromSnapshot("merkle", "merkle_key3", 2); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 2); + opt_val = blockchain->getFromSnapshot("merkle", "merkle_key1", 3); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 2); + opt_val = blockchain->getFromSnapshot("merkle", "merkle_key3", 3); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 3); + opt_val = blockchain->getFromSnapshot("merkle", "merkle_key1", 4); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 2); + + BlockId orig_max_blocks_to_look = bftEngine::ReplicaConfig::instance().keysHistoryMaxBlocksNum; + bftEngine::ReplicaConfig::instance().setkeysHistoryMaxBlocksNum(2); + // compact keys-history CF to clear "merkle_key1" history + db->rawDB().CompactRange(::rocksdb::CompactRangeOptions{}, + db->columnFamilyHandle(v4blockchain::detail::KEYS_HISTORY_CF), + nullptr, + nullptr); + // make sure that getFromSnapshot() uses getLatest() when it is possible + opt_val = blockchain->getFromSnapshot("merkle", "merkle_key1", 3); + ASSERT_TRUE(opt_val); + ASSERT_EQ(std::get(*opt_val).block_id, 2); + bftEngine::ReplicaConfig::instance().setkeysHistoryMaxBlocksNum(orig_max_blocks_to_look); +} + +TEST_F(v4_kvbc, get_from_snapshot_all_categories) { + std::mt19937 rgen; + std::uniform_int_distribution dist(10, 100); + const uint64_t max_block = dist(rgen); + const uint32_t num_merkle_each = dist(rgen); + const uint32_t num_versioned_each = dist(rgen); + const uint32_t num_immutable_each = dist(rgen); + + // Keys are: _key_ + // Values are: _value__ + for (uint64_t blk = 1; blk <= max_block; ++blk) { + categorization::Updates updates; + // merkle keys: add once in 3 blocks + if (blk % 3 == 1) { + categorization::BlockMerkleUpdates merkle_updates; + for (uint32_t kid = 1; kid <= num_merkle_each; ++kid) { + std::string key = "merkle_key_" + std::to_string(kid); + std::string val = "merkle_value_" + std::to_string(blk) + "_" + std::to_string(kid); + merkle_updates.addUpdate(std::move(key), std::move(val)); + } + updates.add("merkle", std::move(merkle_updates)); + } + // versioned key: add once in 2 blocks + if (blk % 2 == 0) { + categorization::VersionedUpdates ver_updates; + ver_updates.calculateRootHash(false); + for (uint32_t kid = 1; kid <= num_versioned_each; ++kid) { + std::string key = "versioned_key_" + std::to_string(kid); + std::string val = "versioned_value_" + std::to_string(blk) + "_" + std::to_string(kid); + ver_updates.addUpdate(std::move(key), std::move(val)); + } + updates.add("versioned", std::move(ver_updates)); + } + // immutable keys: add in every block + categorization::ImmutableUpdates immutable_updates; + for (uint32_t kid = 1; kid <= num_immutable_each; ++kid) { + std::string key = "immutable_key_" + std::to_string(kid); + std::string val = "immutable_value_" + std::to_string(blk) + "_" + std::to_string(kid); + immutable_updates.addUpdate(std::move(key), {std::move(val), {std::to_string(blk), std::to_string(kid)}}); + } + updates.add("immutable", std::move(immutable_updates)); + ASSERT_EQ(blockchain->add(std::move(updates)), (BlockId)blk); + } + ASSERT_EQ(blockchain->getGenesisBlockId(), 1); + ASSERT_EQ(blockchain->getLastReachableBlockId(), max_block); + + //////////// Checking the getFromSnapshot //////////// + for (uint64_t blk = 1; blk <= max_block; ++blk) { + // merkle + for (uint32_t kid = 1; kid <= num_merkle_each; ++kid) { + const std::string key = "merkle_key_" + std::to_string(kid); + auto val = blockchain->getFromSnapshot("merkle", key, blk); + ASSERT_TRUE(val); + const uint64_t expected_version = ((blk - 1) / 3) * 3 + 1; + const std::string expected_val = "merkle_value_" + std::to_string(expected_version) + "_" + std::to_string(kid); + auto merkle_val = std::get(*val); + ASSERT_EQ(merkle_val.block_id, expected_version); + ASSERT_EQ(merkle_val.data, expected_val); + } + // versioned + for (uint32_t kid = 1; kid <= num_versioned_each; ++kid) { + const std::string key = "versioned_key_" + std::to_string(kid); + auto val = blockchain->getFromSnapshot("versioned", key, blk); + if (blk > 1) { + ASSERT_TRUE(val); + auto ver_val = std::get(*val); + const uint64_t expected_version = blk - (blk % 2); + const std::string expected_val = + "versioned_value_" + std::to_string(expected_version) + "_" + std::to_string(kid); + ASSERT_EQ(ver_val.block_id, expected_version); + ASSERT_EQ(ver_val.data, expected_val); + } else { + ASSERT_FALSE(val); + } + } + + // immutable + for (uint32_t kid = 1; kid <= num_immutable_each; ++kid) { + const std::string key = "immutable_key_" + std::to_string(kid); + auto val = blockchain->getFromSnapshot("immutable", key, blk); + ASSERT_TRUE(val); + auto immutable_val = std::get(*val); + const uint64_t expected_version = blk; + const std::string expected_val = + "immutable_value_" + std::to_string(expected_version) + "_" + std::to_string(kid); + ASSERT_EQ(immutable_val.block_id, expected_version); + ASSERT_EQ(immutable_val.data, expected_val); + } + } +} + // TEST_F(v4_kvbc, trim_blocks) { // uint64_t max_block = 100; // uint32_t num_merkle_each = 0; diff --git a/kvbc/test/v4blockchain/v4_keys_history_test.cpp b/kvbc/test/v4blockchain/v4_keys_history_test.cpp new file mode 100644 index 0000000000..2c4f2f9966 --- /dev/null +++ b/kvbc/test/v4blockchain/v4_keys_history_test.cpp @@ -0,0 +1,477 @@ +// Concord +// +// Copyright (c) 2020 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the +// "License"). You may not use this product except in compliance with the +// Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright +// notices and license terms. Your use of these subcomponents is subject to the +// terms and conditions of the subcomponent's license, as noted in the LICENSE +// file. + +#include "gtest/gtest.h" +#include "v4blockchain/v4_blockchain.h" +#include "storage/test/storage_test_common.h" +#include "bftengine/ReplicaConfig.hpp" +#include +#include + +using concord::storage::rocksdb::NativeClient; +using namespace concord::kvbc; +using namespace ::testing; + +namespace { + +class v4_kvbc : public Test { + protected: + void SetUp() override { + destroyDb(); + db = TestRocksDb::createNative(); + categories = std::map{ + {"merkle", categorization::CATEGORY_TYPE::block_merkle}, + {"versioned", categorization::CATEGORY_TYPE::versioned_kv}, + {"versioned_2", categorization::CATEGORY_TYPE::versioned_kv}, + {"immutable", categorization::CATEGORY_TYPE::immutable}}; + category_mapping_ = std::make_unique(db, categories); + } + + void TearDown() override { destroyDb(); } + + void destroyDb() { + db.reset(); + ASSERT_EQ(0, db.use_count()); + cleanup(); + } + + typedef struct keyAndVersion_ { + std::string key; + BlockId block_id; + bool operator<(const keyAndVersion_& o) const { return key < o.key || (key == o.key && block_id < o.block_id); } + } keyAndVersion; + + typedef struct expectedHistory_ { + std::optional last_modified; + std::string cat_id; + } expectedHistory; + + using ExpectedMap = std::map; + + std::optional> add_key_with_random(std::mt19937& gen, + const std::string& category, + const BlockId block_id, + const int key_idx, + ExpectedMap& expected_snapshots) { + // Keys are: _key_ + // Values are: _value__ + std::optional> ret_val{std::nullopt}; + std::uniform_int_distribution dist_bool{0, 1}; + const std::string key = category + "_key" + std::to_string(key_idx); + const std::string val = category + "_val" + std::to_string(block_id) + std::to_string(key_idx); + const keyAndVersion merkle_key_ver{key, block_id}; + if (dist_bool(gen)) { + ret_val = std::make_pair(key, val); + expected_snapshots.emplace(merkle_key_ver, expectedHistory{block_id, category}); + } else { + auto val_last_block = expected_snapshots.find({key, block_id - 1}); + if (val_last_block != expected_snapshots.end()) { + expected_snapshots.emplace(merkle_key_ver, val_last_block->second); + } else { + expected_snapshots.emplace(merkle_key_ver, expectedHistory{std::nullopt, category}); + } + } + return ret_val; + } + + std::string delete_random_key(std::mt19937& gen, + const int num_keys, + const std::string& category, + const BlockId block_id, + ExpectedMap& expected_snapshots) { + std::uniform_int_distribution dist_for_delete_key{0, num_keys}; + const int key_idx = dist_for_delete_key(gen); + std::string key = category + "_key" + std::to_string(key_idx); + const keyAndVersion key_ver_update{key, block_id}; + // a key might be deleted before it was created or be deleted twice. anyway, KeysHistory does not care about that. + expected_snapshots[key_ver_update] = expectedHistory{block_id, category}; + return key; + } + + void assert_expected_match_keys_history(const v4blockchain::detail::KeysHistory& keys_history, + const ExpectedMap& expected_snapshots, + const BlockId latest_block) { + for (const auto& [k, v] : expected_snapshots) { + auto opt_val = keys_history.getVersionFromHistory(v.cat_id, k.key, k.block_id); + if (v.last_modified) { + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), v.last_modified.value()); + } else { + ASSERT_EQ(opt_val, std::nullopt); + } + } + } + + void add_merkle_keys_to_block(v4blockchain::detail::KeysHistory& keys_history, + const BlockId block_id, + const int num_keys) { + categorization::Updates updates; + categorization::BlockMerkleUpdates merkle_updates; + for (int j = 0; j < num_keys; j++) { + const std::string merkle_key = std::to_string(j); + const std::string merkle_val = merkle_category + "_key_" + std::to_string(block_id); + merkle_updates.addUpdate(std::string(merkle_key), std::string(merkle_val)); + } + updates.add(merkle_category, std::move(merkle_updates)); + auto wb = db->getBatch(); + keys_history.addBlockKeys(updates, block_id, wb); + db->write(std::move(wb)); + } + + protected: + std::map categories; + std::unique_ptr category_mapping_; + std::shared_ptr db; + const std::string merkle_category = "merkle"; + const std::string versioned_category = "versioned"; + const std::string immutable_category = "immutable"; +}; + +TEST_F(v4_kvbc, creation) { + v4blockchain::detail::KeysHistory keys_history{db, categories}; + ASSERT_TRUE(db->hasColumnFamily(v4blockchain::detail::KEYS_HISTORY_CF)); + auto options = db->columnFamilyOptions(v4blockchain::detail::KEYS_HISTORY_CF); + ASSERT_NE(options.compaction_filter, nullptr); + ASSERT_EQ(std::string(options.compaction_filter->Name()), std::string("KeysHistoryCompactionFilter")); +} + +TEST_F(v4_kvbc, add_and_get_keys_history) { + v4blockchain::detail::KeysHistory keys_history{db, categories}; + std::random_device seed; + std::mt19937 gen{seed()}; // seed the generator + std::uniform_int_distribution dist{5, 20}; // set min and max + const BlockId num_blocks = (BlockId)dist(gen); // generate number + const int num_keys = dist(gen); // generate number + ExpectedMap expected_snapshots; + + for (BlockId i = 1; i <= num_blocks; i++) { + categorization::Updates updates; + categorization::BlockMerkleUpdates merkle_updates; + categorization::VersionedUpdates ver_updates; + categorization::ImmutableUpdates immutable_updates; + // add or update keys with random + for (int j = 0; j < num_keys; j++) { + auto opt_key_val = add_key_with_random(gen, merkle_category, i, j, expected_snapshots); + if (opt_key_val) { + merkle_updates.addUpdate(std::string(opt_key_val.value().first), std::string(opt_key_val.value().second)); + } + opt_key_val = add_key_with_random(gen, versioned_category, i, j, expected_snapshots); + if (opt_key_val) { + ver_updates.addUpdate(std::string(opt_key_val.value().first), std::string(opt_key_val.value().second)); + } + opt_key_val = add_key_with_random(gen, immutable_category, i, j, expected_snapshots); + if (opt_key_val) { + immutable_updates.addUpdate(std::string(opt_key_val.value().first), + {std::string(opt_key_val.value().second), {"1"}}); + } + } + // delete a random key + if (i > 1) { + merkle_updates.addDelete(delete_random_key(gen, num_keys, merkle_category, i, expected_snapshots)); + } + updates.add(merkle_category, std::move(merkle_updates)); + updates.add(versioned_category, std::move(ver_updates)); + updates.add(immutable_category, std::move(immutable_updates)); + auto wb = db->getBatch(); + keys_history.addBlockKeys(updates, i, wb); + db->write(std::move(wb)); + } + ///////////// Checking getVersionFromHistory ///////////// + assert_expected_match_keys_history(keys_history, expected_snapshots, num_blocks); +} + +TEST_F(v4_kvbc, revertLastBlockKeys) { + v4blockchain::detail::KeysHistory keys_history{db, categories}; + std::random_device seed; + std::mt19937 gen{seed()}; // seed the generator + std::uniform_int_distribution dist{5, 20}; // set min and max + const BlockId num_blocks = (BlockId)dist(gen); // generate number + ExpectedMap expected_snapshots; + categorization::Updates updates_to_revert; + + // Keys are: _key__ + // Values are: _value__ + for (BlockId i = 1; i <= num_blocks; i++) { + categorization::Updates updates; + categorization::BlockMerkleUpdates merkle_updates; + categorization::VersionedUpdates ver_updates; + categorization::ImmutableUpdates immutable_updates; + + const int num_keys = dist(gen); // generate number + // add or update keys + // don't add updates of the last block to expected_snapshots since it will be reverted + for (int j = 0; j < num_keys; j++) { + const std::string merkle_key = merkle_category + "_key_" + std::to_string(i) + "_" + std::to_string(j); + const std::string merkle_val = merkle_category + "_val_" + std::to_string(i) + "_" + std::to_string(j); + merkle_updates.addUpdate(std::string(merkle_key), std::string(merkle_val)); + const keyAndVersion merkle_key_ver{merkle_key, i}; + if (i < num_blocks) expected_snapshots.emplace(merkle_key_ver, expectedHistory{i, merkle_category}); + + const std::string ver_key = versioned_category + "_key_" + std::to_string(i) + "_" + std::to_string(j); + const std::string ver_val = versioned_category + "_val_" + std::to_string(i) + "_" + std::to_string(j); + ver_updates.addUpdate(std::string(ver_key), std::string(ver_val)); + const keyAndVersion ver_key_ver{ver_key, i}; + if (i < num_blocks) expected_snapshots.emplace(ver_key_ver, expectedHistory{i, versioned_category}); + + const std::string immutable_key = immutable_category + "_key_" + std::to_string(i) + "_" + std::to_string(j); + const std::string immutable_val = immutable_category + "_val_" + std::to_string(i) + "_" + std::to_string(j); + immutable_updates.addUpdate(std::string(immutable_key), {std::string(immutable_val), {"1"}}); + const keyAndVersion immutable_key_ver{immutable_key, i}; + if (i < num_blocks) expected_snapshots.emplace(immutable_key_ver, expectedHistory{i, immutable_category}); + } + // delete + if (i > 1) { + std::uniform_int_distribution dist_old_block{1, int(i - 1)}; + const int key_id = 3; + BlockId block_id = (BlockId)dist_old_block(gen); + const auto merkle_key = merkle_category + "_key_" + std::to_string(block_id) + "_" + std::to_string(key_id); + merkle_updates.addDelete(std::string(merkle_key)); + if (i < num_blocks) expected_snapshots[{merkle_key, i}] = expectedHistory{i, merkle_category}; + + block_id = (BlockId)dist_old_block(gen); + const auto ver_key = versioned_category + "_key_" + std::to_string(block_id) + "_" + std::to_string(key_id); + ver_updates.addDelete(std::string(ver_key)); + if (i < num_blocks) expected_snapshots[{ver_key, i}] = expectedHistory{i, versioned_category}; + } + updates.add(merkle_category, std::move(merkle_updates)); + updates.add(versioned_category, std::move(ver_updates)); + updates.add(immutable_category, std::move(immutable_updates)); + auto wb = db->getBatch(); + keys_history.addBlockKeys(updates, i, wb); + db->write(std::move(wb)); + + // keep the updates of the last block which is about to be reverted + if (i == num_blocks) { + updates_to_revert = updates; + } + } + + auto revert_wb = db->getBatch(); + keys_history.revertLastBlockKeys(updates_to_revert, num_blocks, revert_wb); + db->write(std::move(revert_wb)); + + assert_expected_match_keys_history(keys_history, expected_snapshots, num_blocks); +} + +TEST_F(v4_kvbc, filter_keys) { + v4blockchain::detail::KeysHistory keys_history{db, categories}; + std::random_device seed; + std::mt19937 gen{seed()}; // seed the generator + std::uniform_int_distribution dist{20, 100}; // set min and max + const BlockId num_blocks = (BlockId)dist(gen); // generate number + const int num_keys = dist(gen); + std::uniform_int_distribution dist2{5, 19}; + const BlockId num_blocks_to_save_in_history = (BlockId)dist2(gen); + + auto orig_max_blocks_to_save = bftEngine::ReplicaConfig::instance().keysHistoryMaxBlocksNum; + bftEngine::ReplicaConfig::instance().setkeysHistoryMaxBlocksNum(num_blocks_to_save_in_history); + + // add blocks + for (BlockId i = 1; i <= num_blocks; i++) { + add_merkle_keys_to_block(keys_history, i, num_keys); + } + v4blockchain::detail::Blockchain::global_latest_block_id = num_blocks; + + // compaction triggers filter + auto prefix = category_mapping_->categoryPrefix(merkle_category); + auto first_key = prefix + std::to_string(1) + v4blockchain::detail::Blockchain::generateKey(1); + auto last_key = prefix + std::to_string(num_blocks) + v4blockchain::detail::Blockchain::generateKey(num_blocks); + auto start_key = ::rocksdb::Slice(concord::storage::rocksdb::detail::toSlice(first_key)); + auto end_key = ::rocksdb::Slice(concord::storage::rocksdb::detail::toSlice(last_key)); + + db->rawDB().CompactRange(::rocksdb::CompactRangeOptions{}, + db->columnFamilyHandle(v4blockchain::detail::KEYS_HISTORY_CF), + &start_key, + &end_key); + + // make sure that keys of old blocks were filtered out from keys history + for (BlockId i = 1; i <= num_blocks; i++) { + for (int j = 0; j < num_keys; j++) { + auto key = std::to_string(j); + auto opt_val = keys_history.getVersionFromHistory(merkle_category, key, i); + if (i > num_blocks - num_blocks_to_save_in_history) { + ASSERT_TRUE(opt_val); + ASSERT_EQ(opt_val.value(), i); + } else { + ASSERT_FALSE(opt_val); + } + } + } + + // check the case when keysHistoryMaxBlocksNum == 0 + bftEngine::ReplicaConfig::instance().setkeysHistoryMaxBlocksNum(0); + + for (BlockId i = num_blocks + 1; i <= 2 * num_blocks; i++) { + add_merkle_keys_to_block(keys_history, i, num_keys); + } + v4blockchain::detail::Blockchain::global_genesis_block_id = num_blocks + (BlockId)dist2(gen); + + // compaction triggers filter + first_key = prefix + std::to_string(1) + v4blockchain::detail::Blockchain::generateKey(1); + last_key = prefix + std::to_string(2 * num_blocks) + v4blockchain::detail::Blockchain::generateKey(2 * num_blocks); + start_key = ::rocksdb::Slice(concord::storage::rocksdb::detail::toSlice(first_key)); + end_key = ::rocksdb::Slice(concord::storage::rocksdb::detail::toSlice(last_key)); + + db->rawDB().CompactRange(::rocksdb::CompactRangeOptions{}, + db->columnFamilyHandle(v4blockchain::detail::KEYS_HISTORY_CF), + &start_key, + &end_key); + + // make sure that keys of blocks below the genesis were filtered out from keys history + for (BlockId i = num_blocks + 1; i <= 2 * num_blocks; i++) { + for (int j = 0; j < num_keys; j++) { + auto key = std::to_string(j); + auto opt_val = keys_history.getVersionFromHistory(merkle_category, key, i); + if (i >= v4blockchain::detail::Blockchain::global_genesis_block_id) { + ASSERT_TRUE(opt_val); + ASSERT_EQ(opt_val.value(), i); + } else { + ASSERT_FALSE(opt_val); + } + } + } + + // cleanup test changes + bftEngine::ReplicaConfig::instance().setkeysHistoryMaxBlocksNum(orig_max_blocks_to_save); + v4blockchain::detail::Blockchain::global_latest_block_id = 0; + v4blockchain::detail::Blockchain::global_genesis_block_id = 0; +} + +TEST_F(v4_kvbc, follow_key_history) { + v4blockchain::detail::KeysHistory keys_history{db, categories}; + std::random_device seed; + std::mt19937 gen{seed()}; // seed the generator + std::uniform_int_distribution dist{10, 40}; // set min and max + const BlockId num_blocks = (BlockId)dist(gen); // generate number + const int num_keys = dist(gen); + std::optional opt_val; + const auto key1 = "key1"; + + // keys history CF is empty + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, num_blocks); + ASSERT_EQ(opt_val, std::nullopt); + + // add blocks [1, num_blocks] + for (BlockId i = 1; i <= num_blocks; i++) { + add_merkle_keys_to_block(keys_history, i, num_keys); + // key1 has not added yet + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, i); + ASSERT_EQ(opt_val, std::nullopt); + } + + // add key1 to block [num_blocks + 1] + categorization::Updates updates; + categorization::BlockMerkleUpdates merkle_updates; + merkle_updates.addUpdate(key1, "initial_val"); + updates.add(merkle_category, std::move(merkle_updates)); + auto wb = db->getBatch(); + keys_history.addBlockKeys(updates, num_blocks + 1, wb); + db->write(std::move(wb)); + + // make sure that key1's history is correct + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, num_blocks); + ASSERT_EQ(opt_val, std::nullopt); + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, num_blocks + 1); + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), num_blocks + 1); + + // add some additional blocks [num_blocks + 2, additional_blocks] + const BlockId additional_blocks = num_blocks + (BlockId)dist(gen); // generate number + for (BlockId i = num_blocks + 2; i <= additional_blocks; i++) { + add_merkle_keys_to_block(keys_history, i, num_keys); + // key1 was last updated in block [num_blocks + 1] + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, i); + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), num_blocks + 1); + } + + // update key1 at block [additional_blocks + 1] + categorization::Updates updates1; + categorization::BlockMerkleUpdates merkle_updates1; + merkle_updates1.addUpdate(key1, "updated_val"); + updates1.add(merkle_category, std::move(merkle_updates1)); + wb = db->getBatch(); + keys_history.addBlockKeys(updates1, additional_blocks + 1, wb); + db->write(std::move(wb)); + + // make sure that key1's history is correct + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, additional_blocks); + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), num_blocks + 1); + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, additional_blocks + 1); + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), additional_blocks + 1); + + // add some more blocks [additional_blocks + 2, more_blocks] + const BlockId more_blocks = additional_blocks + (BlockId)dist(gen); // generate number + for (BlockId i = additional_blocks + 2; i <= more_blocks; i++) { + add_merkle_keys_to_block(keys_history, i, num_keys); + // key1 was last updated on num_blocks + 1 + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, i); + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), additional_blocks + 1); + } + + // delete key1 in block [more_blocks + 1] + categorization::Updates updates2; + categorization::BlockMerkleUpdates merkle_updates2; + merkle_updates2.addDelete(key1); + updates2.add(merkle_category, std::move(merkle_updates2)); + wb = db->getBatch(); + keys_history.addBlockKeys(updates2, more_blocks + 1, wb); + db->write(std::move(wb)); + + // make sure that key1's history is correct + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, more_blocks); + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), additional_blocks + 1); + // deletion of key is saved to keys history as well + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, more_blocks + 1); + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), more_blocks + 1); + + // add some extra blocks [more_blocks + 2, extra_blocks] + const BlockId extra_blocks = more_blocks + (BlockId)dist(gen); // generate number + for (BlockId i = more_blocks + 2; i <= extra_blocks; i++) { + add_merkle_keys_to_block(keys_history, i, num_keys); + // key1 was last updated on num_blocks + 1 + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, i); + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), more_blocks + 1); + } + + // make sure also after we are done adding blocks + const BlockId total_blocks_number = num_blocks + additional_blocks + more_blocks + extra_blocks; + for (BlockId i = 1; i <= total_blocks_number; i++) { + opt_val = keys_history.getVersionFromHistory(merkle_category, key1, i); + if (i <= num_blocks) { + ASSERT_EQ(opt_val, std::nullopt); + } else if (i <= additional_blocks) { + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), num_blocks + 1); + } else if (i <= more_blocks) { + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), additional_blocks + 1); + } else if (i <= extra_blocks) { + ASSERT_NE(opt_val, std::nullopt); + ASSERT_EQ(opt_val.value(), more_blocks + 1); + } + } +} + +} // end namespace + +int main(int argc, char** argv) { + InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/thin-replica-server/test/thin_replica_server_test.cpp b/thin-replica-server/test/thin_replica_server_test.cpp index a00d2666fb..3d4b611300 100644 --- a/thin-replica-server/test/thin_replica_server_test.cpp +++ b/thin-replica-server/test/thin_replica_server_test.cpp @@ -283,6 +283,13 @@ class FakeStorage : public concord::kvbc::IReader { return {}; } + std::optional getFromSnapshot(const std::string& category_id, + const std::string& key, + const BlockId snapshot_version) const override { + ADD_FAILURE() << "getFromSnapshot() should not be called by this test"; + return {}; + } + BlockId getGenesisBlockId() const override { return genesis_block_id; } BlockId getLastBlockId() const override { return block_id_; }