Skip to content

Commit

Permalink
Use raw key in the block merkle latest version CF
Browse files Browse the repository at this point in the history
Use the actual raw key instead of the key's hash in the block merkle's
BLOCK_MERKLE_LATEST_KEY_VERSION_CF column family. We need that so we can
serve a state snapshot in lexicographic order on keys. Storing only the
key hashes prevented us from doing that, because:
 * we lose the key itself when we hash and then prune the block the key
   was added in (as we only persist the key itself in blocks)
 * we won't be able to stream in lexicographic order if we store the key
   hashes only

The BLOCK_MERKLE_KEYS_CF column family and other parts of block merkle
don't change in any way.

The migration keeps track of what steps have been executed such that if
the migration process crashes, it can start fresh or continue from where
it left off. Persisting migration state is implemented by introducing
the `Migration` EDBKeyType.

**Important note**: when this change is merged, the DB will no longer be
backwards compatible. Therefore, a tool that migrates an existing DB so
that it is compatible with the change in this PR is provided too.

In terms of the migration tool, we provide:
 * ability to recover if migration crashed in the middle
 * unit tests
 * automatic detection of whether migration is needed
  • Loading branch information
dartdart26 authored and arc-vmware committed Dec 16, 2021
1 parent 3d4ebf6 commit d7a9d38
Show file tree
Hide file tree
Showing 15 changed files with 879 additions and 45 deletions.
5 changes: 4 additions & 1 deletion kvbc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cmake_minimum_required (VERSION 3.2)
project(libkvbc VERSION 0.1.0.0 LANGUAGES CXX)

find_package(Boost ${MIN_BOOST_VERSION} COMPONENTS filesystem REQUIRED)

add_library(concord_block_update INTERFACE)
target_include_directories(concord_block_update INTERFACE
Expand Down Expand Up @@ -38,7 +39,8 @@ if (BUILD_ROCKSDB_STORAGE)
src/categorization/kv_blockchain.cpp
src/categorization/blocks.cpp
src/categorization/blockchain.cpp
src/categorization/block_merkle_category.cpp)
src/categorization/block_merkle_category.cpp
src/migrations/block_merkle_latest_ver_cf_migration.cpp)

endif (BUILD_ROCKSDB_STORAGE)
target_link_libraries(kvbc PUBLIC corebft util)
Expand All @@ -54,6 +56,7 @@ if(NOT BUILD_THIRDPARTY)
find_package(OpenSSL REQUIRED)
endif()
target_link_libraries(kvbc PRIVATE OpenSSL::Crypto)
target_link_libraries(kvbc PRIVATE ${Boost_LIBRARIES})

