Skip to content

Commit

Permalink
Add rocksdb column family of keys history to get fast access to snaps…
Browse files Browse the repository at this point in the history
…hots (#2938)

Keys: category_name|user_key|big_endian_block_id
Values: empty string

Add unit tests
  • Loading branch information
Efrat1 authored and cloudnoize committed Apr 24, 2023
1 parent 0246bc0 commit a8e483f
Show file tree
Hide file tree
Showing 28 changed files with 1,291 additions and 13 deletions.
13 changes: 13 additions & 0 deletions bftengine/include/bftengine/ReplicaConfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ class ReplicaConfig : public concord::serialize::SerializableFactory<ReplicaConf
concord::crypto::SignatureAlgorithm::EdDSA,
"A flag to specify the operator message signing algorithm. It is defaulted to use EDDSA algo.");

CONFIG_PARAM(keysHistoryMaxBlocksNum,
std::uint64_t,
10000u,
"threshold for pruning from keys history, versions below latest_block_id - this_value can be pruned."
"If set to 0, versions below the genesis block will be pruned, i.e. the whole available history will be "
"kept (archival).");

CONFIG_PARAM(enableKeysHistoryCF, bool, true, "Feature flag for keys history column family");

// Parameter to enable/disable waiting for transaction data to be persisted.
// Not predefined configuration parameters
// Example of usage:
Expand Down Expand Up @@ -424,6 +433,8 @@ class ReplicaConfig : public concord::serialize::SerializableFactory<ReplicaConf
serialize(outStream, kvBlockchainVersion);
serialize(outStream, operatorMsgSigningAlgo);
serialize(outStream, replicaMsgSigningAlgo);
serialize(outStream, keysHistoryMaxBlocksNum);
serialize(outStream, enableKeysHistoryCF);
}
void deserializeDataMembers(std::istream& inStream) {
deserialize(inStream, isReadOnly);
Expand Down Expand Up @@ -524,6 +535,8 @@ class ReplicaConfig : public concord::serialize::SerializableFactory<ReplicaConf
deserialize(inStream, kvBlockchainVersion);
deserialize(inStream, operatorMsgSigningAlgo);
deserialize(inStream, replicaMsgSigningAlgo);
deserialize(inStream, keysHistoryMaxBlocksNum);
deserialize(inStream, enableKeysHistoryCF);
}

private:
Expand Down
1 change: 1 addition & 0 deletions kvbc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ if (BUILD_ROCKSDB_STORAGE)
src/v4blockchain/v4_blockchain.cpp
src/v4blockchain/detail/latest_keys.cpp
src/v4blockchain/detail/categories.cpp
src/v4blockchain/detail/keys_history.cpp
src/v4blockchain/detail/blocks.cpp
src/v4blockchain/detail/st_chain.cpp
src/v4blockchain/detail/blockchain.cpp)
Expand Down
4 changes: 4 additions & 0 deletions kvbc/include/Replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class Replica : public IReplica,

// Get the last block ID in the system.
BlockId getLastBlockId() const override final;

