Merge pull request #2128 from dartdart26/block-merkle-key-to-latest-version-with-migaration

Use raw key in the block merkle latest version CF
arc-vmware authored Dec 16, 2021
2 parents b4ba1fc + d7a9d38 commit c1718e6
Showing 15 changed files with 879 additions and 45 deletions.
5 changes: 4 additions & 1 deletion kvbc/CMakeLists.txt
@@ -1,6 +1,7 @@
cmake_minimum_required (VERSION 3.2)
project(libkvbc VERSION 0.1.0.0 LANGUAGES CXX)

find_package(Boost ${MIN_BOOST_VERSION} COMPONENTS filesystem REQUIRED)

add_library(concord_block_update INTERFACE)
target_include_directories(concord_block_update INTERFACE
@@ -38,7 +39,8 @@ if (BUILD_ROCKSDB_STORAGE)
src/categorization/kv_blockchain.cpp
src/categorization/blocks.cpp
src/categorization/blockchain.cpp
src/categorization/block_merkle_category.cpp)
src/categorization/block_merkle_category.cpp
src/migrations/block_merkle_latest_ver_cf_migration.cpp)

endif (BUILD_ROCKSDB_STORAGE)
target_link_libraries(kvbc PUBLIC corebft util)
@@ -54,6 +56,7 @@ if(NOT BUILD_THIRDPARTY)
find_package(OpenSSL REQUIRED)
endif()
target_link_libraries(kvbc PRIVATE OpenSSL::Crypto)
target_link_libraries(kvbc PRIVATE ${Boost_LIBRARIES})

if (BUILD_TESTING)
add_subdirectory(test)
7 changes: 3 additions & 4 deletions kvbc/include/categorization/block_merkle_category.h
@@ -51,7 +51,6 @@ class BlockMerkleCategory {
// Returns the latest *block* version of a key.
// Returns std::nullopt if the key doesn't exist.
std::optional<TaggedVersion> getLatestVersion(const std::string& key) const;
std::optional<TaggedVersion> getLatestVersion(const Hash& key) const;

// Get values for keys at specific versions.
// `keys` and `versions` must be the same size.
@@ -68,7 +67,6 @@ class BlockMerkleCategory {
// If a key is missing, std::nullopt is returned for its version.
void multiGetLatestVersion(const std::vector<std::string>& keys,
std::vector<std::optional<TaggedVersion>>& versions) const;
void multiGetLatestVersion(const std::vector<Hash>& keys, std::vector<std::optional<TaggedVersion>>& versions) const;

std::vector<std::string> getBlockStaleKeys(BlockId, const BlockMerkleOutput&) const;
// Delete the given block ID as a genesis one.
@@ -172,8 +170,9 @@ class BlockMerkleCategory {
// 3. Atomically write the batch to the database.
void deleteStaleBatch(uint64_t start, uint64_t end);

// Retrieve the latest versions for all raw keys in a block and return them along with the hashed keys.
std::pair<std::vector<Hash>, std::vector<std::optional<TaggedVersion>>> getLatestVersions(
// Retrieve the latest versions for all raw keys in a block and return them along with the raw keys and their hashes.
// The returned tuple contains (list_of_key_hashes, list_of_keys, list_of_versions).
std::tuple<std::vector<Hash>, std::vector<std::string>, std::vector<std::optional<TaggedVersion>>> getLatestVersions(
const BlockMerkleOutput& out) const;

// Return a map from block id to all hashed keys that were still active in previously pruned blocks.
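For context, a minimal caller-side sketch of the raw-key API that remains after this change (the Hash overloads of getLatestVersion() and multiGetLatestVersion() are removed). This is not part of the commit; the include path, namespace, and keys are assumed from the file layout shown here.

#include "categorization/block_merkle_category.h"

#include <optional>
#include <string>
#include <vector>

using namespace concord::kvbc::categorization;

void latestVersionExample(const BlockMerkleCategory& category) {
  // Single raw key in, optional tagged (deleted flag + block id) version out.
  const std::string key = "account/42";
  if (const auto latest = category.getLatestVersion(key); latest && !latest->deleted) {
    // `latest->version` is the latest *block* version of `key`.
  }

  // Batch variant: versions[i] is std::nullopt when keys[i] does not exist.
  const std::vector<std::string> keys{"account/42", "account/43"};
  std::vector<std::optional<TaggedVersion>> versions;
  category.multiGetLatestVersion(keys, versions);
}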
77 changes: 77 additions & 0 deletions kvbc/include/migrations/block_merkle_latest_ver_cf_migration.h
@@ -0,0 +1,77 @@
// Concord
//
// Copyright (c) 2021 VMware, Inc. All Rights Reserved.
//
// This product is licensed to you under the Apache 2.0 license (the "License").
// You may not use this product except in compliance with the Apache 2.0 License.
//
// This product may include a number of subcomponents with separate copyright
// notices and license terms. Your use of these subcomponents is subject to the
// terms and conditions of the sub-component's license, as noted in the
// LICENSE file.

#pragma once

#include "rocksdb/native_client.h"

#include <rocksdb/utilities/checkpoint.h>

#include <exception>
#include <memory>
#include <optional>
#include <string>
#include <string_view>

namespace concord::kvbc::migrations {

// Migrates a RocksDB DB from using key hashes in the `block_merkle_latest_key_version` column family to using raw keys.
// Keeps track of what steps have been executed such that if the migration process crashes, it can start fresh or
// continue from where it left off.
class BlockMerkleLatestVerCfMigration {
public:
// Note: `export_path` must be on the same filesystem as `db_path`.
BlockMerkleLatestVerCfMigration(const std::string& db_path, const std::string& export_path);

public:
static const std::string& temporaryColumnFamily();
static const std::string& migrationKey();

// Migration states.
static inline const std::string kStateImportedTempCf{"imported-temp-cf"};
static inline const std::string kStateMigrated{"migrated"};
static inline const std::string kStateMigrationNotNeededOrCompleted{"migration-not-needed-or-completed"};

enum class ExecutionStatus {
kExecuted, // executed as part of this call
kNotNeededOrAlreadyExecuted, // migration is not needed or was already executed and, therefore, nothing was done
// in this call
};

public:
// Executes the migration, throwing on error.
// If execute() returns, the migration has succeeded; the returned ExecutionStatus indicates what the actual outcome
// was.
ExecutionStatus execute();

std::shared_ptr<storage::rocksdb::NativeClient> db() { return db_; }

// The following methods are for testing purposes only. Do not use them in production.
public:
void removeExportDir();
void checkpointDB();
void exportLatestVerCf();
void importTempLatestVerCf();
void clearExistingLatestVerCf();
void iterateAndMigrate();
void dropTempLatestVerCf();
void commitComplete();

private:
const std::string db_path_;
const std::string export_path_;
std::shared_ptr<storage::rocksdb::NativeClient> db_;
std::unique_ptr<::rocksdb::Checkpoint> checkpoint_;
std::unique_ptr<::rocksdb::ExportImportFilesMetaData> export_metadata_;
};

} // namespace concord::kvbc::migrations
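To illustrate how the new class is meant to be driven, here is a minimal, hypothetical migration driver; it is not part of this commit, and the paths are placeholders (recall that `export_path` must be on the same filesystem as `db_path`).

#include "migrations/block_merkle_latest_ver_cf_migration.h"

#include <iostream>

int main() {
  using concord::kvbc::migrations::BlockMerkleLatestVerCfMigration;

  // Placeholders: the node's RocksDB directory and a scratch export directory
  // on the same filesystem.
  BlockMerkleLatestVerCfMigration migration{"/concord/rocksdb", "/concord/rocksdb-export"};

  // execute() throws on error; if it returns, the migration either ran to
  // completion or was not needed.
  const auto status = migration.execute();
  if (status == BlockMerkleLatestVerCfMigration::ExecutionStatus::kExecuted) {
    std::cout << "block_merkle_latest_key_version CF migrated to raw keys\n";
  } else {
    std::cout << "migration not needed or already completed\n";
  }
  return 0;
}

Because the class records its progress under migrationKey(), re-running such a driver after a crash either resumes from the recorded state or simply reports kNotNeededOrAlreadyExecuted.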
64 changes: 31 additions & 33 deletions kvbc/src/categorization/block_merkle_category.cpp
@@ -158,16 +158,16 @@ std::vector<Buffer> versionedKeys(const std::vector<std::string>& keys, const st
return versioned_keys;
}

void putLatestKeyVersion(NativeWriteBatch& batch, const Hash& key_hash, TaggedVersion version) {
batch.put(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key_hash, serializeThreadLocal(LatestKeyVersion{version.encode()}));
void putLatestKeyVersion(NativeWriteBatch& batch, const std::string& key, TaggedVersion version) {
batch.put(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key, serializeThreadLocal(LatestKeyVersion{version.encode()}));
}

void putKeys(NativeWriteBatch& batch,
uint64_t block_id,
std::vector<KeyHash>&& hashed_added_keys,
std::vector<KeyHash>&& hashed_deleted_keys,
BlockMerkleInput& updates) {
auto kv_it = updates.kv.begin();
auto kv_it = updates.kv.cbegin();
for (auto key_it = hashed_added_keys.begin(); key_it != hashed_added_keys.end(); key_it++) {
// Only serialize the Header of a DBValue, to prevent the need to copy a potentially large value.
auto header = toSlice(serializeThreadLocal(DbValueHeader{false, static_cast<uint32_t>(kv_it->second.size())}));
@@ -177,17 +177,19 @@ void putKeys(NativeWriteBatch& batch,
batch.put(BLOCK_MERKLE_KEYS_CF, serialize(VersionedKey{*key_it, block_id}), val);

// Put the latest version of the key
batch.put(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key_it->value, serialize(LatestKeyVersion{block_id}));
batch.put(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, kv_it->first, serialize(LatestKeyVersion{block_id}));

kv_it++;
++kv_it;
}

const bool deleted = true;
auto deletes_it = updates.deletes.cbegin();
for (auto key_it = hashed_deleted_keys.begin(); key_it != hashed_deleted_keys.end(); key_it++) {
// Write a tombstone to the value. This is necessary for deleteLastReachable().
auto tombstone = serialize(DbValueHeader{true, 0});
batch.put(BLOCK_MERKLE_KEYS_CF, serialize(VersionedKey{*key_it, block_id}), tombstone);
putLatestKeyVersion(batch, key_it->value, TaggedVersion(deleted, block_id));
putLatestKeyVersion(batch, *deletes_it, TaggedVersion(deleted, block_id));
++deletes_it;
}
}

@@ -253,13 +255,15 @@ void removeMerkleNodes(NativeWriteBatch& batch, BlockId block_id, uint64_t tree_
// Return any active key hashes.
std::vector<KeyHash> deleteInactiveKeys(BlockId block_id,
std::vector<Hash>&& hashed_keys,
std::vector<std::string>&& keys,
const std::vector<std::optional<TaggedVersion>>& latest_versions,
NativeWriteBatch& batch,
size_t& deletes_counter) {
std::vector<KeyHash> active_keys;
for (auto i = 0u; i < hashed_keys.size(); i++) {
auto& tagged_version = latest_versions[i];
auto& hashed_key = hashed_keys[i];
auto& key = keys[i];
ConcordAssert(tagged_version.has_value());
ConcordAssertLE(block_id, tagged_version->version);

@@ -268,7 +272,7 @@ std::vector<KeyHash> deleteInactiveKeys(BlockId block_id,
// The latest version is a tombstone. We can delete the key and version.
auto versioned_key = serialize(VersionedKey{KeyHash{hashed_key}, block_id});
batch.del(BLOCK_MERKLE_KEYS_CF, versioned_key);
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_key);
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key);
deletes_counter++;
} else {
active_keys.push_back(KeyHash{hashed_key});
@@ -356,21 +360,16 @@ std::optional<Value> BlockMerkleCategory::get(const Hash& hashed_key, BlockId bl
}

std::optional<Value> BlockMerkleCategory::getLatest(const std::string& key) const {
auto hashed_key = hash(key);
if (auto latest = getLatestVersion(hashed_key)) {
if (auto latest = getLatestVersion(key)) {
if (!latest->deleted) {
return get(hashed_key, latest->version);
return get(hash(key), latest->version);
}
}
return std::nullopt;
}

std::optional<TaggedVersion> BlockMerkleCategory::getLatestVersion(const std::string& key) const {
return getLatestVersion(hash(key));
}

std::optional<TaggedVersion> BlockMerkleCategory::getLatestVersion(const Hash& hashed_key) const {
const auto serialized = db_->getSlice(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_key);
const auto serialized = db_->getSlice(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key);
if (!serialized) {
return std::nullopt;
}
@@ -418,16 +417,10 @@ void BlockMerkleCategory::multiGet(const std::vector<Buffer>& versioned_keys,

void BlockMerkleCategory::multiGetLatestVersion(const std::vector<std::string>& keys,
std::vector<std::optional<TaggedVersion>>& versions) const {
auto hashed_keys = hashedKeys(keys);
multiGetLatestVersion(hashed_keys, versions);
}

void BlockMerkleCategory::multiGetLatestVersion(const std::vector<Hash>& hashed_keys,
std::vector<std::optional<TaggedVersion>>& versions) const {
auto slices = std::vector<::rocksdb::PinnableSlice>{};
auto statuses = std::vector<::rocksdb::Status>{};

db_->multiGet(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_keys, slices, statuses);
db_->multiGet(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, keys, slices, statuses);
versions.clear();
for (auto i = 0ull; i < slices.size(); ++i) {
const auto& status = statuses[i];
@@ -448,7 +441,7 @@ void BlockMerkleCategory::multiGetLatest(const std::vector<std::string>& keys,
std::vector<std::optional<Value>>& values) const {
auto hashed_keys = hashedKeys(keys);
std::vector<std::optional<TaggedVersion>> versions;
multiGetLatestVersion(hashed_keys, versions);
multiGetLatestVersion(keys, versions);

// Generate the set of versioned keys for all keys that have latest versions and are not deleted
auto versioned_keys = std::vector<Buffer>{};
@@ -546,7 +539,8 @@ std::pair<SetOfKeyValuePairs, KeysVector> BlockMerkleCategory::rewriteAlreadyPru

std::vector<std::string> BlockMerkleCategory::getBlockStaleKeys(BlockId block_id, const BlockMerkleOutput& out) const {
std::vector<Hash> hash_stale_keys;
auto [hashed_keys, latest_versions] = getLatestVersions(out);
auto [hashed_keys, _, latest_versions] = getLatestVersions(out);
(void)_;
for (auto i = 0u; i < hashed_keys.size(); i++) {
auto& tagged_version = latest_versions[i];
auto& hashed_key = hashed_keys[i];
@@ -576,14 +570,15 @@ std::vector<std::string> BlockMerkleCategory::getBlockStaleKeys(BlockId block_id
size_t BlockMerkleCategory::deleteGenesisBlock(BlockId block_id,
const BlockMerkleOutput& out,
NativeWriteBatch& batch) {
auto [hashed_keys, latest_versions] = getLatestVersions(out);
auto [hashed_keys, keys, latest_versions] = getLatestVersions(out);
auto overwritten_active_keys_from_pruned_blocks = findActiveKeysFromPrunedBlocks(hashed_keys);
size_t num_of_deletes = 0;
for (auto& kv : overwritten_active_keys_from_pruned_blocks) {
num_of_deletes += kv.second.size();
}
auto [block_adds, block_removes] = rewriteAlreadyPrunedBlocks(overwritten_active_keys_from_pruned_blocks, batch);
auto active_keys = deleteInactiveKeys(block_id, std::move(hashed_keys), latest_versions, batch, num_of_deletes);
auto active_keys =
deleteInactiveKeys(block_id, std::move(hashed_keys), std::move(keys), latest_versions, batch, num_of_deletes);
if (active_keys.empty()) {
block_removes.push_back(merkleKey(block_id));
} else {
@@ -616,34 +611,37 @@ void BlockMerkleCategory::deleteLastReachableBlock(BlockId block_id,
// Preserve the deleted flag from the value into the version index.
auto deleted = Deleted{};
deserialize(iter.valueView(), deleted);
putLatestKeyVersion(batch, hashed_key.value, TaggedVersion(deleted.value, prev_key.version));
putLatestKeyVersion(batch, key, TaggedVersion(deleted.value, prev_key.version));
} else {
// This is the only version of the key - remove the latest version index too.
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_key.value);
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key);
}
} else {
// No previous keys means this is the only version of the key - remove the latest version index too.
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_key.value);
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key);
}
// Remove the value for the key at `block_id`.
batch.del(BLOCK_MERKLE_KEYS_CF, versioned_key);
}
removeMerkleNodes(batch, block_id, out.state_root_version);
}

std::pair<std::vector<Hash>, std::vector<std::optional<TaggedVersion>>> BlockMerkleCategory::getLatestVersions(
const BlockMerkleOutput& out) const {
std::tuple<std::vector<Hash>, std::vector<std::string>, std::vector<std::optional<TaggedVersion>>>
BlockMerkleCategory::getLatestVersions(const BlockMerkleOutput& out) const {
std::vector<Hash> hashed_keys;
std::vector<std::string> keys;
hashed_keys.reserve(out.keys.size());
keys.reserve(out.keys.size());
for (auto& [key, _] : out.keys) {
(void)_;
hashed_keys.push_back(hash(key));
keys.push_back(key);
}

std::vector<std::optional<TaggedVersion>> latest_versions;
multiGetLatestVersion(hashed_keys, latest_versions);
multiGetLatestVersion(keys, latest_versions);

return std::make_pair(hashed_keys, latest_versions);
return std::make_tuple(hashed_keys, keys, latest_versions);
}

void BlockMerkleCategory::putLastDeletedTreeVersion(uint64_t tree_version, NativeWriteBatch& batch) {
6 changes: 6 additions & 0 deletions kvbc/src/categorization/kv_blockchain.cpp
@@ -20,6 +20,7 @@
#include "kvbc_key_types.hpp"
#include "categorization/db_categories.h"
#include "endianness.hpp"
#include "migrations/block_merkle_latest_ver_cf_migration.h"

#include <stdexcept>

Expand Down Expand Up @@ -71,6 +72,11 @@ KeyValueBlockchain::KeyValueBlockchain(const std::shared_ptr<concord::storage::r
linkSTChainFrom(getLastReachableBlockId() + 1);
delete_metrics_comp_.Register();
add_metrics_comp_.Register();

// This version of the code uses the migrated DB format (or starts a completely fresh blockchain), so the migration is
// no longer needed. This assumes we never run this version of the code on an old DB format (i.e., before migrating).
native_client_->put(migrations::BlockMerkleLatestVerCfMigration::migrationKey(),
migrations::BlockMerkleLatestVerCfMigration::kStateMigrationNotNeededOrCompleted);
}

void KeyValueBlockchain::initNewBlockchainCategories(
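A hedged sketch of the startup ordering this comment implies: run the migration before constructing KeyValueBlockchain, since the constructor above assumes the DB is already in the new format and only stamps the migration key. The helper name and export-path convention below are hypothetical, not part of this commit.

#include "migrations/block_merkle_latest_ver_cf_migration.h"

#include <string>

// Hypothetical helper illustrating the ordering: migrate first, then open the
// blockchain, so this version of the code never operates on the old
// (hashed-key) layout of the latest-version column family.
void migrateThenOpen(const std::string& db_path) {
  using concord::kvbc::migrations::BlockMerkleLatestVerCfMigration;

  // Safe to call unconditionally: execute() returns kNotNeededOrAlreadyExecuted
  // when the DB is already in the new format, e.g. because the
  // KeyValueBlockchain constructor stamped kStateMigrationNotNeededOrCompleted
  // on a previous run.
  BlockMerkleLatestVerCfMigration{db_path, db_path + "-export"}.execute();

  // ... construct KeyValueBlockchain on the (now migrated) DB here ...
}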