if (BUILD_TESTING)
add_subdirectory(test)
Expand Down
7 changes: 3 additions & 4 deletions kvbc/include/categorization/block_merkle_category.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class BlockMerkleCategory {
// Returns the latest *block* version of a key.
// Returns std::nullopt if the key doesn't exist.
std::optional<TaggedVersion> getLatestVersion(const std::string& key) const;
std::optional<TaggedVersion> getLatestVersion(const Hash& key) const;

// Get values for keys at specific versions.
// `keys` and `versions` must be the same size.
Expand All @@ -68,7 +67,6 @@ class BlockMerkleCategory {
// If a key is missing, std::nullopt is returned for its version.
void multiGetLatestVersion(const std::vector<std::string>& keys,
std::vector<std::optional<TaggedVersion>>& versions) const;
void multiGetLatestVersion(const std::vector<Hash>& keys, std::vector<std::optional<TaggedVersion>>& versions) const;

std::vector<std::string> getBlockStaleKeys(BlockId, const BlockMerkleOutput&) const;
// Delete the given block ID as a genesis one.
Expand Down Expand Up @@ -172,8 +170,9 @@ class BlockMerkleCategory {
// 3. Atomically write the batch to the database.
void deleteStaleBatch(uint64_t start, uint64_t end);

// Retrieve the latest versions for all raw keys in a block and return them along with the hashed keys.
std::pair<std::vector<Hash>, std::vector<std::optional<TaggedVersion>>> getLatestVersions(
// Retrieve the latest versions for all raw keys in a block and return them along with the keys and the hashed keys.
// Returned tuple contains (list_of_key_hashes, list_of_keys, list_of_versions).
std::tuple<std::vector<Hash>, std::vector<std::string>, std::vector<std::optional<TaggedVersion>>> getLatestVersions(
const BlockMerkleOutput& out) const;

// Return a map from block id to all hashed keys that were still active in previously pruned blocks.
Expand Down
77 changes: 77 additions & 0 deletions kvbc/include/migrations/block_merkle_latest_ver_cf_migration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Concord
//
// Copyright (c) 2021 VMware, Inc. All Rights Reserved.
//
// This product is licensed to you under the Apache 2.0 license (the "License").
// You may not use this product except in compliance with the Apache 2.0 License.
//
// This product may include a number of subcomponents with separate copyright
// notices and license terms. Your use of these subcomponents is subject to the
// terms and conditions of the sub-component's license, as noted in the
// LICENSE file.

#pragma once

#include "rocksdb/native_client.h"

#include <rocksdb/utilities/checkpoint.h>

#include <exception>
#include <memory>
#include <optional>
#include <string>
#include <string_view>

namespace concord::kvbc::migrations {

// Migrates a RocksDB DB from using key hashes as keys in the `block_merkle_latest_key_version` column family to
// using raw keys.
// Keeps track of what steps have been executed such that if the migration process crashes, it can start fresh or
// continue from where it left off.
//
// Typical use is to construct the object and call execute(); the individual step methods further below are public
// for tests only.
class BlockMerkleLatestVerCfMigration {
public:
// `db_path`: presumably the path of the RocksDB DB to migrate - confirm against the implementation.
// `export_path`: presumably the directory used for the exported column family files - confirm against the
// implementation.
// Note: `export_path` must be on the same filesystem as `db_path`.
BlockMerkleLatestVerCfMigration(const std::string& db_path, const std::string& export_path);

public:
// Presumably the name of the temporary column family used while migrating - the definition is not visible in this
// header, confirm against the implementation.
static const std::string& temporaryColumnFamily();
// The DB key under which a migration state (one of the kState* strings below) is stored.
static const std::string& migrationKey();

// Migration states.
// These are values stored under migrationKey(). KeyValueBlockchain's constructor writes
// kStateMigrationNotNeededOrCompleted; the other two are presumably written by the migration steps themselves
// (TODO confirm in the .cpp).
static inline const std::string kStateImportedTempCf{"imported-temp-cf"};
static inline const std::string kStateMigrated{"migrated"};
static inline const std::string kStateMigrationNotNeededOrCompleted{"migration-not-needed-or-completed"};

// Outcome of a successful execute() call.
enum class ExecutionStatus {
kExecuted, // executed as part of this call
kNotNeededOrAlreadyExecuted, // migration is not needed or already executed and, therefore, nothing done in this
// call
};

public:
// Executes the migration, throwing on error.
// If execute() returns, it is always a success. The ExecutionStatus gives indication as to what the actual outcome
// is.
ExecutionStatus execute();

// Returns a copy of the shared pointer to the native client held by this object (shared ownership with the caller).
std::shared_ptr<storage::rocksdb::NativeClient> db() { return db_; }

// Following methods are used for testing purposes only. Do not use in production.
// NOTE(review): these look like the individual migration steps, presumably run by execute() in roughly this order;
// their implementations are not visible in this header - confirm against the .cpp before relying on that.
public:
void removeExportDir();
void checkpointDB();
void exportLatestVerCf();
void importTempLatestVerCf();
void clearExistingLatestVerCf();
void iterateAndMigrate();
void dropTempLatestVerCf();
void commitComplete();

private:
// Paths passed at construction (presumably stored verbatim from the constructor arguments - confirm in the .cpp).
const std::string db_path_;
const std::string export_path_;
// Client returned by db().
std::shared_ptr<storage::rocksdb::NativeClient> db_;
// RocksDB checkpoint and export/import metadata handles; presumably populated by checkpointDB() and
// exportLatestVerCf() respectively - TODO confirm.
std::unique_ptr<::rocksdb::Checkpoint> checkpoint_;
std::unique_ptr<::rocksdb::ExportImportFilesMetaData> export_metadata_;
};

} // namespace concord::kvbc::migrations
64 changes: 31 additions & 33 deletions kvbc/src/categorization/block_merkle_category.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,16 @@ std::vector<Buffer> versionedKeys(const std::vector<std::string>& keys, const st
return versioned_keys;
}

void putLatestKeyVersion(NativeWriteBatch& batch, const Hash& key_hash, TaggedVersion version) {
batch.put(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key_hash, serializeThreadLocal(LatestKeyVersion{version.encode()}));
void putLatestKeyVersion(NativeWriteBatch& batch, const std::string& key, TaggedVersion version) {
batch.put(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key, serializeThreadLocal(LatestKeyVersion{version.encode()}));
}

void putKeys(NativeWriteBatch& batch,
uint64_t block_id,
std::vector<KeyHash>&& hashed_added_keys,
std::vector<KeyHash>&& hashed_deleted_keys,
BlockMerkleInput& updates) {
auto kv_it = updates.kv.begin();
auto kv_it = updates.kv.cbegin();
for (auto key_it = hashed_added_keys.begin(); key_it != hashed_added_keys.end(); key_it++) {
// Only serialize the Header of a DBValue, to prevent the need to copy a potentially large value.
auto header = toSlice(serializeThreadLocal(DbValueHeader{false, static_cast<uint32_t>(kv_it->second.size())}));
Expand All @@ -177,17 +177,19 @@ void putKeys(NativeWriteBatch& batch,
batch.put(BLOCK_MERKLE_KEYS_CF, serialize(VersionedKey{*key_it, block_id}), val);

// Put the latest version of the key
batch.put(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key_it->value, serialize(LatestKeyVersion{block_id}));
batch.put(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, kv_it->first, serialize(LatestKeyVersion{block_id}));

kv_it++;
++kv_it;
}

const bool deleted = true;
auto deletes_it = updates.deletes.cbegin();
for (auto key_it = hashed_deleted_keys.begin(); key_it != hashed_deleted_keys.end(); key_it++) {
// Write a tombstone to the value. This is necessary for deleteLastReachable().
auto tombstone = serialize(DbValueHeader{true, 0});
batch.put(BLOCK_MERKLE_KEYS_CF, serialize(VersionedKey{*key_it, block_id}), tombstone);
putLatestKeyVersion(batch, key_it->value, TaggedVersion(deleted, block_id));
putLatestKeyVersion(batch, *deletes_it, TaggedVersion(deleted, block_id));
++deletes_it;
}
}

Expand Down Expand Up @@ -253,13 +255,15 @@ void removeMerkleNodes(NativeWriteBatch& batch, BlockId block_id, uint64_t tree_
// Return any active key hashes.
std::vector<KeyHash> deleteInactiveKeys(BlockId block_id,
std::vector<Hash>&& hashed_keys,
std::vector<std::string>&& keys,
const std::vector<std::optional<TaggedVersion>>& latest_versions,
NativeWriteBatch& batch,
size_t& deletes_counter) {
std::vector<KeyHash> active_keys;
for (auto i = 0u; i < hashed_keys.size(); i++) {
auto& tagged_version = latest_versions[i];
auto& hashed_key = hashed_keys[i];
auto& key = keys[i];
ConcordAssert(tagged_version.has_value());
ConcordAssertLE(block_id, tagged_version->version);

Expand All @@ -268,7 +272,7 @@ std::vector<KeyHash> deleteInactiveKeys(BlockId block_id,
// The latest version is a tombstone. We can delete the key and version.
auto versioned_key = serialize(VersionedKey{KeyHash{hashed_key}, block_id});
batch.del(BLOCK_MERKLE_KEYS_CF, versioned_key);
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_key);
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key);
deletes_counter++;
} else {
active_keys.push_back(KeyHash{hashed_key});
Expand Down Expand Up @@ -356,21 +360,16 @@ std::optional<Value> BlockMerkleCategory::get(const Hash& hashed_key, BlockId bl
}

std::optional<Value> BlockMerkleCategory::getLatest(const std::string& key) const {
auto hashed_key = hash(key);
if (auto latest = getLatestVersion(hashed_key)) {
if (auto latest = getLatestVersion(key)) {
if (!latest->deleted) {
return get(hashed_key, latest->version);
return get(hash(key), latest->version);
}
}
return std::nullopt;
}

std::optional<TaggedVersion> BlockMerkleCategory::getLatestVersion(const std::string& key) const {
return getLatestVersion(hash(key));
}

std::optional<TaggedVersion> BlockMerkleCategory::getLatestVersion(const Hash& hashed_key) const {
const auto serialized = db_->getSlice(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_key);
const auto serialized = db_->getSlice(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key);
if (!serialized) {
return std::nullopt;
}
Expand Down Expand Up @@ -418,16 +417,10 @@ void BlockMerkleCategory::multiGet(const std::vector<Buffer>& versioned_keys,

void BlockMerkleCategory::multiGetLatestVersion(const std::vector<std::string>& keys,
std::vector<std::optional<TaggedVersion>>& versions) const {
auto hashed_keys = hashedKeys(keys);
multiGetLatestVersion(hashed_keys, versions);
}

void BlockMerkleCategory::multiGetLatestVersion(const std::vector<Hash>& hashed_keys,
std::vector<std::optional<TaggedVersion>>& versions) const {
auto slices = std::vector<::rocksdb::PinnableSlice>{};
auto statuses = std::vector<::rocksdb::Status>{};

db_->multiGet(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_keys, slices, statuses);
db_->multiGet(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, keys, slices, statuses);
versions.clear();
for (auto i = 0ull; i < slices.size(); ++i) {
const auto& status = statuses[i];
Expand All @@ -448,7 +441,7 @@ void BlockMerkleCategory::multiGetLatest(const std::vector<std::string>& keys,
std::vector<std::optional<Value>>& values) const {
auto hashed_keys = hashedKeys(keys);
std::vector<std::optional<TaggedVersion>> versions;
multiGetLatestVersion(hashed_keys, versions);
multiGetLatestVersion(keys, versions);

// Generate the set of versioned keys for all keys that have latest versions and are not deleted
auto versioned_keys = std::vector<Buffer>{};
Expand Down Expand Up @@ -546,7 +539,8 @@ std::pair<SetOfKeyValuePairs, KeysVector> BlockMerkleCategory::rewriteAlreadyPru

std::vector<std::string> BlockMerkleCategory::getBlockStaleKeys(BlockId block_id, const BlockMerkleOutput& out) const {
std::vector<Hash> hash_stale_keys;
auto [hashed_keys, latest_versions] = getLatestVersions(out);
auto [hashed_keys, _, latest_versions] = getLatestVersions(out);
(void)_;
for (auto i = 0u; i < hashed_keys.size(); i++) {
auto& tagged_version = latest_versions[i];
auto& hashed_key = hashed_keys[i];
Expand Down Expand Up @@ -576,14 +570,15 @@ std::vector<std::string> BlockMerkleCategory::getBlockStaleKeys(BlockId block_id
size_t BlockMerkleCategory::deleteGenesisBlock(BlockId block_id,
const BlockMerkleOutput& out,
NativeWriteBatch& batch) {
auto [hashed_keys, latest_versions] = getLatestVersions(out);
auto [hashed_keys, keys, latest_versions] = getLatestVersions(out);
auto overwritten_active_keys_from_pruned_blocks = findActiveKeysFromPrunedBlocks(hashed_keys);
size_t num_of_deletes = 0;
for (auto& kv : overwritten_active_keys_from_pruned_blocks) {
num_of_deletes += kv.second.size();
}
auto [block_adds, block_removes] = rewriteAlreadyPrunedBlocks(overwritten_active_keys_from_pruned_blocks, batch);
auto active_keys = deleteInactiveKeys(block_id, std::move(hashed_keys), latest_versions, batch, num_of_deletes);
auto active_keys =
deleteInactiveKeys(block_id, std::move(hashed_keys), std::move(keys), latest_versions, batch, num_of_deletes);
if (active_keys.empty()) {
block_removes.push_back(merkleKey(block_id));
} else {
Expand Down Expand Up @@ -616,34 +611,37 @@ void BlockMerkleCategory::deleteLastReachableBlock(BlockId block_id,
// Preserve the deleted flag from the value into the version index.
auto deleted = Deleted{};
deserialize(iter.valueView(), deleted);
putLatestKeyVersion(batch, hashed_key.value, TaggedVersion(deleted.value, prev_key.version));
putLatestKeyVersion(batch, key, TaggedVersion(deleted.value, prev_key.version));
} else {
// This is the only version of the key - remove the latest version index too.
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_key.value);
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key);
}
} else {
// No previous keys means this is the only version of the key - remove the latest version index too.
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, hashed_key.value);
batch.del(BLOCK_MERKLE_LATEST_KEY_VERSION_CF, key);
}
// Remove the value for the key at `block_id`.
batch.del(BLOCK_MERKLE_KEYS_CF, versioned_key);
}
removeMerkleNodes(batch, block_id, out.state_root_version);
}

std::pair<std::vector<Hash>, std::vector<std::optional<TaggedVersion>>> BlockMerkleCategory::getLatestVersions(
const BlockMerkleOutput& out) const {
std::tuple<std::vector<Hash>, std::vector<std::string>, std::vector<std::optional<TaggedVersion>>>
BlockMerkleCategory::getLatestVersions(const BlockMerkleOutput& out) const {
std::vector<Hash> hashed_keys;
std::vector<std::string> keys;
hashed_keys.reserve(out.keys.size());
keys.reserve(out.keys.size());
for (auto& [key, _] : out.keys) {
(void)_;
hashed_keys.push_back(hash(key));
keys.push_back(key);
}

std::vector<std::optional<TaggedVersion>> latest_versions;
multiGetLatestVersion(hashed_keys, latest_versions);
multiGetLatestVersion(keys, latest_versions);

return std::make_pair(hashed_keys, latest_versions);
return std::make_tuple(hashed_keys, keys, latest_versions);
}

void BlockMerkleCategory::putLastDeletedTreeVersion(uint64_t tree_version, NativeWriteBatch& batch) {
Expand Down
6 changes: 6 additions & 0 deletions kvbc/src/categorization/kv_blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "kvbc_key_types.hpp"
#include "categorization/db_categories.h"
#include "endianness.hpp"
#include "migrations/block_merkle_latest_ver_cf_migration.h"

#include <stdexcept>

Expand Down Expand Up @@ -71,6 +72,11 @@ KeyValueBlockchain::KeyValueBlockchain(const std::shared_ptr<concord::storage::r
linkSTChainFrom(getLastReachableBlockId() + 1);
delete_metrics_comp_.Register();
add_metrics_comp_.Register();

// When we use this version of the code that uses the migrated DB format (or a completely fresh blockchain), we no
// longer need migration. That assumes we never run this version of the code on an old DB format (before migrating).
native_client_->put(migrations::BlockMerkleLatestVerCfMigration::migrationKey(),
migrations::BlockMerkleLatestVerCfMigration::kStateMigrationNotNeededOrCompleted);
}

void KeyValueBlockchain::initNewBlockchainCategories(
Expand Down
Loading

0 comments on commit d7a9d38

Please sign in to comment.