std::optional<categorization::Value> getFromSnapshot(const std::string &category_id,
const std::string &key,
const BlockId snapshot_version) const override final;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// checkpoint
void checkpointInProcess(bool flag, kvbc::BlockId);
Expand Down
4 changes: 4 additions & 0 deletions kvbc/include/categorization/kv_blockchain.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ class KeyValueBlockchain {
// The key used in the default column family for persisting the current public state hash.
static std::string publicStateHashKey();

std::optional<Value> getFromSnapshot(const std::string& category_id,
const std::string& key,
const BlockId snapshot_version);

private:
BlockId addBlock(CategoryInput&& category_updates, concord::storage::rocksdb::NativeWriteBatch& write_batch);

Expand Down
6 changes: 6 additions & 0 deletions kvbc/include/db_interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ class IReader {
// Get the last block ID in the system.
virtual BlockId getLastBlockId() const = 0;

// Get the version of 'key' that is the closest from below or equal to 'snapshot_version'
// Return std::nullopt if the key doesn't exist or is deleted at given version
virtual std::optional<categorization::Value> getFromSnapshot(const std::string &category_id,
const std::string &key,
const BlockId snapshot_version) const = 0;

virtual ~IReader() = default;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class KeyValueBlockchain : public IReader, public IBlockAdder {

// Get the last block ID in the system.
BlockId getLastBlockId() const override final { return kvbc_->getLastReachableBlockId(); }

std::optional<concord::kvbc::categorization::Value> getFromSnapshot(
const std::string &category_id, const std::string &key, const BlockId snapshot_version) const override final {
return kvbc_->getFromSnapshot(category_id, key, snapshot_version);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
6 changes: 6 additions & 0 deletions kvbc/include/kvbc_adapter/idempotent_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class IdempotentReader : public IReader {

BlockId getLastBlockId() const override { return kvbc_->getLastBlockId(); }

std::optional<concord::kvbc::categorization::Value> getFromSnapshot(const std::string &category_id,
const std::string &key,
const BlockId snapshot_version) const override {
return kvbc_->getFromSnapshot(category_id, key, snapshot_version);
}

private:
const std::shared_ptr<const ReplicaBlockchain> kvbc_;
};
Expand Down
6 changes: 6 additions & 0 deletions kvbc/include/kvbc_adapter/replica_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ class ReplicaBlockchain : public IBlocksDeleter,

// Get the last block ID in the system.
BlockId getLastBlockId() const override final { return reader_->getLastBlockId(); }

std::optional<categorization::Value> getFromSnapshot(const std::string &category_id,
const std::string &key,
const BlockId snapshot_version) const override final {
return reader_->getFromSnapshot(category_id, key, snapshot_version);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ class BlocksReaderAdapter : public IReader {

// Get the last block ID in the system.
BlockId getLastBlockId() const override final { return kvbc_->getLastReachableBlockId(); }

// Get the version of 'key' that is the closest from below or equal to 'snapshot_version'
// Return std::nullopt if the key doesn't exist or is deleted at given version
std::optional<concord::kvbc::categorization::Value> getFromSnapshot(
const std::string &category_id, const std::string &key, const BlockId snapshot_version) const override final {
return kvbc_->getFromSnapshot(category_id, key, snapshot_version);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

private:
Expand Down
13 changes: 11 additions & 2 deletions kvbc/include/v4blockchain/detail/blockchain.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
#include "v4blockchain/detail/column_families.h"

namespace concord::kvbc::v4blockchain::detail {

template <typename... Sliceable>
auto getSliceArray(const Sliceable&... sls) {
return std::array<::rocksdb::Slice, sizeof...(sls)>{sls...};
}

/*
This class composes the blockchain out of detail::Block.
It knows to :
Expand Down Expand Up @@ -63,15 +69,17 @@ class Blockchain {
// Loads from storage the last and first block ids respectivly.
std::optional<BlockId> loadLastReachableBlockId();
std::optional<BlockId> loadGenesisBlockId();
void setLastReachable(BlockId id) { last_reachable_block_id_ = id; }
void setLastReachable(BlockId id) {
last_reachable_block_id_ = id;
global_latest_block_id = id;
}
void setBlockId(BlockId id);
BlockId getLastReachable() const { return last_reachable_block_id_; }
BlockId getGenesisBlockId() const { return genesis_block_id_; }
void setGenesisBlockId(BlockId id) {
genesis_block_id_ = id;
global_genesis_block_id = id;
}

// Returns the buffer that represents the block
std::optional<std::string> getBlockData(concord::kvbc::BlockId id) const;
std::optional<categorization::Updates> getBlockUpdates(BlockId id) const;
Expand Down Expand Up @@ -110,6 +118,7 @@ class Blockchain {
uint64_t from_future{};
uint64_t from_storage{};
static std::atomic<BlockId> global_genesis_block_id;
static std::atomic<BlockId> global_latest_block_id;
void deleteBlock(BlockId id, storage::rocksdb::NativeWriteBatch& wb) {
ConcordAssertLE(id, last_reachable_block_id_);
ConcordAssertGE(id, genesis_block_id_);
Expand Down
1 change: 1 addition & 0 deletions kvbc/include/v4blockchain/detail/column_families.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ inline const auto LATEST_KEYS_CF = std::string{"v4_latest_keys"};
inline const auto IMMUTABLE_KEYS_CF = std::string{"v4_immutable_keys"};
inline const auto CATEGORIES_CF = std::string{"v4_categories"};
inline const auto MISC_CF = std::string{"v4_misc"};
inline const auto KEYS_HISTORY_CF = std::string{"v4_keys_history"};

} // namespace concord::kvbc::v4blockchain::detail
117 changes: 117 additions & 0 deletions kvbc/include/v4blockchain/detail/keys_history.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Concord
//
// Copyright (c) 2022 VMware, Inc. All Rights Reserved.
//
// This product is licensed to you under the Apache 2.0 license (the
// "License"). You may not use this product except in compliance with the
// Apache 2.0 License.
//
// This product may include a number of subcomponents with separate copyright
// notices and license terms. Your use of these subcomponents is subject to the
// terms and conditions of the subcomponent's license, as noted in the LICENSE
// file.

#pragma once

#include "rocksdb/native_client.h"
#include "categorization/updates.h"
#include "v4blockchain/detail/categories.h"
#include <rocksdb/compaction_filter.h>

namespace concord::kvbc::v4blockchain::detail {

class KeysHistory {
public:
KeysHistory(const std::shared_ptr<concord::storage::rocksdb::NativeClient>&,
const std::optional<std::map<std::string, concord::kvbc::categorization::CATEGORY_TYPE>>&);

// Add the keys of the block to the keys history column family
void addBlockKeys(const concord::kvbc::categorization::Updates&, const BlockId, storage::rocksdb::NativeWriteBatch&);

// Return the id of the block where the key was last updated to its value at given version
std::optional<BlockId> getVersionFromHistory(const std::string& category_id,
const std::string& key,
const BlockId version) const;

// Delete the last added block keys
void revertLastBlockKeys(const concord::kvbc::categorization::Updates&,
const BlockId,
storage::rocksdb::NativeWriteBatch&);

struct KHCompactionFilter : ::rocksdb::CompactionFilter {
static ::rocksdb::CompactionFilter* getFilter() {
static KHCompactionFilter instance;
return &instance;
}
KHCompactionFilter() {}
const char* Name() const override { return "KeysHistoryCompactionFilter"; }
bool Filter(int /*level*/,
const ::rocksdb::Slice& key,
const ::rocksdb::Slice& /*val*/,
std::string* /*new_value*/,
bool* /*value_changed*/) const override;
};

private:
void handleCategoryUpdates(const std::string& block_version,
const std::string& category_id,
const concord::kvbc::categorization::BlockMerkleInput&,
concord::storage::rocksdb::NativeWriteBatch&);
void handleCategoryUpdates(const std::string& block_version,
const std::string& category_id,
const concord::kvbc::categorization::VersionedInput&,
concord::storage::rocksdb::NativeWriteBatch&);
void handleCategoryUpdates(const std::string& block_version,
const std::string& category_id,
const concord::kvbc::categorization::ImmutableInput&,
concord::storage::rocksdb::NativeWriteBatch&);

template <typename UPDATES>
void handleUpdatesImp(const std::string& block_version,
const std::string& category_id,
const UPDATES& updates_kv,
concord::storage::rocksdb::NativeWriteBatch& write_batch);

template <typename DELETES>
void handleDeletesUpdatesImp(const std::string& block_version,
const std::string& category_id,
const DELETES& deletes,
concord::storage::rocksdb::NativeWriteBatch& write_batch);

void revertCategoryKeys(const std::string& category_id,
const categorization::BlockMerkleInput& updates,
const std::string& block_version,
concord::storage::rocksdb::NativeWriteBatch& write_batch);
void revertCategoryKeys(const std::string& category_id,
const categorization::VersionedInput& updates,
const std::string& block_version,
concord::storage::rocksdb::NativeWriteBatch& write_batch);
void revertCategoryKeys(const std::string& category_id,
const categorization::ImmutableInput& updates,
const std::string& block_version,
concord::storage::rocksdb::NativeWriteBatch& write_batch);

template <typename UPDATES>
void revertKeysImp(const std::string& category_id,
const UPDATES& updates_kv,
const std::string& block_version,
concord::storage::rocksdb::NativeWriteBatch& write_batch);

template <typename DELETES>
void revertDeletedKeysImp(const std::string& category_id,
const DELETES& deletes,
const std::string& block_version,
concord::storage::rocksdb::NativeWriteBatch& write_batch);

void revertOneKeyAtVersion(const std::string& category_id,
const std::string& block_id,
const std::string& prefix,
const std::string& key,
concord::storage::rocksdb::NativeWriteBatch& write_batch);

private:
std::shared_ptr<concord::storage::rocksdb::NativeClient> native_client_;
v4blockchain::detail::Categories category_mapping_;
};

} // namespace concord::kvbc::v4blockchain::detail
5 changes: 5 additions & 0 deletions kvbc/include/v4blockchain/v4_blockchain.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "v4blockchain/detail/st_chain.h"
#include "v4blockchain/detail/latest_keys.h"
#include "v4blockchain/detail/blockchain.h"
#include "v4blockchain/detail/keys_history.h"
#include <memory>
#include <string>

Expand Down Expand Up @@ -92,6 +93,9 @@ class KeyValueBlockchain {
BlockId block_id) const;

std::optional<categorization::Value> getLatest(const std::string &category_id, const std::string &key) const;
std::optional<categorization::Value> getFromSnapshot(const std::string &category_id,
const std::string &key,
const BlockId version) const;

void multiGet(const std::string &category_id,
const std::vector<std::string> &keys,
Expand Down Expand Up @@ -269,6 +273,7 @@ class KeyValueBlockchain {
v4blockchain::detail::Blockchain block_chain_;
v4blockchain::detail::StChain state_transfer_chain_;
v4blockchain::detail::LatestKeys latest_keys_;
v4blockchain::detail::KeysHistory keys_history_;
// flag to mark whether a checkpoint is being taken.
std::optional<uint64_t> last_block_sn_;
const float updates_to_final_size_ration_{2.5};
Expand Down
6 changes: 6 additions & 0 deletions kvbc/src/Replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ BlockId Replica::getLastBlockId() const {
return m_kvBlockchain->getLastBlockId();
}

std::optional<categorization::Value> Replica::getFromSnapshot(const std::string &category_id,
const std::string &key,
const BlockId snapshot_version) const {
return m_kvBlockchain->getFromSnapshot(category_id, key, snapshot_version);
}

void Replica::checkpointInProcess(bool flag, kvbc::BlockId id) { m_kvBlockchain->checkpointInProcess(flag, id); }

void Replica::set_command_handler(std::shared_ptr<ICommandsHandler> handler) { m_cmdHandler = handler; }
Expand Down
30 changes: 30 additions & 0 deletions kvbc/src/categorization/kv_blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,36 @@ void KeyValueBlockchain::trimBlocksFromSnapshot(BlockId block_id_at_checkpoint)
}
}

std::optional<Value> KeyValueBlockchain::getFromSnapshot(const std::string& category_id,
const std::string& key,
const BlockId snapshot_version) {
auto opt_latest = getLatest(category_id, key);
if (opt_latest) {
BlockId latest_version;
std::visit([&latest_version](const auto& opt_latest) { latest_version = opt_latest.block_id; }, *opt_latest);
if (snapshot_version >= latest_version) {
return opt_latest;
}
}

auto latest_ver = getLatestVersion(category_id, key);
if (opt_latest || latest_ver) {
BlockId limit = 0;
BlockId max_blocks_to_look = bftEngine::ReplicaConfig::instance().keysHistoryMaxBlocksNum;
if (snapshot_version > max_blocks_to_look) {
limit = snapshot_version - max_blocks_to_look;
}
for (auto i = snapshot_version; i > limit; i--) {
auto opt_value = get(category_id, key, i);
// key was updated or deleted at version i
if (opt_value || (latest_ver && latest_ver.value().version == i)) {
return opt_value;
}
}
}
return std::nullopt;
}

/////////////////////// Delete block ///////////////////////
bool KeyValueBlockchain::deleteBlock(const BlockId& block_id) {
diagnostics::TimeRecorder scoped_timer(*histograms_.deleteBlock);
Expand Down
4 changes: 4 additions & 0 deletions kvbc/src/merkle_tree_storage_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "rocksdb/native_client.h"
#include "v4blockchain/detail/column_families.h"
#include "v4blockchain/detail/latest_keys.h"
#include "v4blockchain/detail/keys_history.h"

#include <rocksdb/filter_policy.h>
#include <rocksdb/statistics.h>
Expand Down Expand Up @@ -50,6 +51,9 @@ std::shared_ptr<rocksdb::Statistics> completeRocksDBConfiguration(
(d.name == concord::kvbc::v4blockchain::detail::IMMUTABLE_KEYS_CF)) {
d.options.compaction_filter = concord::kvbc::v4blockchain::detail::LatestKeys::LKCompactionFilter::getFilter();
LOG_DEBUG(V4_BLOCK_LOG, "Setting compaction filter for " << d.name);
} else if (d.name == concord::kvbc::v4blockchain::detail::KEYS_HISTORY_CF) {
d.options.compaction_filter = concord::kvbc::v4blockchain::detail::KeysHistory::KHCompactionFilter::getFilter();
LOG_DEBUG(V4_BLOCK_LOG, "Setting compaction filter for " << d.name);
}
}
return db_options.statistics;
Expand Down
Loading

0 comments on commit a8e483f

Please sign in to comment.