Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip]Elerer/no row cache #2631

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CONCORD_BFT_DOCKER_REPO?=concordbft/
CONCORD_BFT_DOCKER_IMAGE?=concord-bft
CONCORD_BFT_DOCKER_IMAGE_VERSION?=0.41
CONCORD_BFT_DOCKER_IMAGE_VERSION?=0.40
CONCORD_BFT_DOCKER_CONTAINER?=concord-bft

CONCORD_BFT_DOCKERFILE?=Dockerfile
Expand Down
5 changes: 4 additions & 1 deletion bftengine/include/bftengine/DbCheckpointManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class DbCheckpointManager {
std::shared_ptr<bftEngine::impl::PersistentStorage> p,
std::shared_ptr<concordMetrics::Aggregator> aggregator,
const std::function<BlockId()>& getLastBlockIdCb,
const PrepareCheckpointCallback& prepareCheckpointCb);
const PrepareCheckpointCallback& prepareCheckpointCb,
const std::function<void(bool)>& checkpointInProcessCb);
std::map<CheckpointId, DbCheckpointMetadata::DbCheckPointDescriptor> getListOfDbCheckpoints() const {
return dbCheckptMetadata_.dbCheckPoints_;
}
Expand All @@ -122,6 +123,7 @@ class DbCheckpointManager {
void sendInternalCreateDbCheckpointMsg(const SeqNum& seqNum, bool noop);
BlockId getLastReachableBlock() const;
SeqNum getLastStableSeqNum() const;
void setCheckpointInProcess(bool) const;
void setOnStableSeqNumCb_(std::function<void(SeqNum)> cb) { onStableSeqNumCb_ = cb; }
void onStableSeqNum(SeqNum s) {
if (onStableSeqNumCb_) onStableSeqNumCb_(s);
Expand Down Expand Up @@ -165,6 +167,7 @@ class DbCheckpointManager {
void cleanUp();
std::function<BlockId()> getLastBlockIdCb_;
PrepareCheckpointCallback prepareCheckpointCb_;
std::function<void(bool)> checkpointInProcessCb_;
// get total size recursively
uint64_t directorySize(const _fs::path& directory, const bool& excludeHardLinks, bool recursive);
// get checkpoint metadata
Expand Down
6 changes: 5 additions & 1 deletion bftengine/include/bftengine/ReplicaConfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class ReplicaConfig : public concord::serialize::SerializableFactory<ReplicaConf
0,
"Port to be used to communicate with the diagnostic server using"
"the concord-ctl script");
CONFIG_PARAM(kvBlockchainVersion, std::uint32_t, 1u, "Default version of KV blockchain for this replica");

// Parameter to enable/disable waiting for transaction data to be persisted.
// Not predefined configuration parameters
Expand Down Expand Up @@ -404,6 +405,7 @@ class ReplicaConfig : public concord::serialize::SerializableFactory<ReplicaConf
serialize(outStream, enablePreProcessorMemoryPool);
serialize(outStream, diagnosticsServerPort);
serialize(outStream, useUnifiedCertificates);
serialize(outStream, kvBlockchainVersion);
}
void deserializeDataMembers(std::istream& inStream) {
deserialize(inStream, isReadOnly);
Expand Down Expand Up @@ -503,6 +505,7 @@ class ReplicaConfig : public concord::serialize::SerializableFactory<ReplicaConf
deserialize(inStream, enablePreProcessorMemoryPool);
deserialize(inStream, diagnosticsServerPort);
deserialize(inStream, useUnifiedCertificates);
deserialize(inStream, kvBlockchainVersion);
}

private:
Expand Down Expand Up @@ -595,7 +598,8 @@ inline std::ostream& operator<<(std::ostream& os, const ReplicaConfig& rc) {
rc.operatorEnabled_,
rc.enablePreProcessorMemoryPool,
rc.diagnosticsServerPort,
rc.useUnifiedCertificates);
rc.useUnifiedCertificates,
rc.kvBlockchainVersion);
os << ", ";
for (auto& [param, value] : rc.config_params_) os << param << ": " << value << "\n";
return os;
Expand Down
10 changes: 9 additions & 1 deletion bftengine/src/bftengine/DbCheckpointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Status DbCheckpointManager::createDbCheckpoint(const CheckpointId& checkPointId,
for (auto& cb : onDbCheckpointCreated_) {
if (cb) cb(seqNum);
}
checkpointInProcessCb_(false);
}
updateMetrics();
return Status::OK();
Expand Down Expand Up @@ -167,7 +168,8 @@ void DbCheckpointManager::initializeDbCheckpointManager(std::shared_ptr<concord:
std::shared_ptr<bftEngine::impl::PersistentStorage> p,
std::shared_ptr<concordMetrics::Aggregator> aggregator,
const std::function<BlockId()>& getLastBlockIdCb,
const PrepareCheckpointCallback& prepareCheckpointCb) {
const PrepareCheckpointCallback& prepareCheckpointCb,
const std::function<void(bool)>& checkpointInProcessCb) {
dbClient_ = dbClient;
ps_ = p;
dbCheckPointDirPath_ = ReplicaConfig::instance().getdbCheckpointDirPath();
Expand All @@ -176,6 +178,7 @@ void DbCheckpointManager::initializeDbCheckpointManager(std::shared_ptr<concord:
std::min(ReplicaConfig::instance().maxNumberOfDbCheckpoints, bftEngine::impl::MAX_ALLOWED_CHECKPOINTS);
metrics_.SetAggregator(aggregator);
if (getLastBlockIdCb) getLastBlockIdCb_ = getLastBlockIdCb;
if (checkpointInProcessCb) checkpointInProcessCb_ = checkpointInProcessCb;
prepareCheckpointCb_ = prepareCheckpointCb;
if (ReplicaConfig::instance().dbCheckpointFeatureEnabled) {
// in case of upgrade, we need to set the lastStableCheckpointSeqNum from persistence
Expand Down Expand Up @@ -374,6 +377,11 @@ SeqNum DbCheckpointManager::getLastStableSeqNum() const {
if (getLastStableSeqNumCb_) return getLastStableSeqNumCb_();
return 0;
}

void DbCheckpointManager::setCheckpointInProcess(bool flag) const {
if (checkpointInProcessCb_) checkpointInProcessCb_(flag);
}

BlockId DbCheckpointManager::getLastReachableBlock() const {
if (getLastBlockIdCb_) return getLastBlockIdCb_();
return 0;
Expand Down
1 change: 1 addition & 0 deletions bftengine/src/bftengine/RequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ void RequestHandler::execute(IRequestsHandler::ExecutionRequestsQueue& requests,
std::vector<std::uint8_t>(req.request, req.request + req.requestSize), createDbChkPtMsg);
if (!createDbChkPtMsg.noop) {
const auto& lastStableSeqNum = DbCheckpointManager::instance().getLastStableSeqNum();
DbCheckpointManager::instance().setCheckpointInProcess(true);
if (lastStableSeqNum == static_cast<SeqNum>(createDbChkPtMsg.seqNum)) {
DbCheckpointManager::instance().createDbCheckpointAsync(createDbChkPtMsg.seqNum, timestamp, std::nullopt);
} else {
Expand Down
2 changes: 1 addition & 1 deletion client/client_pool/test/client_pool_timer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ using TestClient = std::shared_ptr<void>;

TEST(client_pool_timer, work_items) {
uint16_t num_times_called = 0;
std::chrono::milliseconds timeout = 1ms;
std::chrono::milliseconds timeout = 10ms;
auto timer = Timer<TestClient>(timeout, [&num_times_called](TestClient&& c) -> void { num_times_called++; });

// Wait for timeout
Expand Down
20 changes: 11 additions & 9 deletions install_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ apt-get update && apt-get ${APT_GET_FLAGS} install \
sudo \
vim \
iproute2 \
wget
wget \
bison \
flex

update-alternatives --install /usr/bin/clang clang /usr/lib/llvm-9/bin/clang 100
update-alternatives --install /usr/bin/clang++ clang++ /usr/lib/llvm-9/bin/clang++ 100
Expand Down Expand Up @@ -185,16 +187,16 @@ git clone https://github.com/relic-toolkit/relic && \

cd ${HOME}
wget ${WGET_FLAGS} \
https://github.com/facebook/rocksdb/archive/v6.8.1.tar.gz && \
tar -xzf v6.8.1.tar.gz && \
rm v6.8.1.tar.gz && \
cd rocksdb-6.8.1 && \
https://github.com/facebook/rocksdb/archive/v6.29.3.tar.gz && \
tar -xzf v6.29.3.tar.gz && \
rm v6.29.3.tar.gz && \
cd rocksdb-6.29.3 && \
EXTRA_CXXFLAGS="-fno-omit-frame-pointer -g " \
EXTRA_CFLAGS="-fno-omit-frame-pointer -g " \
PORTABLE=1 make -j$(nproc) USE_RTTI=1 shared_lib && \
PORTABLE=1 make install-shared && \
cd ${HOME} && \
rm -r rocksdb-6.8.1
rm -r rocksdb-6.29.3

cd ${HOME}
git clone https://github.com/emil-e/rapidcheck.git && \
Expand Down Expand Up @@ -335,15 +337,15 @@ git clone -b v0.9.7 --depth 1 https://github.com/yhirose/cpp-httplib && \

# Thrift is the protocol used by Jaeger to export metrics
cd $HOME
wget ${WGET_FLAGS} https://archive.apache.org/dist/thrift/0.11.0/thrift-0.11.0.tar.gz && \
tar xzf thrift-0.11.0.tar.gz && \
git clone -b 0.11.0 --depth 1 https://github.com/apache/thrift.git thrift-0.11.0 && \
cd thrift-0.11.0 && \
./bootstrap.sh && \
./configure CXXFLAGS='-g -O2' \
--without-python --enable-static --disable-shared \
--disable-tests --disable-tutorial --disable-coverage && \
make -j$(nproc) install && \
cd ${HOME} && \
rm -r thrift-0.11.0 thrift-0.11.0.tar.gz
rm -r thrift-0.11.0

# TODO: Upgrade to opentelemetry-cpp
# Tracing via Jaeger and Thrift protocol
Expand Down
21 changes: 20 additions & 1 deletion kvbc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ add_library(kvbc src/ClientImp.cpp
src/resources-manager/ReplicaResources.cpp
src/resources-manager/AdaptivePruningManager.cpp
src/resources-manager/IntervalMappingResourceManager.cpp

src/blockchain_misc.cpp
src/kvbc_adapter/common/state_snapshot_adapter.cpp
src/kvbc_adapter/categorization/db_checkpoint_adapter.cpp
src/kvbc_adapter/categorization/kv_blockchain_adapter.cpp
src/kvbc_adapter/categorization/app_state_adapter.cpp
src/kvbc_adapter/categorization/blocks_deleter_adapter.cpp

src/kvbc_adapter/v4blockchain/blocks_deleter_adapter.cpp
src/kvbc_adapter/v4blockchain/app_state_adapter.cpp

src/kvbc_adapter/replica_adapter.cpp
)

if (BUILD_ROCKSDB_STORAGE)
Expand All @@ -43,7 +55,14 @@ if (BUILD_ROCKSDB_STORAGE)
src/categorization/blocks.cpp
src/categorization/blockchain.cpp
src/categorization/block_merkle_category.cpp
src/migrations/block_merkle_latest_ver_cf_migration.cpp)
src/migrations/block_merkle_latest_ver_cf_migration.cpp

src/v4blockchain/v4_blockchain.cpp
src/v4blockchain/detail/latest_keys.cpp
src/v4blockchain/detail/categories.cpp
src/v4blockchain/detail/blocks.cpp
src/v4blockchain/detail/st_chain.cpp
src/v4blockchain/detail/blockchain.cpp)

endif (BUILD_ROCKSDB_STORAGE)
target_link_libraries(kvbc PUBLIC corebft util)
Expand Down
29 changes: 14 additions & 15 deletions kvbc/benchmark/kvbcbench/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <cstddef>
#include <iostream>
#include <memory>
#include <random>

#include <boost/program_options.hpp>
#include <boost/program_options/errors.hpp>
Expand All @@ -27,20 +26,20 @@
#include <rocksdb/statistics.h>
#include <rocksdb/table.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/perf_context.h>

#include "categorization/base_types.h"
#include "categorization/column_families.h"
#include "categorization/updates.h"
#include "categorized_kvbc_msgs.cmf.hpp"
#include "categorization/kv_blockchain.h"
#include "kvbc_adapter/replica_adapter.hpp"
#include "performance_handler.h"
#include "rocksdb/native_client.h"
#include "diagnostics.h"
#include "diagnostics_server.h"
#include "input.h"
#include "pre_execution.h"
#include "ReplicaResources.h"

using namespace std;

Expand Down Expand Up @@ -233,8 +232,8 @@ std::shared_ptr<rocksdb::Statistics> completeRocksdbConfiguration(

// Use the same block cache and table options for all column familes for now.
for (auto& d : cf_descs) {
auto* cf_table_options =
reinterpret_cast<::rocksdb::BlockBasedTableOptions*>(d.options.table_factory->GetOptions());
auto* cf_table_options = reinterpret_cast<::rocksdb::BlockBasedTableOptions*>(
d.options.table_factory->GetOptions<::rocksdb::BlockBasedTableOptions>());
cf_table_options->block_cache = table_options.block_cache;
cf_table_options->filter_policy.reset(::rocksdb::NewBloomFilterPolicy(10, false));
}
Expand All @@ -261,7 +260,7 @@ PreExecConfig preExecConfig(const po::variables_map& config,

void addBlocks(const po::variables_map& config,
std::shared_ptr<storage::rocksdb::NativeClient>& db,
categorization::KeyValueBlockchain& kvbc,
adapter::ReplicaBlockchain& kvbc,
InputData& input,
std::shared_ptr<diagnostics::Recorder>& add_block_recorder,
std::shared_ptr<diagnostics::Recorder>& conflict_detection_recorder) {
Expand Down Expand Up @@ -315,11 +314,11 @@ void addBlocks(const po::variables_map& config,
updates.add(kCategoryMerkle, categorization::BlockMerkleUpdates(std::move(merkle_input)));
updates.add(kCategoryImmutable, std::move(immutable_updates));
updates.add(kCategoryVersioned, std::move(versioned_updates));
kvbc.addBlock(std::move(updates));
kvbc.add(std::move(updates));
} else {
auto&& merkle_input = std::move(input.block_merkle_input[i - 1]);
updates.add(kCategoryMerkle, categorization::BlockMerkleUpdates(std::move(merkle_input)));
kvbc.addBlock(std::move(updates));
kvbc.add(std::move(updates));
}
}
}
Expand Down Expand Up @@ -361,13 +360,13 @@ int main(int argc, char** argv) {
};
auto opts = storage::rocksdb::NativeClient::UserOptions{"kvbcbench_rocksdb_opts.ini", completeInit};
auto db = storage::rocksdb::NativeClient::newClient(config["rocksdb-path"].as<std::string>(), false, opts);
auto kvbc = kvbc::categorization::KeyValueBlockchain(
db,
false,
std::map<std::string, kvbc::categorization::CATEGORY_TYPE>{
{kCategoryMerkle, kvbc::categorization::CATEGORY_TYPE::block_merkle},
{kCategoryImmutable, kvbc::categorization::CATEGORY_TYPE::immutable},
{kCategoryVersioned, kvbc::categorization::CATEGORY_TYPE::versioned_kv}});
auto kvbc =
kvbc::adapter::ReplicaBlockchain(db,
false,
std::map<std::string, kvbc::categorization::CATEGORY_TYPE>{
{kCategoryMerkle, kvbc::categorization::CATEGORY_TYPE::block_merkle},
{kCategoryImmutable, kvbc::categorization::CATEGORY_TYPE::immutable},
{kCategoryVersioned, kvbc::categorization::CATEGORY_TYPE::versioned_kv}});

auto pre_exec_config = preExecConfig(config, input.block_merkle_read_keys.size(), input.ver_read_keys.size());
auto pre_exec_sim = PreExecutionSimulator(pre_exec_config, input.block_merkle_read_keys, input.ver_read_keys, kvbc);
Expand Down
6 changes: 3 additions & 3 deletions kvbc/benchmark/kvbcbench/pre_execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include "assertUtils.hpp"
#include "categorization/block_merkle_category.h"
#include "categorization/kv_blockchain.h"
#include "kvbc_adapter/replica_adapter.hpp"
#include "input.h"

namespace concord::kvbc::bench {
Expand Down Expand Up @@ -46,7 +46,7 @@ class PreExecutionSimulator {
PreExecutionSimulator(const PreExecConfig& config,
const ReadKeys& merkle_read_keys,
const ReadKeys& versioned_read_keys,
categorization::KeyValueBlockchain& kvbc)
adapter::ReplicaBlockchain& kvbc)
: config_(config), merkle_read_keys_(merkle_read_keys), versioned_read_keys_(versioned_read_keys), kvbc_(kvbc) {}

void start() {
Expand Down Expand Up @@ -106,7 +106,7 @@ class PreExecutionSimulator {
const ReadKeys& merkle_read_keys_;
const ReadKeys& versioned_read_keys_;

categorization::KeyValueBlockchain& kvbc_;
adapter::ReplicaBlockchain& kvbc_;
};

} // namespace concord::kvbc::bench
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ void completeRocksdbConfiguration(::rocksdb::Options& db_options,

// Use the same block cache and table options for all column familes for now.
for (auto& d : cf_descs) {
auto* cf_table_options =
reinterpret_cast<::rocksdb::BlockBasedTableOptions*>(d.options.table_factory->GetOptions());
auto* cf_table_options = reinterpret_cast<::rocksdb::BlockBasedTableOptions*>(
d.options.table_factory->GetOptions<::rocksdb::BlockBasedTableOptions>());
cf_table_options->block_cache = table_options.block_cache;
cf_table_options->filter_policy.reset(::rocksdb::NewBloomFilterPolicy(10, false));
}
Expand Down
Loading